Skip to content

Callback mechanism for listening to Pub/Sub messages #1791

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
BrandonY opened this issue May 11, 2016 · 7 comments
Closed

Callback mechanism for listening to Pub/Sub messages #1791

BrandonY opened this issue May 11, 2016 · 7 comments
Assignees
Labels
api: pubsub Issues related to the Pub/Sub API. type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design.

Comments

@BrandonY
Copy link

I'd love for there to be a mechanism to set up perpetual pulling and acknowledging of Cloud Pub/Sub messages, the sort of thing that might enable code like this:

# Messages are acknowledged unless DoWork raises an exception
def DoWork(message):
    print "Message payload: %s" % message.payload

worker_thread = subscription.pull(callback=DoWork)
@theacodes
Copy link
Contributor

This would be pretty interesting, but there's lots of questions there on how that happens. Is it a thread? another process? gevent? asyncio?

In the meantime, GoogleCloudPlatform/psq is a rather high-level layer over Pub/Sub that can handle a similar use case: polling for messages and executing functions.

@BrandonY
Copy link
Author

psq is pretty cool, but I believe it's a bit specialized into the niche of low configuration task queue. For example, I was under the impression that it only worked for topics that began with the name "psq-" and assumed the existence of a shared subscription on that queue.

Supporting callbacks on some arbitrary subscription would either be a feature request on gcloud-pubsub for callbacks or a feature request on psq for supporting a single, arbitrary subscription. I could file the latter, if you'd prefer not to add callbacks on this side, but I think I'd prefer it be added to gcloud-pubsub. It seems like a thread that continually polled for messages and dispatched them to a handler would be pretty much universally useful to gcloud-pubsub users.

@theacodes
Copy link
Contributor

So, Pythonically I don't need much benefit over:

while True:
   messages = subscription.pull(return_immediately=False)
   # Do stuff with messages

vs

def do_stuff_with_messages(messages):
    # ....

subscription.pull(callback=do_stuff_with_messages)

Callbacks in Python are usually reserved for asynchronous programming. It seems in your FR that you expected calling subscription.pull(callback=DoWork) to not block. Unfortunately, that gets into the realm of async and there there are a multitude of ways to do that in Python and it would seem unfair for us to choose one way and limit our users.

For example, if someone wanted to use gevent to do what you said, they could easily implement that via:

def worker_thread(subscription, callback):
    while True:
        messages = subscription.pull()
        for message in messages:
            callback(message)


thread = gevent.spawn(worker_thread, subscription, some_callback)

@dhermes or @tseaver might have some different ideas here.

@BrandonY
Copy link
Author

BrandonY commented May 12, 2016

When you add in the need to acknowledge messages, I think the below is pretty much the simplest way to use the pull function:

while True:
  messages = subscription.pull(return_immediately=False, max_messages=10)
  messages_to_ack = []
  for message in messages:
    try:
      # // do stuff
      messages_to_ack.append(message.message_id)
    except:
      # //whoops, only add to message_to_ack if non-retryable error
  subscription.acknowledge(messages_to_ack)

That's still pretty easy to write, but it's also pretty easy to get wrong, so I still like the idea of gcloud-pubsub offering it directly.

@tseaver tseaver added type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design. api: pubsub Issues related to the Pub/Sub API. labels May 12, 2016
@tseaver
Copy link
Contributor

tseaver commented May 12, 2016

FWIW, PR's #1636 and #1637 offer competing strategies for doing "auto-acknowlegement" of message IDs.

@BrandonY
Copy link
Author

Ah, very interesting, thanks @tseaver

@tseaver
Copy link
Contributor

tseaver commented Aug 25, 2016

And FTR we merged the implementation in #1636, making your example above look like:

while True:
    with subscription.auto_ack() as ack:
        for ack_id, message in list(ack.items()):
            try:
                do_something_with(message)
            except WhateverException:
                del ack[ack_id]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: pubsub Issues related to the Pub/Sub API. type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design.
Projects
None yet
Development

No branches or pull requests

4 participants