Notification subscriber server

Registered by Mehdi Abaakouk

This cover a needs for ceilometer to migrate to oslo.messaging

The idea is have a simple API to subscribe to raw notification message. The API will looks like the RPC server one.
To share the code of the MessageHandlingServer and executor stuffs.

The target must allow to acknowledge/requeue message manually.

Example of usage:

    class Endpoint(object):
        def warn(self, ctxt, publisher_id, event_type, payload, extras):
            do_somehting(payload) or \
                    raise oslo.messaging.RequeueMessageException()

    target = messaging.Target(topic='notification', exchange='cinder')
    listener = notify.get_notification_listener(transport, [target],

Blueprint information

Mark McLoughlin
Mehdi Abaakouk
Mehdi Abaakouk
Series goal:
Accepted for icehouse
Milestone target:
milestone icon 1.3.0
Started by
Mark McLoughlin
Completed by
Doug Hellmann

Thanks for this! Before I really dig into reviewing, I sketched out what my default approach to the API would have been. That's not to say mine is better, it's just my starting point before even looking at your code:

  from oslo.config import cfg
  from oslo import messaging

  transport = messaging.get_transport(cfg.CONF)

  class InfoHandler(object):

      def info(self, ctxt, publisher_id, event_type, payload):

  class WarnHandler(object):

      def warn(self, ctxt, publisher_id, event_type, payload):

  endpoints = [

  target = messaging.Target(exchange='nova',

  listener = messaging.get_notification_listener(transport, target, endpoints,

-- @markmc


Note also, I think we might benefit in the longer term from a type of declarative filter that different transport drivers could implement cleverly - i.e. rather than doing:

  class WarnHandler(object):

      def warn(self, ctxt, publisher_id, event_type, payload):
          if not publisher_id.startswith('compute'):

you'd instead do something like:

  class WarnHandler(object):

      filter = NotificationsFilter(publisher_id='^compute.*')

      def warn(self, ctxt, publisher_id, event_type, payload):

but we'd really need a driver that would do something useful with it before implementing it. The tricky part is that if we based it on regular expressions, you probably limit what kind of drivers you can write. But if you don't base it on regular expressions, you limit what you can express in the filter.

-- @markmc

The remaining open review ( is related to a bug. - dhellmann


