feat: make publish futures compatible with concurrent.futures.as_completed() by plamut · Pull Request #397 · googleapis/python-pubsub · GitHub
Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 16 additions & 144 deletions google/cloud/pubsub_v1/futures.py
2 changes: 1 addition & 1 deletion google/cloud/pubsub_v1/publisher/_batch/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ def publish(self, message):

# Track the future on this batch (so that the result of the
# future can be set).
future = futures.Future(completed=threading.Event())
future = futures.Future()
self._futures.append(future)

# Try to commit, but it must be **without** the lock held, since
Expand Down
22 changes: 15 additions & 7 deletions google/cloud/pubsub_v1/publisher/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,20 @@ class Future(futures.Future):
ID, unless an error occurs.
"""

def cancel(self):
"""Actions in Pub/Sub generally may not be canceled.

This method always returns ``False``.
"""
return False

def cancelled(self):
"""Actions in Pub/Sub generally may not be canceled.

This method always returns ``False``.
"""
return False

def result(self, timeout=None):
"""Return the message ID or raise an exception.

Expand All @@ -43,10 +57,4 @@ def result(self, timeout=None):
Exception: For undefined exceptions in the underlying
call execution.
"""
# Attempt to get the exception if there is one.
# If there is not one, then we know everything worked, and we can
# return an appropriate value.
err = self.exception(timeout=timeout)
if err is None:
return self._result
raise err
return super().result(timeout=timeout)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this override?

14 changes: 8 additions & 6 deletions google/cloud/pubsub_v1/subscriber/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ class StreamingPullFuture(futures.Future):

def __init__(self, manager):
super(StreamingPullFuture, self).__init__()
self._manager = manager
self._manager.add_close_callback(self._on_close_callback)
self._cancelled = False
self.__manager = manager
self.__manager.add_close_callback(self._on_close_callback)
self.__cancelled = False

def _on_close_callback(self, manager, result):
if self.done():
Expand All @@ -47,12 +47,14 @@ def cancel(self):
"""Stops pulling messages and shutdowns the background thread consuming
messages.
"""
self._cancelled = True
return self._manager.close()
# NOTE: We circumvent the base future's self._state to track the cancellation
# state, as this state has different meaning with streaming pull futures.
self.__cancelled = True
return self.__manager.close()

def cancelled(self):
"""
returns:
bool: ``True`` if the subscription has been cancelled.
"""
return self._cancelled
return self.__cancelled
8 changes: 8 additions & 0 deletions tests/unit/pubsub_v1/publisher/test_futures_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@


class TestFuture(object):
def test_cancel(self):
future = futures.Future()
assert future.cancel() is False

def test_cancelled(self):
future = futures.Future()
assert future.cancelled() is False

def test_result_on_success(self):
future = futures.Future()
future.set_result("570307942214048")
Expand Down
8 changes: 4 additions & 4 deletions tests/unit/pubsub_v1/subscriber/test_futures_subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,12 @@ def make_future(self):

def test_default_state(self):
future = self.make_future()
manager = future._StreamingPullFuture__manager

assert future.running()
assert not future.done()
assert not future.cancelled()
future._manager.add_close_callback.assert_called_once_with(
future._on_close_callback
)
manager.add_close_callback.assert_called_once_with(future._on_close_callback)

def test__on_close_callback_success(self):
future = self.make_future()
Expand Down Expand Up @@ -71,8 +70,9 @@ def test__on_close_callback_future_already_done(self):

def test_cancel(self):
future = self.make_future()
manager = future._StreamingPullFuture__manager

future.cancel()

future._manager.close.assert_called_once()
manager.close.assert_called_once()
assert future.cancelled()
12 changes: 7 additions & 5 deletions tests/unit/pubsub_v1/subscriber/test_subscriber_client.py
Loading