vendor/pimcore/portal-engine/src/EventSubscriber/BatchTaskSubscriber.php line 102

Open in your IDE?
  1. <?php
  2. /**
  3.  * Pimcore
  4.  *
  5.  * This source file is available under following license:
  6.  * - Pimcore Commercial License (PCL)
  7.  *
  8.  *  @copyright  Copyright (c) Pimcore GmbH (http://www.pimcore.org)
  9.  *  @license    http://www.pimcore.org/license     PCL
  10.  */
  11. namespace Pimcore\Bundle\PortalEngineBundle\EventSubscriber;
  12. use Pimcore\Bundle\PortalEngineBundle\Entity\BatchTask;
  13. use Pimcore\Bundle\PortalEngineBundle\Message\BatchTask\Interfaces\BatchTaskMessageInterface;
  14. use Pimcore\Bundle\PortalEngineBundle\Message\BatchTask\Interfaces\SequentialBatchTaskMessageInterface;
  15. use Pimcore\Bundle\PortalEngineBundle\Message\BatchTask\Interfaces\SplittedBatchTaskMessageInterface;
  16. use Pimcore\Bundle\PortalEngineBundle\Message\BatchTask\Interfaces\TriggerFinishedMessageBatchTaskMessageInterface;
  17. use Pimcore\Bundle\PortalEngineBundle\Service\BatchTask\BatchTaskService;
  18. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  19. use Symfony\Component\HttpKernel\Event\TerminateEvent;
  20. use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
  21. use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
  22. use Symfony\Component\Messenger\MessageBusInterface;
  23. /**
  24.  * Class IndexUpdateListener
  25.  *
  26.  * @package Pimcore\Bundle\PortalEngineBundle\EventListener
  27.  */
  28. class BatchTaskSubscriber implements EventSubscriberInterface
  29. {
  30.     /**
  31.      * @var BatchTaskService
  32.      */
  33.     protected $batchTaskService;
  34.     /**
  35.      * @var MessageBusInterface
  36.      */
  37.     protected $messageBus;
  38.     /**
  39.      * @var BatchTask
  40.      */
  41.     protected $terminateBatchTask;
  42.     /**
  43.      * @param BatchTaskService $batchTaskService
  44.      * @param MessageBusInterface $messengerBusPortalEngine
  45.      */
  46.     public function __construct(BatchTaskService $batchTaskServiceMessageBusInterface $messengerBusPortalEngine)
  47.     {
  48.         $this->batchTaskService $batchTaskService;
  49.         $this->messageBus $messengerBusPortalEngine;
  50.     }
  51.     /**
  52.      * @return array
  53.      */
  54.     public static function getSubscribedEvents()
  55.     {
  56.         return [
  57.             WorkerMessageFailedEvent::class => 'onBatchTaskMessageFailed',
  58.             WorkerMessageHandledEvent::class => 'onWorkerMessageHandled',
  59.             TerminateEvent::class => 'onTerminate',
  60.         ];
  61.     }
  62.     /**
  63.      * Mark batch tasks with failed items as finished as otherwise they will run forever.
  64.      *
  65.      * @param WorkerMessageFailedEvent $event
  66.      *
  67.      * @throws \Doctrine\DBAL\DBALException
  68.      */
  69.     public function onBatchTaskMessageFailed(WorkerMessageFailedEvent $event)
  70.     {
  71.         $message $event->getEnvelope()->getMessage();
  72.         if (!$message instanceof BatchTaskMessageInterface) {
  73.             return;
  74.         }
  75.         if ($event->willRetry()) {
  76.             return;
  77.         }
  78.         if (!$batchTask $this->batchTaskService->getTaskById($message->getTaskId())) {
  79.             return;
  80.         }
  81.         foreach (array_keys($message->getItems()) as $itemIndex) {
  82.             if (!$this->batchTaskService->isItemIndexProcessed($batchTask$itemIndex)) {
  83.                 $this->batchTaskService->markItemIndexAsProcessed($batchTask$itemIndex);
  84.             }
  85.         }
  86.         $this->checkBatchTaskFinished($batchTask$message);
  87.     }
  88.     public function onWorkerMessageHandled(WorkerMessageHandledEvent $event)
  89.     {
  90.         $message $event->getEnvelope()->getMessage();
  91.         if (!$message instanceof BatchTaskMessageInterface) {
  92.             return;
  93.         }
  94.         if ($message instanceof SplittedBatchTaskMessageInterface) {
  95.             return;
  96.         }
  97.         if (!$batchTask $this->batchTaskService->getTaskById($message->getTaskId())) {
  98.             return;
  99.         }
  100.         if ($message instanceof SequentialBatchTaskMessageInterface && $message->hasRemainingItems()) {
  101.             $remainingMessage $message->createRemainingMessage($this->batchTaskService);
  102.             $this->messageBus->dispatch($remainingMessage);
  103.             return;
  104.         }
  105.         $this->checkBatchTaskFinished($batchTask$message);
  106.     }
  107.     public function onTerminate(TerminateEvent $event)
  108.     {
  109.         $this->batchTaskService->terminateBatchTask();
  110.     }
  111.     public function setTerminateBatchTask(BatchTask $batchTask)
  112.     {
  113.         $this->terminateBatchTask $batchTask;
  114.     }
  115.     protected function checkBatchTaskFinished(BatchTask $batchTaskBatchTaskMessageInterface $message)
  116.     {
  117.         $this->batchTaskService->checkBatchTaskFinished($batchTask);
  118.         if ($message instanceof TriggerFinishedMessageBatchTaskMessageInterface) {
  119.             $finishedMessage $message->createFinishedMessage();
  120.             $this->messageBus->dispatch($finishedMessage);
  121.         }
  122.     }
  123. }