
A message broker is a set of components that supports message exchange among other system components. It allows you to handle transactions in real time, such as notification processing and importing. Message queues are also used when a Scheduled Script is started.

Key Terms


Message Broker - a system component whose main function is to keep and share messages among other system components.

Producer - a program or a system module that sends a message into the Queue (producing a message).

Consumer - a program or a system module that receives a message from the Queue and processes it.

Queue - a part of a Message Broker; a list implemented on the FIFO (First In, First Out) basis. It is intended to keep messages of a similar type and allocate them among the responsible Consumers.

Exchange - a part of a Message Broker responsible for allocating messages among Queues.

Routing Key - a key needed for allocating messages among Queues.

Binding - a connection between Exchanges or between an Exchange and a Queue.


Message Broker operating modes


The message broker can handle instant messages and delayed messages. The difference is that a delayed message is sent after the time specified in the $delay parameter.

Instant message processing


Instant message processing consists of several stages:

  1. Message sending
  2. Message allocation
  3. Message storage in queue
  4. Message processing

Message sending

In brief, the component (a program or a module) producing a message (the Producer) generates the message and supplements it with service data, such as the Routing Key, the User ID, and a Sys_ID referencing a record in the User Token (user_token) table. Before sending, the message is serialized.

Sending example:

    $this->producer->publish($message = 'message', $routingKey = 'echo');
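The preparation steps described above (adding service data, then serializing) can be sketched in a few lines. This is a minimal Python model, not the platform's API: the `build_envelope` and `serialize` helpers, the field names (`routing_key`, `user_id`, `user_token_id`), and the use of JSON as the serialization format are all assumptions made for illustration.

```python
import json


def build_envelope(body, routing_key, user_id, user_token_id):
    """Attach service data to the message body (field names are hypothetical)."""
    return {
        "body": body,
        "routing_key": routing_key,
        "user_id": user_id,
        "user_token_id": user_token_id,
    }


def serialize(envelope):
    """Serialize the envelope before sending; JSON is an assumed format."""
    return json.dumps(envelope, sort_keys=True).encode("utf-8")


payload = serialize(build_envelope("message", "echo", "user-1", "token-1"))
```

The resulting `payload` is a byte string that a Producer could hand to the broker together with the Routing Key.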

Message allocation

The message gets into the Exchange. To allocate messages among Queues, the Exchange uses the Routing Key specified at the previous stage.

An Exchange is bound (in other words, it has a Binding) to every Queue. If a message contains a Routing Key that does not match any Queue, the message is discarded and does not move to the next stage.

Allocation example:

  • An Exchange "backend.asyncjob.dx" is created.
  • A Queue "backend_echo_q" is created.
  • A Binding is created between the Exchange and the Queue. They are bound by the 'echo' Routing Key.
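The allocation example can be modelled in memory as follows. This is a Python sketch under stated assumptions: the `DirectExchange` class is hypothetical, and exact-match routing on the Routing Key is assumed (the source does not name the exchange type). It only illustrates that a message with a bound key reaches the Queue, while a message with an unbound key is discarded.

```python
from collections import defaultdict, deque


class DirectExchange:
    """In-memory model of an exchange that routes by exact Routing Key match."""

    def __init__(self, name):
        self.name = name
        self.bindings = defaultdict(list)  # Routing Key -> bound queues

    def bind(self, queue, routing_key):
        """Create a Binding between this exchange and a queue."""
        self.bindings[routing_key].append(queue)

    def publish(self, message, routing_key):
        """Copy the message to every bound queue; return False if it was dropped."""
        queues = self.bindings.get(routing_key, [])
        for queue in queues:
            queue.append(message)
        return bool(queues)


exchange = DirectExchange("backend.asyncjob.dx")
backend_echo_q = deque()
exchange.bind(backend_echo_q, "echo")

routed = exchange.publish("message", "echo")            # key matches the Binding
dropped = not exchange.publish("message", "unknown")    # no Binding for this key
```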

Message storage in queue

Once messages are queued up, they should be processed.

The Queue tracks the state of its Consumer (determined by the Routing Key). When the Consumer is able to receive a message, the Queue delivers it. If the Queue has no Consumers, the messages line up in the Queue but do not disappear.

By default, every queue has at least one consumer. If a queue has more than one consumer bound, it allocates messages among them.
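The storage behaviour described above can be sketched as a small Python model. The `Queue` class here is hypothetical, and round-robin rotation across consumers is an assumption about how messages are allocated. The sketch shows two things: messages stay in the queue while no consumer is attached, and they are shared among consumers once several are attached.

```python
from collections import deque
from itertools import cycle


class Queue:
    """FIFO queue that holds messages until at least one consumer is attached."""

    def __init__(self):
        self.messages = deque()
        self.consumers = []

    def enqueue(self, message):
        self.messages.append(message)

    def add_consumer(self, consumer):
        self.consumers.append(consumer)

    def dispatch(self):
        """Deliver queued messages in FIFO order, rotating across consumers."""
        if not self.consumers:
            return 0  # nothing delivered; messages remain queued
        delivered = 0
        targets = cycle(self.consumers)
        while self.messages:
            next(targets)(self.messages.popleft())
            delivered += 1
        return delivered


queue = Queue()
for n in range(4):
    queue.enqueue(f"msg-{n}")

waiting = queue.dispatch()        # no consumers yet, so nothing is delivered
held = len(queue.messages)        # the messages are still in the queue

first, second = [], []
queue.add_consumer(first.append)
queue.add_consumer(second.append)
delivered = queue.dispatch()      # messages are now shared between two consumers
```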

Message processing

A Consumer is implemented as a background process running in a separate Docker container. It runs a shell app within the container; after that, the message is processed by a back-end interface with a specific class, which is chosen based on the Routing Key specified in the message.

Before processing, the Consumer must authorize the user (or impersonator) who initiated the message sending; the necessary authorization information is contained in every message.
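The two steps of the processing stage (authorize the initiator, then pick a handler class by Routing Key) can be sketched like this. It is a Python model built on assumptions: `EchoHandler`, the `HANDLERS` map, the `KNOWN_TOKENS` dictionary standing in for the User Token table, and the envelope field names are all hypothetical.

```python
class EchoHandler:
    """Hypothetical handler for messages with the 'echo' Routing Key."""

    def handle(self, body):
        return f"echo: {body}"


HANDLERS = {"echo": EchoHandler}        # Routing Key -> handler class
KNOWN_TOKENS = {"token-1": "user-1"}    # stand-in for the User Token table


def authorize(envelope):
    """Check that the token carried by the message belongs to its initiator."""
    return KNOWN_TOKENS.get(envelope["user_token_id"]) == envelope["user_id"]


def process(envelope):
    """Authorize the initiator, then dispatch to the class chosen by Routing Key."""
    if not authorize(envelope):
        raise PermissionError("message initiator is not authorized")
    handler_class = HANDLERS.get(envelope["routing_key"])
    if handler_class is None:
        raise LookupError(f"no handler for {envelope['routing_key']!r}")
    return handler_class().handle(envelope["body"])


result = process({
    "body": "message",
    "routing_key": "echo",
    "user_id": "user-1",
    "user_token_id": "token-1",
})
```

Keeping the Routing Key to class mapping in one table means a new message type only needs a new entry and a new handler class.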

Delayed message processing

In general, delayed message processing is similar to instant message processing. The difference is that the $delay parameter is set, so the message is sent after the specified delay.

Message allocation

Allocation example:

  • An Exchange "backend.delayedjob.dmx" of the x-delayed-message type is created.
  • Messages with the $delay parameter specified get into this exchange.
  • This exchange is bound to the "backend.asyncjob.dx" exchange, which is not of the x-delayed-message type and is intended for instant message processing.
  • Further stages are similar to instant message processing.
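The delayed flow above can be modelled as an exchange that holds each message until its delay has elapsed and then forwards it to the instant exchange. This is a Python sketch, not the broker's implementation: the `DelayedExchange` class is hypothetical, time is passed in explicitly as abstract units (the source does not state the unit of $delay), and the `forward` callback stands in for the binding to "backend.asyncjob.dx".

```python
import heapq
from itertools import count


class DelayedExchange:
    """Holds messages until they are due, then forwards them downstream."""

    def __init__(self, forward):
        self.forward = forward   # callable(message, routing_key)
        self._held = []          # heap of (due_time, seq, message, routing_key)
        self._seq = count()      # tie-breaker: equal due times stay in FIFO order

    def publish(self, message, routing_key, delay, now):
        due = now + delay
        heapq.heappush(self._held, (due, next(self._seq), message, routing_key))

    def release_due(self, now):
        """Forward every held message whose due time has passed."""
        released = 0
        while self._held and self._held[0][0] <= now:
            _, _, message, routing_key = heapq.heappop(self._held)
            self.forward(message, routing_key)
            released += 1
        return released


forwarded = []  # stands in for the instant exchange
delayed = DelayedExchange(lambda message, key: forwarded.append((key, message)))
delayed.publish("later", "echo", delay=30, now=0)
delayed.publish("sooner", "echo", delay=10, now=0)

early = delayed.release_due(now=5)      # nothing is due yet
on_time = delayed.release_due(now=10)   # "sooner" is forwarded
rest = delayed.release_due(now=30)      # "later" is forwarded
```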

Logging

All message exchange activities are logged and can be found in the Queue Messages (sys_queue_message) table.

Queue Message fields description

| Field | Description |
| --- | --- |
| Message | Content of the message being sent. This field is mandatory. |
| Routing Key | Message routing key. This field is mandatory. |
| Scheduled start time | Date and time when message processing is scheduled to start (based on the value specified in the $delay parameter); otherwise, contains NULL. |
| Start time | Date and time when the message processing starts. |
| End time | Date and time when the message processing ends. |
| State | Message processing state. Available options: Pending, In process, Completed, Error. |
| Log Exception ID | ID of the thrown log exception if there was one; otherwise, contains NULL. |
| Memory usage | Total amount of memory used for message processing. |
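To show how these fields might change over the life of a message, here is a small Python model of a log record. It is an illustrative sketch only: the `QueueMessageRecord` class and its methods are hypothetical, and the order of state changes (Pending, then In process, then Completed or Error) is an assumption inferred from the state names rather than documented behaviour.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class QueueMessageRecord:
    """Hypothetical in-memory counterpart of a Queue Messages row."""

    message: str
    routing_key: str
    scheduled_start_time: Optional[datetime] = None  # None models NULL (no $delay)
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    state: str = "Pending"
    log_exception_id: Optional[str] = None

    def start(self, now):
        self.state = "In process"
        self.start_time = now

    def finish(self, now, log_exception_id=None):
        self.end_time = now
        self.log_exception_id = log_exception_id
        self.state = "Error" if log_exception_id else "Completed"


t0 = datetime(2024, 1, 1, 12, 0, 0)

record = QueueMessageRecord("message", "echo")
record.start(t0)
record.finish(t0 + timedelta(seconds=2))

failed = QueueMessageRecord("message", "echo")
failed.start(t0)
failed.finish(t0 + timedelta(seconds=1), log_exception_id="exc-1")
```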

     to the standard output (Stdout).

Environment settings

The message broker system consists of two containers:

1. Within the first container, the RabbitMQ message broker is running. It is available externally via TCP (port 5672, used by AMQP clients) and HTTP (port 15672, to access the admin panel). Also, a specific plugin is configured there to create delayed messages.
2. Within the second container, the supervisor daemon runs the required number of workers (can be set by an environment variable). If one of these processes fails, the supervisor starts a new one.
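The restart behaviour of the second container can be illustrated with a toy Python model. It does not use the real supervisor daemon: the `Worker` and `Supervisor` classes are hypothetical, and `WORKER_COUNT` is an assumed name for the environment variable, which the source does not specify.

```python
import os


class Worker:
    """Stand-in for a consumer process."""

    def __init__(self, worker_id):
        self.worker_id = worker_id
        self.alive = True


class Supervisor:
    """Keeps the number of live workers at the required amount."""

    def __init__(self, required):
        self.required = required
        self._next_id = 0
        self.workers = []
        self.ensure_workers()

    def _spawn(self):
        self._next_id += 1
        return Worker(self._next_id)

    def ensure_workers(self):
        """Drop failed workers and start replacements; return how many were started."""
        self.workers = [w for w in self.workers if w.alive]
        started = 0
        while len(self.workers) < self.required:
            self.workers.append(self._spawn())
            started += 1
        return started


# WORKER_COUNT is a hypothetical variable name; at least one worker is kept.
required = max(1, int(os.environ.get("WORKER_COUNT", "3")))
supervisor = Supervisor(required)

supervisor.workers[0].alive = False       # simulate a failed process
restarted = supervisor.ensure_workers()   # the supervisor starts a replacement
```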
