Guaranteed notification delivery

Registered by Artem Lapin on 2019-07-05

For Mistral notifications, it would be good to have a plugin that guarantees delivery of notifications in the right order and stores undelivered notifications in a RabbitMQ queue (for example).

If the guaranteed notifier is enabled, then on start Mistral creates a new RabbitMQ virtual host (mistral_guaranteed_notifies) and gives the Mistral RabbitMQ user management rights.

If the guaranteed notifier is enabled, the engine creates a persistent RabbitMQ queue for each workflow_execution_id/url pair in the wf_execution start method. If the workflow is rerun, the engine creates this queue again
(the RabbitMQ queue creation command simply returns the queue if it already exists). To avoid errors from the 255-character limit on queue names in RabbitMQ, the url is hashed with md5 using a key.
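
The queue naming could be sketched as below. This is only an illustration: it reads "md5 hashed with key" as HMAC-MD5, which is an assumption, and the function name queue_name, the "." separator and the example key are hypothetical.

```python
import hashlib
import hmac


def queue_name(execution_id: str, url: str, key: bytes) -> str:
    """Build a bounded-length queue name for one execution/url pair."""
    # Assumption: "md5 hashed with key" is interpreted as HMAC-MD5.
    # The digest is always 32 hex characters, whatever the url length.
    url_digest = hmac.new(key, url.encode("utf-8"), hashlib.md5).hexdigest()
    # Hypothetical layout: <workflow_execution_id>.<url digest>
    return f"{execution_id}.{url_digest}"
```

With a 36-character execution id the name is 69 characters long, well under the 255-character limit, and the same key and url always give the same name, so a rerun maps to the same queue.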

When a workflow or task calls the notify method on the guaranteed notifier, the engine creates a new RabbitMQ connection and channel and publishes the notification to each workflow_execution_id/url queue.
The notification also contains the url, headers, polling time and restart count.
The engine does not use the oslo or kombu RPC client for this call; it uses the pika library with the RabbitMQ credentials and hostname stored in the config.
If guaranteed notifications are enabled for this wf_execution, the engine checks the workflow state after every notify. If the workflow is in the cancelled, completed or error state, the engine publishes an "end" message to the queue.
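
A minimal sketch of the publish step, under stated assumptions: the JSON message layout, the field names, the state strings in TERMINAL_STATES and the helper names are all hypothetical. The channel argument is expected to behave like a pika channel, of which only basic_publish is used.

```python
import json

# Assumption: state names for "cancelled, completed or error".
TERMINAL_STATES = {"CANCELLED", "SUCCESS", "ERROR"}

# Hypothetical encoding of the "end" message.
END_MESSAGE = json.dumps({"type": "end"})


def build_notification(event, data, url, headers, polling_time, restart_count):
    """Serialize one notification together with its delivery settings."""
    return json.dumps({
        "type": "notification",
        "event": event,
        "data": data,
        "url": url,
        "headers": headers,
        "polling_time": polling_time,
        "restart_count": restart_count,
    }, sort_keys=True)


def publish_notify(channel, queue_names, body, wf_state, properties=None):
    """Publish body to every queue, then "end" if the workflow has finished."""
    for name in queue_names:
        # Default exchange: the routing key is the queue name.
        channel.basic_publish(exchange="", routing_key=name,
                              body=body, properties=properties)
        if wf_state in TERMINAL_STATES:
            channel.basic_publish(exchange="", routing_key=name,
                                  body=END_MESSAGE, properties=properties)
```

With pika, properties would typically be pika.BasicProperties(delivery_mode=2) so that messages in the persistent queue survive a broker restart.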

The new guaranteed notifier pod gets the list of RabbitMQ queues in the mistral_guaranteed_notifies virtual host via the REST API every 0.1 second (for example).
Each worker thread in this pod takes one queue from this list and starts processing it. Workers use Hazelcast and Python thread locks, which guarantees that each queue is processed by only one worker.
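
The in-process part of the queue assignment could look roughly like this. It covers only the Python thread lock; coordination between pods via Hazelcast is not shown. The names queues_endpoint, QueueClaims and next_unclaimed are hypothetical, and the sketch assumes the RabbitMQ management API path /api/queues/<vhost>, which returns a JSON list of objects with a "name" field.

```python
import threading
from urllib.parse import quote


def queues_endpoint(base_url: str, vhost: str) -> str:
    """URL of the management API call that lists the queues of one vhost."""
    # The vhost is percent-encoded, so the default vhost "/" becomes "%2F".
    return f"{base_url.rstrip('/')}/api/queues/{quote(vhost, safe='')}"


class QueueClaims:
    """Thread-safe record of which queues are being processed in this pod."""

    def __init__(self):
        self._lock = threading.Lock()
        self._claimed = set()

    def try_claim(self, name: str) -> bool:
        # Check and insert happen under one lock, so two threads
        # can never both claim the same queue.
        with self._lock:
            if name in self._claimed:
                return False
            self._claimed.add(name)
            return True

    def release(self, name: str) -> None:
        with self._lock:
            self._claimed.discard(name)


def next_unclaimed(queue_infos, claims):
    """Claim and return the first free queue name, or None if all are taken."""
    for info in queue_infos:
        if claims.try_claim(info["name"]):
            return info["name"]
    return None
```

A worker thread would call next_unclaimed on each polling result and release the queue when it stops processing it.
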
The worker pre-fetches the first message from the queue and tries to deliver it. If delivery succeeds, the worker sends basic_ack on the RabbitMQ channel, the message is removed from the queue,
and the worker starts processing the next message.
If delivery fails, the worker sends basic_reject (requeue=True) on the RabbitMQ channel and the message is not removed. The worker adds this url (hashed with md5 using the same key as the engine)
to the list of unavailable urls and starts processing the next queue.
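
The acknowledge/reject step might be sketched as follows. The function process_head, its return values and the deliver callback are hypothetical; the channel is expected to behave like a pika BlockingChannel, of which only basic_get, basic_ack and basic_reject are used.

```python
def process_head(channel, queue, url_hash, deliver, unavailable):
    """Try to deliver the first message of a queue.

    Returns "empty", "delivered" or "failed".
    """
    # Fetch one message without auto-ack, so it stays in the queue
    # until it is explicitly acknowledged.
    method, _properties, body = channel.basic_get(queue=queue, auto_ack=False)
    if method is None:
        return "empty"
    try:
        ok = deliver(body)  # hypothetical callback, e.g. an HTTP POST to the url
    except Exception:
        ok = False
    if ok:
        # Delivered: RabbitMQ may now remove the message.
        channel.basic_ack(delivery_tag=method.delivery_tag)
        return "delivered"
    # Not delivered: put the message back and mark the url as unavailable.
    channel.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
    unavailable.add(url_hash)
    return "failed"
```

On "delivered" the worker would call process_head again for the same queue; on "failed" it would move on to the next queue.
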
If the worker gets an "end" message, it deletes the queue if the queue contains no other messages. If the queue contains messages after "end" (a sign that the workflow was rerun), the worker ignores the "end".
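
One way the "end" handling could be sketched, assuming a pika-style channel. The function handle_end and its return values are hypothetical. The sketch relies on a passive queue_declare reporting only messages that are ready, so the fetched but not yet acknowledged "end" message is not counted.

```python
def handle_end(channel, queue, end_delivery_tag):
    """Decide what to do with a fetched, not yet acknowledged "end" message.

    Returns "deleted" or "ignored".
    """
    # passive=True only inspects the queue; it never creates it.
    state = channel.queue_declare(queue=queue, passive=True)
    if state.method.message_count == 0:
        # Nothing was published after "end": the queue is finished.
        # Deleting it also discards the unacknowledged "end" message.
        channel.queue_delete(queue=queue)
        return "deleted"
    # Messages follow "end" (the workflow was rerun): drop only the marker.
    channel.basic_ack(delivery_tag=end_delivery_tag)
    return "ignored"
```

In this sketch a message published between the count check and the delete would be lost; how to close that gap is left open here.
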
Workers ignore queues with unavailable urls. If a user wants messages redelivered to such a url, they should call the guaranteed_notifier REST endpoint with a POST request containing the url,
and the notifier deletes this url from the list of unavailable urls. If the guaranteed_notifier pod becomes unavailable due to an unexpected error, all undelivered messages remain safe in the persistent RabbitMQ queue
and can be delivered later. This logic guarantees at-least-once delivery of each message: if the notifier pod dies while a delivery is still in progress, the message is not deleted from the
queue, because RabbitMQ has not received basic_ack. If the message was delivered before the pod died but RabbitMQ did not get basic_ack, the message is not deleted from the queue and will be delivered a second time.

Blueprint information

Not started
Artem Lapin
Needs approval
Series goal:
Milestone target:
