3.0 version
You are currently viewing documentation for version 3.0 which is not a long-term support release. The latest long-term support release is version 2.6

Message Processor

Message processors are components that take queue messages from Consumers, process them directly, and perform the tasks related to those messages.

To create a message processor you have to:

1. Create a Message Processor Class

The message processor class has to implement \Oro\Component\MessageQueue\Consumption\MessageProcessorInterface:

<?php
namespace Oro\Component\MessageQueue\Consumption;

use Oro\Component\MessageQueue\Transport\MessageInterface;
use Oro\Component\MessageQueue\Transport\SessionInterface;

interface MessageProcessorInterface
{
   /**
    * Use this constant when the message is processed successfully and the message could be removed from the queue.
    */
   const ACK = 'oro.message_queue.consumption.ack';

   /**
    * Use this constant when the message is not valid or could not be processed
    * The message is removed from the queue
    */
   const REJECT = 'oro.message_queue.consumption.reject';

   /**
    * Use this constant when the message is not valid or could not be processed right now but you can try again later
    * The original message is removed from the queue but a copy is published to the queue again.
    */
   const REQUEUE = 'oro.message_queue.consumption.requeue';

   /**
    * @param MessageInterface $message
    * @param SessionInterface $session
    *
    * @return string
    */
   public function process(MessageInterface $message, SessionInterface $session);
}

The process(MessageInterface $message, SessionInterface $session) method of the MessageProcessorInterface describes the actions to perform when a message is received. It can perform the actions directly or create a job. It can also produce a new message to run another processor asynchronously.
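The last option, producing a new message for another processor, can be sketched as follows. This is a standalone illustration, not the Oro API: ProducerStandIn, RecordingProducer, processReportRequest() and the 'acme_send_report_email' topic are hypothetical names, and the send() signature is only assumed to resemble the message producer of the Oro client.

```php
<?php

// Hypothetical stand-in for a message producer. Its send() signature is an
// assumption made so that the sketch runs without the Oro packages.
interface ProducerStandIn
{
    public function send(string $topic, array $message): void;
}

// In-memory implementation that only records what was sent.
final class RecordingProducer implements ProducerStandIn
{
    public $sent = [];

    public function send(string $topic, array $message): void
    {
        $this->sent[] = [$topic, $message];
    }
}

// Hypothetical processing step: does its own work, then publishes a
// follow-up message so that another processor runs asynchronously.
function processReportRequest(array $body, ProducerStandIn $producer): string
{
    // The processor's own work would happen here.

    // 'acme_send_report_email' is a made-up topic name.
    $producer->send('acme_send_report_email', ['reportId' => $body['reportId']]);

    // Same value as MessageProcessorInterface::ACK.
    return 'oro.message_queue.consumption.ack';
}
```

The follow-up message is sent before the acknowledgement is returned, so the second processor can start as soon as its own consumer picks the message up.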

The process method has to return a message acknowledgement (self::ACK, self::REJECT or self::REQUEUE) to inform the consumer and the message broker whether the message was processed successfully and can be removed from the queue.
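As a rough illustration of how the three return values might be chosen, the sketch below picks a status based on a JSON message body. It is not the Oro implementation: the constants are copied from the interface above so that the snippet runs on its own, and chooseStatus(), the reportId field and the $generate callback are hypothetical names introduced only for this example. Treating a RuntimeException as a temporary failure is likewise an assumption.

```php
<?php

// Values copied from MessageProcessorInterface so the sketch runs standalone.
const ACK = 'oro.message_queue.consumption.ack';
const REJECT = 'oro.message_queue.consumption.reject';
const REQUEUE = 'oro.message_queue.consumption.requeue';

/**
 * Hypothetical helper: maps the outcome of handling a message body to a status.
 *
 * @param string   $body     raw message body, expected to be a JSON object
 * @param callable $generate hypothetical task; receives the report ID
 *
 * @return string
 */
function chooseStatus(string $body, callable $generate): string
{
    $data = json_decode($body, true);

    // A malformed message will never succeed, so retrying is pointless.
    if (!is_array($data) || !isset($data['reportId'])) {
        return REJECT;
    }

    try {
        $generate($data['reportId']);
    } catch (\RuntimeException $e) {
        // Assumed to signal a temporary failure: ask for redelivery.
        return REQUEUE;
    }

    // The task finished, so the message can be removed from the queue.
    return ACK;
}
```

Inside a real process() method, the same decisions would be returned as self::ACK, self::REJECT or self::REQUEUE, with the body taken from the received message.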

2. Register a Message Processor Service

The message processor class has to be registered as a service with the DI tag oro_message_queue.client.message_processor.

acme.async.message_processor:
    class: 'Acme\Bundle\DemoBundle\Async\MessageProcessor'
    tags:
        - { name: 'oro_message_queue.client.message_processor' }

3. Subscribe the Message Processor to the Message Topic

To receive messages, the message processor must be subscribed to one or more message topics.

The subscription to the message topic can be performed in two ways:

  • The message processor can implement \Oro\Component\MessageQueue\Client\TopicSubscriberInterface
  • You can specify the message topic in the oro_message_queue.client.message_processor tag when registering the message processor service

The following examples are equivalent:

  1. The message processor implements \Oro\Component\MessageQueue\Client\TopicSubscriberInterface:

    acme.async.message_processor:
        class: 'Acme\Bundle\DemoBundle\Async\MessageProcessor'
        tags:
            - { name: 'oro_message_queue.client.message_processor' }
    
    <?php
    // src/Acme/Bundle/DemoBundle/Async/MessageProcessor.php

    namespace Acme\Bundle\DemoBundle\Async;

    use Oro\Component\MessageQueue\Client\TopicSubscriberInterface;
    use Oro\Component\MessageQueue\Consumption\MessageProcessorInterface;
    use Oro\Component\MessageQueue\Transport\MessageInterface;
    use Oro\Component\MessageQueue\Transport\SessionInterface;

    class MessageProcessor implements MessageProcessorInterface, TopicSubscriberInterface
    {
        // ...

        /**
         * @param MessageInterface $message
         * @param SessionInterface $session
         *
         * @return string
         */
        public function process(MessageInterface $message, SessionInterface $session)
        {
            // ...
        }

        /**
         * Each entry is either a topic name or a topic name mapped to options:
         * * ['topicName']
         * * ['topicName' => ['processorName' => 'processor', 'destinationName' => 'destination']]
         * processorName and destinationName are optional.
         *
         * @return array
         */
        public static function getSubscribedTopics()
        {
            return [Topics::GENERATE_MONTHLY_REPORT];
        }
    }
    
  2. The message topic is specified in the DI tag:

    acme.async.message_processor:
        class: 'Acme\Bundle\DemoBundle\Async\MessageProcessor'
        tags:
            - { name: 'oro_message_queue.client.message_processor', topicName: 'acme_generate_monthly_report' }
    
    <?php
    // src/Acme/Bundle/DemoBundle/Async/MessageProcessor.php

    namespace Acme\Bundle\DemoBundle\Async;

    use Oro\Component\MessageQueue\Consumption\MessageProcessorInterface;
    use Oro\Component\MessageQueue\Transport\MessageInterface;
    use Oro\Component\MessageQueue\Transport\SessionInterface;

    class MessageProcessor implements MessageProcessorInterface
    {
        // ...

        /**
         * @param MessageInterface $message
         * @param SessionInterface $session
         *
         * @return string
         */
        public function process(MessageInterface $message, SessionInterface $session)
        {
            // ...
        }
    }
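
Because the two examples are equivalent, the Topics::GENERATE_MONTHLY_REPORT constant has to resolve to the topic name used in the DI tag. The sketch below shows what such a Topics class might look like, together with the two return formats described in the getSubscribedTopics() docblock. It is a standalone illustration without a namespace: the two functions and the 'acme_report_processor' and 'acme_reports' values are hypothetical.

```php
<?php

// Hypothetical holder for topic names; the value matches the topicName
// used in the DI tag example.
final class Topics
{
    const GENERATE_MONTHLY_REPORT = 'acme_generate_monthly_report';
}

// Short form: a plain list of topic names.
function subscribedTopicsShort(): array
{
    return [Topics::GENERATE_MONTHLY_REPORT];
}

// Extended form: topic name mapped to optional settings.
// Both setting values below are made up for this example.
function subscribedTopicsExtended(): array
{
    return [
        Topics::GENERATE_MONTHLY_REPORT => [
            'processorName' => 'acme_report_processor',
            'destinationName' => 'acme_reports',
        ],
    ];
}
```

Keeping topic names in constants avoids repeating the raw string in every processor and producer that refers to the topic.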
    

List All Declared Message Processors

To list all declared message processors along with message topics that they are subscribed to, use the following command:

./bin/console oro:message-queue:topics
