Create a queue with a Controller in Drupal8

Queues are particularly important when we need to stash some tasks for later processing. To do so, we are going to put some tasks or data in a queue (create the queue) and later we will process those tasks with a QueueWorker plugin (process the queue), usually triggered by cron.

There are several ways to create a queue:
- With a form class
- With a controller class
- With a hook_cron() function
 
To process the queue, we also have different options:
- As a cron process with a QueueWorker plugin
- As a batch process also with QueueWorker plugin but extending a base plugin
- As a batch process claiming each item of the queue in a service or in a controller

Today we'll create the queue with a controller and process it with a QueueWorker plugin when cron runs or manually with Drupal Console.

Creating the queue with a controller has the advantage of allowing us to use an external crontab (e.g., a Linux crontab), so we won't need Drupal’s cron at all to run the controller, allowing us to launch it when we need it. This is also a more reliable method because it uses fewer resources and it is independent of any page request. Remember that the Drupal cron is 'poor man’s cron', as it depends on page requests for its execution even if there is a way to execute it with a linux crontab.

In this example module we'll generate a queue with a controller, importing the title and the description tags form the Drupal Planet RSS file. Next, when Cron runs, , we'll create a node page with a QueueWorker plugin for each item in the queue. You can download the code here: https://github.com/KarimBoudjema/Drupal8-ex-queue-api-01.

This module has two main parts:

1. A controller class src/Controller/ExQueueController.php with its corresponding route in exqueue01.routing.yml and two main methods:

getData() - Get external data and insert the data in the queue 'exqueue_import'.
deleteTheQueue() - Delete all item in the queue.

2. A Cron QueueWorker plugin src/Plugin/QueueWorker/ExQueue01.php that will process each item in the queue.

You can see the tree module here:

web/modules/custom/exqueue01/
|-- exqueue01.info.yml
|-- exqueue01.module
|-- exqueue01.permissions.yml
|-- exqueue01.routing.yml
|-- README.txt
`-- src
    |-- Controller
    |   `-- ExQueueController.php
    `-- Plugin
        `-- QueueWorker
            `-- ExQueue01.php

4 directories, 7 files

That's not too difficult, is it?

1. Create the queue with a controller.

Here is the code of the controller. You can also get it here: https://github.com/KarimBoudjema/Drupal8-ex-queue-api-01/blob/master/src/Controller/ExQueueController.php

<?php
namespace Drupal\exqueue01\Controller;
use Drupal\Core\Controller\ControllerBase;
use Drupal\Core\Queue\QueueFactory;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Drupal\Core\Messenger\MessengerInterface;
use GuzzleHttp\ClientInterface;
use GuzzleHttp\Exception\RequestException;
/**
 * Class ExQueueController.
 *
 * Demonstrates the use of the Queue API
 * There is two routes.
 * 1) \Drupal\exqueue01\Controller\ExQueueController::getData
 * The getData() methods allows to load external data and
 * for each array element create a queue element
 * Then on Cron run, we create a page node for each element with
 * 2) \Drupal\exqueue01\Controller\ExQueueController::deleteTheQueue
 * The deleteTheQueue() methods delete the queue "exqueue_import"
 * and all its elements
 * Once the queue is created with tha data, on Cron run
 * we create a new page node for each item in the queue with the QueueWorker
 * plugin ExQueue01.php .
 */
class ExQueueController extends ControllerBase {
  /**
   * Drupal\Core\Messenger\MessengerInterface definition.
   *
   * @var \Drupal\Core\Messenger\MessengerInterface
   */
  protected $messenger;
  /**
   * Symfony\Component\DependencyInjection\ContainerAwareInterface definition.
   *
   * @var \Symfony\Component\DependencyInjection\ContainerAwareInterface
   */
  protected $queueFactory;
  /**
   * GuzzleHttp\ClientInterface definition.
   *
   * @var \GuzzleHttp\ClientInterface
   */
  protected $client;
  /**
   * Inject services.
   */
  public function __construct(MessengerInterface $messenger, QueueFactory $queue, ClientInterface $client) {
    $this->messenger = $messenger;
    $this->queueFactory = $queue;
    $this->client = $client;
  }
  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container) {
    return new static(
      $container->get('messenger'),
      $container->get('queue'),
      $container->get('http_client')
    );
  }
  /**
   * Delete the queue 'exqueue_import'.
   *
   * Remember that the command drupal dq checks first for a queue worker
   * and if it exists, DC suposes that a queue exists.
   */
  public function deleteTheQueue() {
    $this->queueFactory->get('exqueue_import')->deleteQueue();
    return [
      '#type' => 'markup',
      '#markup' => $this->t('The queue "exqueue_import" has been deleted'),
    ];
  }
  /**
   * Getdata from external source and create a item queue for each data.
   *
   * @return array
   *   Return string.
   */
  public function getData() {
    // 1. Get data into an array of objects
    // 2. Get the queue and the total of items before the operations
    // 3. For each element of the array, create a new queue item
    // 1. Get data into an array of objects
    // We can choose between two methods
    // getDataFromRSS() or getFakeData()
    $data = $this->getDataFromRss();
    // $data = $this->getFakeData();
    if (!$data) {
      return [
        '#type' => 'markup',
        '#markup' => $this->t('No data found'),
      ];
    }
    // 2. Get the queue and the total of items before the operations
    // Get the queue implementation for 'exqueue_import' queue.
    $queue = $this->queueFactory->get('exqueue_import');
    // Get the total of items in the queue before adding new items.
    $totalItemsBefore = $queue->numberOfItems();
    // 3. For each element of the array, create a new queue item.
    foreach ($data as $element) {
      // Create new queue item.
      $queue->createItem($element);
    }
    // 4. Get the total of item in the Queue.
    $totalItemsAfter = $queue->numberOfItems();
    // 5. Get what's in the queue now.
    $tableVariables = $this->getItemList($queue);
    $finalMessage = $this->t('The Queue had @totalBefore items. We should have added @count items in the Queue. Now the Queue has @totalAfter items.',
      [
        '@count' => count($data),
        '@totalAfter' => $totalItemsAfter,
        '@totalBefore' => $totalItemsBefore,
      ]);
    return [
      '#type' => 'table',
      '#caption' => $finalMessage,
      '#header' => $tableVariables['header'],
      '#rows' => $tableVariables['rows'],
      '#attributes' => $tableVariables['attributes'],
      '#sticky' => $tableVariables['sticky'],
      'empty' => $this->t('No items.'),
    ];
  }
  /**
   * Generate an array of objects to simulate getting data from an RSS file.
   *
   * @return array
   *   Return an array of data
   */
  protected function getFakeData() {
    // We should get the XML content and convert it to an array of item objects
    // We use now an example array of item object.
    $content = [];
    for ($i = 1; $i <= 10; $i++) {
      $item = new \stdClass();
      $item->title = 'Title ' . $i;
      $item->body = 'Body ' . $i;
      $content[] = $item;
    }
    return $content;
  }
  /**
   * Generate an array of objects from an external RSS file.
   *
   * @return array|bool
   *   Return an array or false
   */
  protected function getDataFromRss() {
    // 1. Try to get the data form the RSS
    // URI of the XML file.
    $uri = 'https://www.drupal.org/planet/rss.xml';
    // 1. Try to get the data form the RSS.
    try {
      $response = $this->client->get($uri, ['headers' => ['Accept' => 'text/plain']]);
      $data = (string) $response->getBody();
      if (empty($data)) {
        return FALSE;
      }
    }
    catch (RequestException $e) {
      return FALSE;
    }
    // 2. Retrieve data in a simple xml object.
    $data = simplexml_load_string($data);
    // 3. Transform in a array of object
    // We could transform in array
    // $data = json_decode(json_encode($data));
    // Look at all children of the channel child.
    $content = [];
    foreach ($data->children()->children() as $child) {
      if (!empty($child->title)) {
        // Create an object.
        $item = new \stdClass();
        $item->title = $child->title->__toString();
        $item->body = $child->description->__toString();
        // Place the object in an array.
        $content[] = $item;
      }
    }
    if (empty($content)) {
      return FALSE;
    }
    return $content;
    /*
    // Using simplexml_load_file
    $xml = simplexml_load_file($uri);
    $data = json_decode(json_encode($xml));
    ksm($data);
     */
  }
  /**
   * Get all items of queue.
   *
   * Next place them in an array so we can retrieve them in a table.
   *
   * @param object $queue
   *   A queue object.
   *
   * @return array
   *   A table array for rendering.
   */
  protected function getItemList($queue) {
    $retrieved_items = [];
    $items = [];
    // Claim each item in queue.
    while ($item = $queue->claimItem()) {
      $retrieved_items[] = [
        'data' => [$item->data->title, $item->item_id],
      ];
      // Track item to release the lock.
      $items[] = $item;
    }
    // Release claims on items in queue.
    foreach ($items as $item) {
      $queue->releaseItem($item);
    }
    // Put the items in a table array for rendering.
    $tableTheme = [
      'header' => [$this->t('Title'), $this->t('ID')],
      'rows'   => $retrieved_items,
      'attributes' => [],
      'caption' => '',
      'colgroups' => [],
      'sticky' => TRUE,
      'empty' => $this->t('No items.'),
    ];
    return $tableTheme;
  }
}

First we injected the QueueFactory service into our controller. We also injected two others services like messenger to display Drupal messages to the user and the http_client service to retrieve the Drupal Planet RSS file.

Next in the getData() method we'll do three processes:

a- Get the data from an external RSS file
b- Get or create the queue and populate it
c- Retrieve all the items in the queue for information

a -  Get the data form the Drupal Planet RSS file in an array of objects with the custom method getDataFromRss()

$data = $this->getDataFromRss();

Note that there is also another custom method getFakeData() that you can use in local. It simulates the results of getting data from a external RSS file.

b – Get or in fact create the queue 'exqueue_import', get the total of item in the at this time for information purpose and populate the queue with the $data array.

    	$queue = $this->queueFactory->get('exqueue_import');
	$totalItemsBefore = $queue->numberOfItems();
    	foreach ($data as $element) {
      	  // Create new queue item.
      	  $queue->createItem($element);
    	}

Here comes the most interesting parts.
$queue = $this->queueFactory->get('exqueue_import');
This creates an instance of the default QueueInterface and the name of the queue we want to use or to create.

$totalItemsBefore = $queue->numberOfItems();
This give us the total of items in the queue, currenty for information purposes only.

$queue->createItem($element);
This simply adds an item to the queue. As the $data array is made of objects (see the getDataFromRss() method), we are wrapping the data as a good practice.

c- Retrieve all the items in the queue for information with the getItemList() custom method.

In this method we first claim each item of the queue with the claimItem() method. This method also locks or lease the item for a period of time of one hour by default. So we'll need to release those items later.
We save each item in the  $retrieved_items array that will use to show the information later to the user.

    // Claim each item in queue.    
    while ($item = $queue->claimItem()) {
      $retrieved_items[] = [
        'data' => [$item->data->title, $item->item_id],
      ];
      // Track item to release the lock.
      $items[] = $item;
    }


We also save the item information to the $items array to release each item in the next loop.

    // Release claims on items in queue.
    foreach ($items as $item) {
      $queue->releaseItem($item);
    }

That's it. You can see now how easy it is to create a queue and populating it.

You can now visit the page (/exqueue01/getData) and see how the queue is populated.

To debug the queue, you can use the Drupal Console command drupal debug:queue , but it will work only if you have a QueueWorker plugin associated with this queue.

You can also see the table queue in the database and see if it has been populated with the data of your queue.  

In a next post we'll see how to create a queue with a form and with a hook_cron

2. Create a Cron QueueWorker plugin to process the queue

As we've seen, create a queue is quite simple. Now we have to code a QueueWorker to process each element in the queue.

This QueueWorker is responsible for processing each element of the queue when cron runs. In our case, the process consists in creating a page node with the data of each item.

<?php
namespace Drupal\exqueue01\Plugin\QueueWorker;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Logger\LoggerChannelFactoryInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\QueueWorkerBase;
/**
 * Save queue item in a node.
 *
 * To process the queue items whenever Cron is run,
 * we need a QueueWorker plugin with an annotation witch defines
 * to witch queue it applied.
 *
 * @QueueWorker(
 *   id = "exqueue_import",
 *   title = @Translation("Import Content From RSS"),
 *   cron = {"time" = 5}
 * )
 */
class ExQueue01 extends QueueWorkerBase implements ContainerFactoryPluginInterface {
  /**
   * Drupal\Core\Entity\EntityTypeManagerInterface definition.
   *
   * @var \Drupal\Core\Entity\EntityTypeManagerInterface
   */
  private $entityTypeManager;
  /**
   * Drupal\Core\Logger\LoggerChannelFactoryInterface definition.
   *
   * @var \Drupal\Core\Logger\LoggerChannelFactoryInterface
   */
  private $loggerChannelFactory;
  /**
   * {@inheritdoc}
   */
  public function __construct(array $configuration,
                              $plugin_id,
                              $plugin_definition,
                              EntityTypeManagerInterface $entityTypeManager,
                              LoggerChannelFactoryInterface $loggerChannelFactory) {
    parent::__construct($configuration, $plugin_id, $plugin_definition);
    $this->entityTypeManager = $entityTypeManager;
    $this->loggerChannelFactory = $loggerChannelFactory;
  }
  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
    return new static(
      $configuration,
      $plugin_id,
      $plugin_definition,
      $container->get('entity_type.manager'),
      $container->get('logger.factory')
    );
  }
  /**
   * {@inheritdoc}
   */
  public function processItem($item) {
    // Save the queue item in a node
    // Check the values of $item.
    $title = isset($item->title) && $item->title ? $item->title : NULL;
    $body = isset($item->body) && $item->body ? $item->body : NULL;
    try {
      // Check if we have a title and a body.
      if (!$title || !$body) {
        throw new \Exception('Missing Title or Body');
      }
      $storage = $this->entityTypeManager->getStorage('node');
      $node = $storage->create(
        [
          'type' => 'page',
          'title' => $item->title,
          'body' => [
            'value' => $item->body,
            'format' => 'basic_html',
          ],
        ]
      );
      $node->save();
      // Log in the watchdog for debugging purpose.
      $this->loggerChannelFactory->get('debug')
        ->debug('Create node @id from queue %item',
          [
            '@id' => $node->id(),
            '%item' => print_r($item, TRUE),
          ]);
    }
    catch (\Exception $e) {
      $this->loggerChannelFactory->get('Warning')
        ->warning('Exception trow for queue @error',
          ['@error' => $e->getMessage()]);
    }
  }
}

First let's have a look at the annotation of this plugin

 * @QueueWorker(
 *   id = "exqueue_import",
 *   title = @Translation("Import Content From RSS"),
 *   cron = {"time" = 5}
 * )

id = "exqueue_import"
Indicates the ID of the queue we will process
cron = {"time" = 5}
Indicates that the plugin should be used by the cron system. The time key indicates that we allocate 5 seconds to process all the items in this queue. If we couldn't process some items in the allocated time, they will be processed at the next cron run.

But all the action happens in the processItem() method from the QueueWorkerInterface. It's in this method that we process each item. It's quite a straightforward process. Nothing new here.

Note about Exception

Note that on cron run, the processQueues() method of the cron object in cron.php will be fired. You can find the code for this method below.

/**
 * Processes cron queues.
 */
protected function processQueues() {
  // Grab the defined cron queues.
  foreach ($this->queueManager->getDefinitions() as $queue_name => $info) {
    if (isset($info['cron'])) {
      // Make sure every queue exists. There is no harm in trying to recreate
      // an existing queue.
      $this->queueFactory->get($queue_name)->createQueue();
      $queue_worker = $this->queueManager->createInstance($queue_name);
      $end = time() + (isset($info['cron']['time']) ? $info['cron']['time'] : 15);
      $queue = $this->queueFactory->get($queue_name);
      $lease_time = isset($info['cron']['time']) ?: NULL;
      while (time() < $end && ($item = $queue->claimItem($lease_time))) {
        try {
          $queue_worker->processItem($item->data);
          $queue->deleteItem($item);
        }
        catch (RequeueException $e) {
          // The worker requested the task be immediately requeued.
          $queue->releaseItem($item);
        }
        catch (SuspendQueueException $e) {
          // If the worker indicates there is a problem with the whole queue,
          // release the item and skip to the next queue.
          $queue->releaseItem($item);
          watchdog_exception('cron', $e);
          // Skip to the next queue.
          continue 2;
        }
        catch (\Exception $e) {
          // In case of any other kind of exception, log it and leave the item
          // in the queue to be processed again later.
          watchdog_exception('cron', $e);
        }
      }
    }
  }
}

You can see in the processQueues() method that the processItem() method is in a ‘try catch’ block, so if we throw an Exception in our QueueWorker it will be catched by the processQueues() method, the item won't be deleted and the Exception will be logged. So the item would stay in the queue if we throw an exception in the processItem() method. That's a good design, as we can examine the log and see what's happened and do something with this item later.

You can also see that you could throw a RequeueException or a SuspendQueueException. In the first case the item is requeued and in the second the process is suspended for the whole queue.

But if we want, like in our case, to delete the item from our queue if there is an error, we need to catch the Exception in a ‘try catch’ block in our QueueWorker itself. In this case, we won't return an Exception to the processQueues() method and then our item will be deleted. That's a choice. This means that if we don't have a title or a description, the data is useless for us and we can delete it from the queue. This also shows that we accepted useless data to be queued in an earlier process.  

3. Process the queue

Now we can process the queue with:

cron – In my testing, I was unable to process a queue with drush cron or drupal cron:excecute because they only invoke modules with cron handlers implementing hook_cron. You'll have to launch cron manually in the drupal interface.

Drupal Console – You can use the Drupal Console drupal queue:run <name-of-the-queue> command. In our case it would be drupal queue:run  exqueue_import

 

Recap

We created a controller ExQueueController.php with a getData() method.
We get the data from the Drupal Planet RSS file with the getDataFromRss() method into an array of objects.
We get or created an instance of the default QueueInterface with $this->queueFactory->get('exqueue_import').
We created each item in the queue with the $queue->createItem() method
Once the queue is created, we set up a Cron QueueWorker plugin where we processed each item in the queue with the processItem() method.