Notification subscriber server

Registered by Mehdi Abaakouk

This covers a need for ceilometer to migrate to oslo.messaging.

The idea is to have a simple API for subscribing to raw notification messages. The API will look like the RPC server one, so that the code of the MessageHandlingServer and the executors can be shared.

The target must allow messages to be acknowledged or requeued manually.

Example of usage:

    class Endpoint(object):
        def warn(self, ctxt, publisher_id, event_type, payload, extras):
            # Ask for the message to be requeued if processing fails.
            if not do_something(payload):
                raise oslo.messaging.RequeueMessageException()

    target = messaging.Target(topic='notification', exchange='cinder')
    listener = notify.get_notification_listener(transport, [target],
                                                endpoints,
                                                executor,
                                                serializer)
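
The manual acknowledge/requeue requirement can be illustrated with a minimal sketch in plain Python. It does not use oslo.messaging: `FakeIncomingMessage`, `dispatch` and `handler` are hypothetical names, and the local `RequeueMessageException` is only a stand-in for the exception shown in the example above. The assumption is that whatever runs the callback acknowledges the message on success and requeues it when the callback asks for that.

```python
class RequeueMessageException(Exception):
    """Hypothetical stand-in: raised by a callback to ask for redelivery."""


class FakeIncomingMessage(object):
    """Toy message that records whether it was acknowledged or requeued."""

    def __init__(self, payload):
        self.payload = payload
        self.state = 'pending'

    def acknowledge(self):
        self.state = 'acked'

    def requeue(self):
        self.state = 'requeued'


def dispatch(message, callback):
    """Run the callback, then ack or requeue depending on the outcome."""
    try:
        callback(message.payload)
    except RequeueMessageException:
        message.requeue()
    else:
        message.acknowledge()
    return message.state


def handler(payload):
    # Ask for redelivery when the payload cannot be processed yet.
    if not payload.get('ready'):
        raise RequeueMessageException()


print(dispatch(FakeIncomingMessage({'ready': True}), handler))
print(dispatch(FakeIncomingMessage({'ready': False}), handler))
```

In this sketch any other exception propagates without touching the message state; how a real driver should treat that case is left open here.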

Blueprint information

Status:
Complete
Approver:
Mark McLoughlin
Priority:
High
Drafter:
Mehdi Abaakouk
Direction:
Approved
Assignee:
Mehdi Abaakouk
Definition:
Approved
Series goal:
Accepted for icehouse
Implementation:
Implemented
Milestone target:
1.3.0
Started by
Mark McLoughlin
Completed by
Doug Hellmann

Whiteboard

Gerrit topic: https://review.openstack.org/#q,topic:bp/notification-subscriber-server,n,z

Addressed by: https://review.openstack.org/57275
    Implementation of notification subscriber server

---

Thanks for this! Before I really dig into reviewing, I sketched out what my default approach to the API would have been. That's not to say mine is better, it's just my starting point before even looking at your code:

  from oslo.config import cfg
  from oslo import messaging

  transport = messaging.get_transport(cfg.CONF)

  class InfoHandler(object):

      def info(self, ctxt, publisher_id, event_type, payload):
          ...

  class WarnHandler(object):

      def warn(self, ctxt, publisher_id, event_type, payload):
          ...

  endpoints = [
      InfoHandler(),
      WarnHandler(),
  ]

  target = messaging.Target(exchange='nova',
                            topic='notifications')

  listener = messaging.get_notification_listener(transport, target, endpoints,
                                                 executor='eventlet',
                                                 serializer=...)

-- @markmc
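
One way to read the endpoint classes above is that each notification is routed to the method named after its priority. The following is a rough sketch of that idea in plain Python, not the oslo.messaging implementation: `dispatch_notification` is a hypothetical helper, and the dictionary layout of the notification (`priority`, `publisher_id`, `event_type`, `payload`) is an assumption made for illustration.

```python
class InfoHandler(object):
    def __init__(self):
        self.seen = []

    def info(self, ctxt, publisher_id, event_type, payload):
        self.seen.append(event_type)


class WarnHandler(object):
    def __init__(self):
        self.seen = []

    def warn(self, ctxt, publisher_id, event_type, payload):
        self.seen.append(event_type)


def dispatch_notification(endpoints, ctxt, notification):
    """Call the method named after the priority on every endpoint that has it.

    Returns the number of endpoints that handled the notification.
    """
    priority = notification['priority'].lower()
    handled = 0
    for endpoint in endpoints:
        method = getattr(endpoint, priority, None)
        if method is None:
            # This endpoint does not care about this priority.
            continue
        method(ctxt,
               notification['publisher_id'],
               notification['event_type'],
               notification['payload'])
        handled += 1
    return handled


info_handler, warn_handler = InfoHandler(), WarnHandler()
endpoints = [info_handler, warn_handler]

notification = {
    'priority': 'WARN',
    'publisher_id': 'compute.host1',
    'event_type': 'compute.instance.create.error',
    'payload': {'instance_id': 'abc'},
}
print(dispatch_notification(endpoints, {}, notification))
print(warn_handler.seen, info_handler.seen)
```

With this layout an endpoint only needs to define methods for the priorities it is interested in; everything else is skipped.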

--

Note also, I think we might benefit in the longer term from a type of declarative filter that different transport drivers could implement cleverly - i.e. rather than doing:

  class WarnHandler(object):

      def warn(self, ctxt, publisher_id, event_type, payload):
          if not publisher_id.startswith('compute'):
              return

you'd instead do something like:

  class WarnHandler(object):

      filter = NotificationsFilter(publisher_id='^compute.*')

      def warn(self, ctxt, publisher_id, event_type, payload):
          ...

but we'd really need a driver that would do something useful with it before implementing it. The tricky part is that if we base it on regular expressions, we probably limit what kind of drivers can be written. But if we don't base it on regular expressions, we limit what can be expressed in the filter.

-- @markmc
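
To make the declarative filter idea concrete, here is a minimal sketch of the regular-expression variant, applied on the consumer side rather than by a transport driver. Everything in it is an assumption for illustration: the `NotificationsFilter` constructor arguments, its `match` method, and the `call_if_matching` helper are hypothetical and not part of any existing API.

```python
import re


class NotificationsFilter(object):
    """Hypothetical filter holding one optional regex per notification field."""

    def __init__(self, publisher_id=None, event_type=None):
        rules = {'publisher_id': publisher_id, 'event_type': event_type}
        # Compile only the rules that were actually given.
        self._patterns = dict((field, re.compile(pattern))
                              for field, pattern in rules.items()
                              if pattern is not None)

    def match(self, publisher_id, event_type):
        values = {'publisher_id': publisher_id, 'event_type': event_type}
        return all(pattern.match(values[field])
                   for field, pattern in self._patterns.items())


class WarnHandler(object):

    filter = NotificationsFilter(publisher_id='^compute.*')

    def __init__(self):
        self.seen = []

    def warn(self, ctxt, publisher_id, event_type, payload):
        self.seen.append(publisher_id)


def call_if_matching(endpoint, ctxt, publisher_id, event_type, payload):
    """Call endpoint.warn() only when the endpoint's filter accepts the message."""
    flt = getattr(endpoint, 'filter', None)
    if flt is not None and not flt.match(publisher_id, event_type):
        return False
    endpoint.warn(ctxt, publisher_id, event_type, payload)
    return True


handler = WarnHandler()
print(call_if_matching(handler, {}, 'compute.host1', 'instance.error', {}))
print(call_if_matching(handler, {}, 'volume.host1', 'volume.error', {}))
```

Because the filter is plain data attached to the class, a driver that understands it could in principle push the matching down to the broker, while other drivers fall back to a client-side check like the one sketched here.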

Addressed by: https://review.openstack.org/59712
    Remove the partial implementation of ack_on_error

Addressed by: https://review.openstack.org/59713
    Abstract the ack/requeue/reject layer of a message

Addressed by: https://review.openstack.org/59714
    Do the acknowledgement of messages by the executor

Addressed by: https://review.openstack.org/59715
    Allow fake driver to consume multiple topics

Addressed by: https://review.openstack.org/59716
    Add a listener abstraction layer for notification

Gerrit topic: https://review.openstack.org/#q,topic:bug/1241566,n,z

---

The blueprint description could do with being updated to reflect the new API -- @markmc

Addressed by: https://review.openstack.org/61674
    Make the executor responsible of the message ack

Addressed by: https://review.openstack.org/61675
    Allow to requeue the notification message

Addressed by: https://review.openstack.org/62931
    Make the dispatcher responsible to listen()

Addressed by: https://review.openstack.org/70106
    Add multiple exchange per listener in fake driver

Gerrit topic: https://review.openstack.org/#q,topic:bug/1282639,n,z

Addressed by: https://review.openstack.org/77136
    Allow missing data notification endpoint callback

Addressed by: https://review.openstack.org/75365
    Add transport reconnection retries

----

The remaining open review (https://review.openstack.org/#/c/75365/9) is related to a bug. - dhellmann
