Sous Drupal, les queues sont particulièrement importantes lorsque nous avons besoin de postposer certaines tâches pour un traitement futur. Pour ce faire, nous allons placer ces tâches ou données dans une queue (créer la queue) et ensuite nous allons traiter ces données avec un 'QueueWorker plugin' (traitement de la queue), généralement grâce à un process de type cron.
Il existe plusieurs manières de créer une queue:
- Avec un formulaire
- Avec un contrôleur
- Avec une fonction hook_cron()
Pour traiter la queue, nous avons aussi différentes options:
- Par cron avec un 'QueueWorker plugin'
- Par un batch process aussi avec un 'QueueWorker plugin' mais en étendant un plugin de base
- Par un batch process qui traitera chaque élément de la queue dans un service ou dans un contrôleur.
Dans ce module d'exemple, nous allons grâce à un contrôleur, importer le titre et la description des post que se trouvent dans le fichier RSS de Drupal Planet et les placer dans une queue. Ensuite, sur base de ces données nous allons créer des nodes de type page grâce à un 'QueueWorker plugin' qui sera lancé lorsque le cron s'exécutera. Vous pouvez télécharger le code ici: https://github.com/KarimBoudjema/Drupal8-ex-queue-api-01.
Ce module se divise en deux parties:
1. Un contrôleur src/Controller/ExQueueController.php
avec une route dans exqueue01.routing.yml
et deux méthodes principales:
getData() - Importe les données externes et les place dans une queue appelée 'exqueue_import'.
deleteTheQueue() - Élimine tous les éléments qui se trouvent dans une queue.
2. Un 'QueueWorker plugin' src/Plugin/QueueWorker/ExQueue01.php
qui traitera chaque élément de la queue quand cron s'exécutera.
Vous pouvez voir l'arbre du module à continuation.
web/modules/custom/exqueue01/
|-- exqueue01.info.yml
|-- exqueue01.module
|-- exqueue01.permissions.yml
|-- exqueue01.routing.yml
|-- README.txt
`-- src
|-- Controller
| `-- ExQueueController.php
`-- Plugin
`-- QueueWorker
`-- ExQueue01.php
4 directories, 7 files
Ça n'a pas l'air très compliqué non?
1. Créer la queue avec un contrôleur.
Voici le code du contrôleur. Vous pouvez aussi le télécharger ici: https://github.com/KarimBoudjema/Drupal8-ex-queue-api-01/blob/master/src/Controller/ExQueueController.php
<?php
namespace Drupal\exqueue01\Controller;
use Drupal\Core\Controller\ControllerBase;
use Drupal\Core\Queue\QueueFactory;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Drupal\Core\Messenger\MessengerInterface;
use GuzzleHttp\ClientInterface;
use GuzzleHttp\Exception\RequestException;
/**
* Class ExQueueController.
*
* Demonstrates the use of the Queue API
* There is two routes.
* 1) \Drupal\exqueue01\Controller\ExQueueController::getData
* The getData() methods allows to load external data and
* for each array element create a queue element
* Then on Cron run, we create a page node for each element with
* 2) \Drupal\exqueue01\Controller\ExQueueController::deleteTheQueue
* The deleteTheQueue() methods delete the queue "exqueue_import"
* and all its elements
* Once the queue is created with tha data, on Cron run
* we create a new page node for each item in the queue with the QueueWorker
* plugin ExQueue01.php .
*/
class ExQueueController extends ControllerBase {
/**
* Drupal\Core\Messenger\MessengerInterface definition.
*
* @var \Drupal\Core\Messenger\MessengerInterface
*/
protected $messenger;
/**
* Symfony\Component\DependencyInjection\ContainerAwareInterface definition.
*
* @var \Symfony\Component\DependencyInjection\ContainerAwareInterface
*/
protected $queueFactory;
/**
* GuzzleHttp\ClientInterface definition.
*
* @var \GuzzleHttp\ClientInterface
*/
protected $client;
/**
* Inject services.
*/
public function __construct(MessengerInterface $messenger, QueueFactory $queue, ClientInterface $client) {
$this->messenger = $messenger;
$this->queueFactory = $queue;
$this->client = $client;
}
/**
* {@inheritdoc}
*/
public static function create(ContainerInterface $container) {
return new static(
$container->get('messenger'),
$container->get('queue'),
$container->get('http_client')
);
}
/**
* Delete the queue 'exqueue_import'.
*
* Remember that the command drupal dq checks first for a queue worker
* and if it exists, DC suposes that a queue exists.
*/
public function deleteTheQueue() {
$this->queueFactory->get('exqueue_import')->deleteQueue();
return [
'#type' => 'markup',
'#markup' => $this->t('The queue "exqueue_import" has been deleted'),
];
}
/**
* Getdata from external source and create a item queue for each data.
*
* @return array
* Return string.
*/
public function getData() {
// 1. Get data into an array of objects
// 2. Get the queue and the total of items before the operations
// 3. For each element of the array, create a new queue item
// 1. Get data into an array of objects
// We can choose between two methods
// getDataFromRSS() or getFakeData()
$data = $this->getDataFromRss();
// $data = $this->getFakeData();
if (!$data) {
return [
'#type' => 'markup',
'#markup' => $this->t('No data found'),
];
}
// 2. Get the queue and the total of items before the operations
// Get the queue implementation for 'exqueue_import' queue.
$queue = $this->queueFactory->get('exqueue_import');
// Get the total of items in the queue before adding new items.
$totalItemsBefore = $queue->numberOfItems();
// 3. For each element of the array, create a new queue item.
foreach ($data as $element) {
// Create new queue item.
$queue->createItem($element);
}
// 4. Get the total of item in the Queue.
$totalItemsAfter = $queue->numberOfItems();
// 5. Get what's in the queue now.
$tableVariables = $this->getItemList($queue);
$finalMessage = $this->t('The Queue had @totalBefore items. We should have added @count items in the Queue. Now the Queue has @totalAfter items.',
[
'@count' => count($data),
'@totalAfter' => $totalItemsAfter,
'@totalBefore' => $totalItemsBefore,
]);
return [
'#type' => 'table',
'#caption' => $finalMessage,
'#header' => $tableVariables['header'],
'#rows' => $tableVariables['rows'],
'#attributes' => $tableVariables['attributes'],
'#sticky' => $tableVariables['sticky'],
'empty' => $this->t('No items.'),
];
}
/**
* Generate an array of objects to simulate getting data from an RSS file.
*
* @return array
* Return an array of data
*/
protected function getFakeData() {
// We should get the XML content and convert it to an array of item objects
// We use now an example array of item object.
$content = [];
for ($i = 1; $i <= 10; $i++) {
$item = new \stdClass();
$item->title = 'Title ' . $i;
$item->body = 'Body ' . $i;
$content[] = $item;
}
return $content;
}
/**
* Generate an array of objects from an external RSS file.
*
* @return array|bool
* Return an array or false
*/
protected function getDataFromRss() {
// 1. Try to get the data form the RSS
// URI of the XML file.
$uri = 'https://www.drupal.org/planet/rss.xml';
// 1. Try to get the data form the RSS.
try {
$response = $this->client->get($uri, ['headers' => ['Accept' => 'text/plain']]);
$data = (string) $response->getBody();
if (empty($data)) {
return FALSE;
}
}
catch (RequestException $e) {
return FALSE;
}
// 2. Retrieve data in a simple xml object.
$data = simplexml_load_string($data);
// 3. Transform in a array of object
// We could transform in array
// $data = json_decode(json_encode($data));
// Look at all children of the channel child.
$content = [];
foreach ($data->children()->children() as $child) {
if (!empty($child->title)) {
// Create an object.
$item = new \stdClass();
$item->title = $child->title->__toString();
$item->body = $child->description->__toString();
// Place the object in an array.
$content[] = $item;
}
}
if (empty($content)) {
return FALSE;
}
return $content;
/*
// Using simplexml_load_file
$xml = simplexml_load_file($uri);
$data = json_decode(json_encode($xml));
ksm($data);
*/
}
/**
* Get all items of queue.
*
* Next place them in an array so we can retrieve them in a table.
*
* @param object $queue
* A queue object.
*
* @return array
* A table array for rendering.
*/
protected function getItemList($queue) {
$retrieved_items = [];
$items = [];
// Claim each item in queue.
while ($item = $queue->claimItem()) {
$retrieved_items[] = [
'data' => [$item->data->title, $item->item_id],
];
// Track item to release the lock.
$items[] = $item;
}
// Release claims on items in queue.
foreach ($items as $item) {
$queue->releaseItem($item);
}
// Put the items in a table array for rendering.
$tableTheme = [
'header' => [$this->t('Title'), $this->t('ID')],
'rows' => $retrieved_items,
'attributes' => [],
'caption' => '',
'colgroups' => [],
'sticky' => TRUE,
'empty' => $this->t('No items.'),
];
return $tableTheme;
}
}
Vous pouvez voir que nous avons injecté par DI trois services: QueueFactory pour travailler avec notre queue, messenger pour afficher des messages et le service http_client pour pouvoir accéder au fichier RSS de Drupal Planet.
Ensuite dans la méthode getData() nous allons:
a- Importer les données depuis un fichier RSS externe
b- Créer la queue et la remplir avec les données externes
c- Récupérer tous les éléments qui se trouvent dans la queue pour les afficher ensuite
Comme vous pouvez le constater, tout cela n'est pas très compliqué.
Si vous avez installé ce module, vous pouvez maintenant visiter la page à cette adresse: /exqueue01/getData et voir comment la queue est crée.
Pour déboguer la queue, vous pouvez le faire avec la commande de Drupal Console drupal debug:queue
, mais cela ne fonctionnera que si vous avez un 'QueueWorker plugin' associé à cette queue.
Vous pouvez aussi réviser la table queue dans la base de données et voir si elle a des données relatives à votre queue.
Dans un prochain post nous verrons comment créer une queue avec un formulaire et avec la fonction hook_cron()
2. Créer un Cron QueueWorker plugin afin de traiter la queue
Comme nous l'avons vu, créer une queue est assez simple en soi. A présent nous devons coder un plugin (un QueueWorker plugin) pour traiter chaque élément qui se trouve dans la queue.
Ce Cron QueueWorker plugin a comme responsabilité d'assurer le traitement de chaque élément qui se trouve dans la queue quand le cron est lancé. Dans notre cas, ce traitement consiste à créer des nodes de type page pour chaque élément.
Voyons à présent ce plugin en détail:
<?php
namespace Drupal\exqueue01\Plugin\QueueWorker;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Logger\LoggerChannelFactoryInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\QueueWorkerBase;
/**
* Save queue item in a node.
*
* To process the queue items whenever Cron is run,
* we need a QueueWorker plugin with an annotation witch defines
* to witch queue it applied.
*
* @QueueWorker(
* id = "exqueue_import",
* title = @Translation("Import Content From RSS"),
* cron = {"time" = 5}
* )
*/
class ExQueue01 extends QueueWorkerBase implements ContainerFactoryPluginInterface {
/**
* Drupal\Core\Entity\EntityTypeManagerInterface definition.
*
* @var \Drupal\Core\Entity\EntityTypeManagerInterface
*/
private $entityTypeManager;
/**
* Drupal\Core\Logger\LoggerChannelFactoryInterface definition.
*
* @var \Drupal\Core\Logger\LoggerChannelFactoryInterface
*/
private $loggerChannelFactory;
/**
* {@inheritdoc}
*/
public function __construct(array $configuration,
$plugin_id,
$plugin_definition,
EntityTypeManagerInterface $entityTypeManager,
LoggerChannelFactoryInterface $loggerChannelFactory) {
parent::__construct($configuration, $plugin_id, $plugin_definition);
$this->entityTypeManager = $entityTypeManager;
$this->loggerChannelFactory = $loggerChannelFactory;
}
/**
* {@inheritdoc}
*/
public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
return new static(
$configuration,
$plugin_id,
$plugin_definition,
$container->get('entity_type.manager'),
$container->get('logger.factory')
);
}
/**
* {@inheritdoc}
*/
public function processItem($item) {
// Save the queue item in a node
// Check the values of $item.
$title = isset($item->title) && $item->title ? $item->title : NULL;
$body = isset($item->body) && $item->body ? $item->body : NULL;
try {
// Check if we have a title and a body.
if (!$title || !$body) {
throw new \Exception('Missing Title or Body');
}
$storage = $this->entityTypeManager->getStorage('node');
$node = $storage->create(
[
'type' => 'page',
'title' => $item->title,
'body' => [
'value' => $item->body,
'format' => 'basic_html',
],
]
);
$node->save();
// Log in the watchdog for debugging purpose.
$this->loggerChannelFactory->get('debug')
->debug('Create node @id from queue %item',
[
'@id' => $node->id(),
'%item' => print_r($item, TRUE),
]);
}
catch (\Exception $e) {
$this->loggerChannelFactory->get('Warning')
->warning('Exception trow for queue @error',
['@error' => $e->getMessage()]);
}
}
}
Tout d'abord, voyons l'annotation de ce plugin.
* @QueueWorker(
* id = "exqueue_import",
* title = @Translation("Import Content From RSS"),
* cron = {"time" = 5}
* )
id = "exqueue_import"
Indique l'identifiant de la queue que nous allons traiter.cron = {"time" = 5}
Indique que ce plugin doit être utilisé par le système cron. La clé 'time' indique que nous allouons 5 secondes pour traiter tous les éléments qui se trouvent dans cette queue. Si nous ne pouvons pas traiter certains éléments dans le temps imparti, ils seront traités au prochain cron.
Mais tout se déroule dans la méthode processItem() de l'interface QueueWorkerInterface. C'est bien dans cette méthode que nous allons traiter chaque élément de la queue. C'est un procédé assez simple. Rien de bien nouveau ici.
Note sur les exceptions de la méthode processQueues()
Il est à noter que lorsque le cron est lancé, la méthode processQueues() de l'objet cron dans le fichier cron.php est à son tour exécutée. Vous pouvez voir le détail de cette méthode ci-dessous.
/**
* Processes cron queues.
*/
protected function processQueues() {
// Grab the defined cron queues.
foreach ($this->queueManager->getDefinitions() as $queue_name => $info) {
if (isset($info['cron'])) {
// Make sure every queue exists. There is no harm in trying to recreate
// an existing queue.
$this->queueFactory->get($queue_name)->createQueue();
$queue_worker = $this->queueManager->createInstance($queue_name);
$end = time() + (isset($info['cron']['time']) ? $info['cron']['time'] : 15);
$queue = $this->queueFactory->get($queue_name);
$lease_time = isset($info['cron']['time']) ?: NULL;
while (time() < $end && ($item = $queue->claimItem($lease_time))) {
try {
$queue_worker->processItem($item->data);
$queue->deleteItem($item);
}
catch (RequeueException $e) {
// The worker requested the task be immediately requeued.
$queue->releaseItem($item);
}
catch (SuspendQueueException $e) {
// If the worker indicates there is a problem with the whole queue,
// release the item and skip to the next queue.
$queue->releaseItem($item);
watchdog_exception('cron', $e);
// Skip to the next queue.
continue 2;
}
catch (\Exception $e) {
// In case of any other kind of exception, log it and leave the item
// in the queue to be processed again later.
watchdog_exception('cron', $e);
}
}
}
}
}
Vous pouvez voir que dans la méthode processQueues(), la méthode processItem() (qui est celle que nous héritons dans notre plugin) se trouve dans un bloque ‘try catch’. Dès lors, si nous lançons une exception depuis notre QueueWorker plugin, elle va être traiter par la méthode processQueues(), l'élément en cours de traitement ne sera pas éliminer et l'exception sera enregistrée dans le log. Cela veut dire que cet élément restera dans la queue si nous lançons une exception depuis la méthode processItem() de notre plugin. C'est un bon modèle car il nous permet d'examiner le log, voir ce qui s'est passé et traiter cet élément plus tard.
Vous pouvez aussi noter que nous pourrions lancer une exception de type RequeueException
ou SuspendQueueException
. Dans le premier cas l'élément traité est replacé dans la queue $queue->releaseItem($item);
et dans le second c'est tout le traitement de la queue qui est suspendu puisque l'on sort du while et du foreach avec l'instruction continue 2;
Nous voyons bien qu'aucunes des exception qui se trouvent dans processQueues() ne nous permettent d'éliminer un élément de la queue si il y a une erreur. Si nous voulons éliminer un élément de la queue si nous trouvons une erreur, nous devrons lancer et récupérer une erreur dans un ‘try catch’ block dans notre 'QueueWorker' plugin. Dans ce cas, nous ne revoyons aucune Exception vers la méthode processQueues() et dès lors notre élément sera éliminer normalement. C'est un choix. Cela veut dire que si nous n'avons pas de titre ou de description, l'élément n'a aucune valeur et nous pouvons l'éliminer de la queue. Cela montre aussi que nous devrions vérifier bien avant, au moment où nous créons la queue, si les éléments doivent être placés ou non dans la queue.
3. Lancer le traitement de la queue
Nous pouvons maintenant lancer le traitement de la queue avec:
cron – C'est le traitement que nous avons choisit par défaut dans l'annotation du QueueWorker plugin. Lors des tests, je n'ai pas pu lancer le traitement avec drush cron
ou drupal cron:excecute
car ces commandes se basent sur les implémentations de hook_cron(). Nous devrons dès lors lancer le cron manuellement depuis l'interface de Drupal: /admin/config/system/cron
Drupal Console – Nous pouvons aussi utiliser la commande de Drupal Console drupal queue:run <name-of-the-queue>
. Dans notre cas ce sera: drupal queue:run exqueue_import
En résumé
Nous avons créé un contrôleur ExQueueController.php
avec une méthode getData().
Nous avons importé les données du fichier RSS de Drupal Planet avec la méthode getDataFromRss() dans un vecteur d'objets.
Nous avons créé une queue grâce à une instance de l'interface QueueInterface avec $this->queueFactory->get('exqueue_import')
.
Nous avons placé chaque élément dans la queue avec la méthode $queue->createItem()
.
Une fois la queue créé, nous avons codé un 'Cron QueueWorker plugin' où nous traitons chaque élément de la queue dans la méthode processItem() pour créer un node sur base de ce dernier.
Voilà, c'est tout pour aujourd'hui. N'hésitez pas à mettre un commentaire si vous avez une question ou un doute.