Notification subscriber server

Registered by Mehdi Abaakouk

This cover a needs for ceilometer to migrate to oslo.messaging

The idea is have a simple API to subscribe to raw notification message. The API will looks like the RPC server one.
To share the code of the MessageHandlingServer and executor stuffs.

The target must allow to acknowledge/requeue message manually.

Example of usage:

    class Endpoint(object):
        def warn(self, ctxt, publisher_id, event_type, payload, extras):
            do_somehting(payload) or \
                    raise oslo.messaging.RequeueMessageException()

    target = messaging.Target(topic='notification', exchange='cinder')
    listener = notify.get_notification_listener(transport, [target],

Blueprint information

Mark McLoughlin
Mehdi Abaakouk
Mehdi Abaakouk
Series goal:
Accepted for icehouse
Milestone target:
milestone icon 1.3.0
Started by
Mark McLoughlin
Completed by
Doug Hellmann

Related branches



Gerrit topic:,topic:bp/notification-subscriber-server,n,z

Addressed by:
    Implementation of notification subscriber server


Thanks for this! Before I really dig into reviewing, I sketched out what my default approach to the API would have been. That's not to say mine is better, it's just my starting point before even looking at your code:

  from oslo.config import cfg
  from oslo import messaging

  transport = messaging.get_transport(cfg.CONF)

  class InfoHandler(object):

      def info(self, ctxt, publisher_id, event_type, payload):

  class WarnHandler(object):

      def warn(self, ctxt, publisher_id, event_type, payload):

  endpoints = [

  target = messaging.Target(exchange='nova',

  listener = messaging.get_notification_listener(transport, target, endpoints,

-- @markmc


Note also, I think we might benefit in the longer term from a type of declarative filter that different transport drivers could implement cleverly - i.e. rather than doing:

  class WarnHandler(object):

      def warn(self, ctxt, publisher_id, event_type, payload):
          if not publisher_id.startswith('compute'):

you'd instead do something like:

  class WarnHandler(object):

      filter = NotificationsFilter(publisher_id='^compute.*')

      def warn(self, ctxt, publisher_id, event_type, payload):

but we'd really need a driver that would do something useful with it before implementing it. The tricky part is that if we based it on regular expressions, you probably limit what kind of drivers you can write. But if you don't base it on regular expressions, you limit what you can express in the filter.

-- @markmc

Addressed by:
    Remove the partial implementation of ack_on_error

Addressed by:
    Abstract the ack/requeue/reject layer of a message

Addressed by:
    Do the acknowlegement of messages by the executor

Addressed by:
    Allow fake driver to consume multiple topics

Addressed by:
    Add a listener abstraction layer for notification

Gerrit topic:,topic:bug/1241566,n,z


The blueprint description could do with being updated to reflect the new API -- @markmc

Addressed by:
    Make the executor responsible of the message ack

Addressed by:
    Allow to requeue the notification message

Addressed by:
    Make the dispatcher responsible to listen()

Addressed by:
    Add multiple exchange per listerner in fake driver

Gerrit topic:,topic:bug/1282639,n,z

Addressed by:
    Allow missing data notification endpoint callback

Addressed by:
    Add transport reconnection retries


The remaining open review ( is related to a bug. - dhellmann


Work Items

Dependency tree

* Blueprints in grey have been implemented.

This blueprint contains Public information 
Everyone can see this information.