You are viewing an old version of this page. View the current version.
Compare with Current View Page History
« Previous Version 2 Next »
Key Terms
Message broker - a system component the main function of which is to keep and share messages among other system components.
Producer - a program or a system module sending a message into a Queue (producing a message).
Consumer - a program or a system module receiving a message from a Queue and processing it.
Queue - a part of a Message Broker; a list implemented on the FIFO basis (First In First Out). It is intended to keep messages of similar type and allocate them among responsible Consumers.
Exchange - a part of a Message Broker responsible for the message allocation among Queues.
Routing Key - a key that is needed for message allocation among Queues.
Binding - a connection between Exchanges or between an Exchange and a Queue.
Message Broker operation modes
Message broker can operate with instant messages and delayed messages. The difference is that delayed messages are sent after some time specified in the $delay parameter.
Instant messages processing
Instant messages processing consists of several stages:
- Message sending
- Message allocation
- Message storage in queue
- Message processing
Message sending
In brief, the component (program or module) producing a message (a Producer) generates a message and enhances it with a service data (such as Routing Key, User ID, Sys_ID referenced to the User Token (user_token) table). Before sending, the message is serialized.
Sending example:
$this->producer->publish($message = ‘message’, $routingKey = ‘echo’);
Message allocation
The message gets into the Exchange; for allocating messages among Queues, a Routing Key is used specified on the previous stage.
An Exchange is binded (in other words, it has a Binding) to every Queue. If a message contains a Routing Key not matching any queue it goes away and does not move to the next stage.
Allocation example:
- Created an Exchange “backend.asyncjob.dx”
- Created a Queue “backend_echo_q” is created
- Then created a Binding between the Exchange and the Queue. They are bound by a 'echo' Routing Key.
Message storage in queue
After messages are queued up, then they should be processed.
The queue listens to the specified Consumer state (it is based on the Routing Key). When the Consumer can get the message, then it happens. If the queue has no consumers, then the messages line up in queue but do not disappear.
By default, every queue has at least one consumer. If queue has more than one consumer bound, then it allocates messages among them.
Message processing
Consumer is implemented as a background process running in a separate Docker container. It runs a shell app within the container; after that, message is processed by a back-end interface with a specific classes which are chosen based on the Routing Key specified in the message.
Before processing, a consumer should perform user (or impersonator) authorization which were the message sending initiators; necessary authorization information can be found in every message.
Обработка отложенных сообщений.
- Отправка сообщений
Реализована возможность добавления параметра $delay, (что указывается? задержка в мс? сек?) который уведомляет о необходимости создания отложенного сообщения (как администратор/девелопер добавит этот параметр? куда? используя наше API? какие классы и методы используются?)
2. Распределение сообщений
- добавлен Обменник “backend.delayedjob.dmx” типа x-delayed-message (это что за тип?)
- сообщения с параметром $delay попадают в Обменник “backend.delayedjob.dmx”
- “backend.delayedjob.dmx” содержит привязку на “backend.asyncjob.dx”
Дальнейшие шаги (сообщения в очереди и обработка сообщения) аналогичны обработке мгновенных сообщений?
Настройка окружения
Контейнер rabbit-mq
- содержит запущенный брокер сообщений RabbitMQ
- доступен снаружи на портах
5672 - по TCP протоколу (используется AMQP клиентами)
15672 - по HTTP протоколу (для доступа к административной панели) - подключен плагин rabbitmq_delayed_message_exchange, позволяющий создавать отложенные сообщения
Контейнер backend
Supervisord запускает необходимое кол-во воркеров (указанное в переменной окружения) и следит за их кол-вом. В случае когда один из процессов обрывается, supervisor запускает новый процесс.
Нужна инструкция по настройке всего этого дела на платформе (Мухин?)
Таблица sys_queue_message (путь в навигаторе?)
Field | Description |
---|---|
Message | Само сообщение. Обязательное поле. |
Routing key | Ключ маршрутизации сообщения. Обязательное поле. |
Scheduled start time | Если сообщение отложенное, то содержит предполагаемую дату и время запуска. Если сообщение мгновенное, то содержит null. |
Start time | Время начала обработки |
End time | Время завершения обработки |
State | Статус сообщения. Возможные варианты:
|
Log Exception ID | Содержит ID залогированного исключения, в случае, если было выброшено исключение. Если исключения не было, то содержит NULL. |
Memory usage | Объем памяти, потребовавшийся на обработку сообщения (так ли это?) |
- по истечению времени, заданного в $delay, сообщение отправляется в Обменник “backend.asyncjob.dx” с тем же Ключом маршрутизации
Брокер сообщений может работать в двух режимах:
Обработка мгновенных сообщений
- Отправка сообщения.
- Сообщением может быть любой тип данных (что это значит?)
- Узел, являющийся Producer, генерирует сообщение, дополняет его служебной информацией (routingKey, userId, sys_id из user_token)
- Перед отправкой сообщение сериализуется (т.е. сообщение переводится в последовательность битов)
- - пример отправки:
$this->producer->publish($message = ‘message’, $routingKey = ‘echo’);
2. Распределение сообщений:
Сообщение попадает в обменник (exchange)
При отправке указывается routing key для распределения сообщений между queues
Exchange "“backend.asyncjob.dx”" имеет Binding на каждую из Queues (на одну или несколько?)
Если сообщение содержит Ключ маршрутизации, не соответствующий ни одной привязке, сообщение пропадает
- пример настройки:
создан Обменник с именем “backend.asyncjob.dx”
создана Очередь с именем “backend_echo_q”
создана Привязка между Обменником “backend.asyncjob.dx” и “backend_echo_q” по Ключу маршрутизации “echo”
3. Messages в queue
- сообщения, попав в queue, ожидают обработки
- при освобождении consumer, queue отдает ему message (какое правило должно соблюстись? как очередь узнает, что это именно тот консумер, который должен получить сообщение)
- если у Очереди нет потребителей, сообщения накапливаются (после перезагрузки контейнера сообщения не должны пропадать) (это реализовано?)
- по-умолчанию для каждой Очереди создается хотя бы один потребитель (как производится привязка?) (как создаются очереди?)
- если потребителей несколько, Очередь распределяет сообщения между ними (по какому принципу?)
4. Обработка сообщений
- в начальной версии нет разделения потребителей по Очередям. Каждый потребитель способен обработать любое известное сообщение
- потребитель реализуется в виде постоянно запущенного фонового процесса в отдельном docker контейнере
- фоновый процесс запускает консольное приложение из backend контейнера
- обработка сообщения происходит классами, реализующими интерфейс Queue\Domain\Contract\ConsumerInterface
- выбор класса для обработки сообщения определяется с помощью Ключа маршрутизации
- если процесс упал, он должен автоматически перезапуститься
- потребитель, перед обработкой сообщения, авторизует пользователя (и имперсонатора, если такой есть), необходимая информация должна присутствовать в каждом сообщении
- No labels