fix(experimental): implement requests_done method to signal end of requests in async streams. Gracefully close streams. by chandra-siri · Pull Request #1700 · googleapis/python-storage · GitHub
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

fix(experimental): implement requests_done method to signal end of requests in async streams. Gracefully close streams.#1700

Merged
chandra-siri merged 2 commits into main from gracefully_close_streams
Jan 15, 2026

@chandra-siri

Collaborator

fix(experimental): implement requests_done method to signal end of requests in async streams. Gracefully close streams.

…quests in async streams. Gracefully close streams.
@chandra-siri chandra-siri requested review from a team January 12, 2026 14:47
@chandra-siri chandra-siri requested a review from a team as a code owner January 12, 2026 14:47
@chandra-siri chandra-siri enabled auto-merge (squash) January 12, 2026 14:47
@product-auto-label product-auto-label Bot added the "size: m" and "api: storage" labels Jan 12, 2026
@gemini-code-assist

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces a requests_done method to gracefully close asynchronous streams for reading and writing objects. The implementation correctly signals the end of requests to the server before closing the connection. My review focuses on improving code quality and test coverage. I've suggested adding guard clauses to the new methods for robustness and refactoring duplicated code into a base class to improve maintainability. I've also recommended adding tests for the failure scenarios of the new methods.

Comment on lines +158 to +162
    async def requests_done(self):
        """Signals that all requests have been sent."""

        await self.socket_like_rpc.send(None)
        await self.socket_like_rpc.recv()

high

This new method has a couple of areas for improvement:

  1. Missing guard clause: For consistency with other methods like send and recv, a check should be added to ensure the stream is open before proceeding. This prevents errors if the method is called on a closed stream.
  2. Code duplication: This method is identical to the one in _AsyncWriteObjectStream. To improve maintainability, consider moving this shared logic to the _AsyncAbstractObjectStream base class. This would likely involve moving _is_stream_open and socket_like_rpc to the base class as well.

Here is a suggested implementation that includes the guard clause:

    async def requests_done(self):
        """Signals that all requests have been sent."""
        if not self._is_stream_open:
            raise ValueError("Stream is not open")

        await self.socket_like_rpc.send(None)
        await self.socket_like_rpc.recv()
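
The base-class refactor described in point 2 could look roughly like the sketch below. It is only an illustration under stated assumptions: `AbstractObjectStreamSketch`, `ReadStreamSketch`, and `WriteStreamSketch` are hypothetical stand-ins for `_AsyncAbstractObjectStream` and its two subclasses, and `AsyncMock` replaces the real bidirectional RPC object so the example runs with the standard library alone.

```python
import asyncio
from abc import ABC, abstractmethod
from unittest.mock import AsyncMock


class AbstractObjectStreamSketch(ABC):
    """Hypothetical base class; stands in for _AsyncAbstractObjectStream."""

    def __init__(self):
        # Shared state moved up from the subclasses (assumption for this sketch).
        self._is_stream_open = False
        self.socket_like_rpc = None

    @abstractmethod
    async def open(self):
        """Subclasses create their own RPC and mark the stream open."""

    async def requests_done(self):
        """Signals that all requests have been sent (defined once, inherited)."""
        if not self._is_stream_open:
            raise ValueError("Stream is not open")

        await self.socket_like_rpc.send(None)
        await self.socket_like_rpc.recv()


class ReadStreamSketch(AbstractObjectStreamSketch):
    async def open(self):
        self.socket_like_rpc = AsyncMock()  # placeholder for the read RPC
        self._is_stream_open = True


class WriteStreamSketch(AbstractObjectStreamSketch):
    async def open(self):
        self.socket_like_rpc = AsyncMock()  # placeholder for the write RPC
        self._is_stream_open = True


async def exercise(stream):
    """Opens the stream, calls requests_done, and returns the args passed to send."""
    await stream.open()
    await stream.requests_done()
    return stream.socket_like_rpc.send.await_args.args


read_args = asyncio.run(exercise(ReadStreamSketch()))
write_args = asyncio.run(exercise(WriteStreamSketch()))
```

With this layout neither subclass defines `requests_done` itself, so the guard clause and the send/recv sequence live in one place.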

Comment on lines +159 to +163
    async def requests_done(self):
        """Signals that all requests have been sent."""

        await self.socket_like_rpc.send(None)
        await self.socket_like_rpc.recv()

high

Similar to my comment on async_read_object_stream.py, this method should include a check to ensure the stream is open.

Additionally, this logic is duplicated across both stream classes. Refactoring this into the _AsyncAbstractObjectStream base class would be a good improvement for maintainability.

    async def requests_done(self):
        """Signals that all requests have been sent."""
        if not self._is_stream_open:
            raise ValueError("Stream is not open")

        await self.socket_like_rpc.send(None)
        await self.socket_like_rpc.recv()

        "google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
    )
    @pytest.mark.asyncio
    async def test_requests_done(mock_client, mock_cls_async_bidi_rpc):

medium

This is a good test for the happy path. With the recommended addition of the _is_stream_open check in requests_done, it would be beneficial to also add a test case to verify that a ValueError is raised when requests_done is called on a closed stream, similar to test_close_without_open_should_raise_error.
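
A failure-case test along these lines might look like the sketch below. It assumes the guard clause has been adopted and uses a hypothetical `ReadStreamStub` rather than the library's read stream class. The project's own tests use `pytest.mark.asyncio` and mocked client fixtures; this version drives the coroutine with `asyncio.run` so it runs without extra plugins.

```python
import asyncio
from unittest.mock import AsyncMock


class ReadStreamStub:
    """Hypothetical stub; stands in for the real read stream class."""

    def __init__(self):
        self._is_stream_open = False  # never opened in this test
        self.socket_like_rpc = AsyncMock()

    async def requests_done(self):
        if not self._is_stream_open:
            raise ValueError("Stream is not open")
        await self.socket_like_rpc.send(None)
        await self.socket_like_rpc.recv()


def test_requests_done_without_open_should_raise_error():
    stream = ReadStreamStub()
    try:
        asyncio.run(stream.requests_done())
    except ValueError as exc:
        assert str(exc) == "Stream is not open"
    else:
        raise AssertionError("expected ValueError for a closed stream")

    # The guard should fire before anything is sent or received.
    stream.socket_like_rpc.send.assert_not_awaited()
    stream.socket_like_rpc.recv.assert_not_awaited()
```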

medium

Similar to the read stream test, this is a good happy path test. It would be valuable to add another test to ensure requests_done raises a ValueError when the stream is not open, to cover the failure case. This would be analogous to test_close_without_open_should_raise_error.
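
For the write stream, one option is to check both paths against the same mock, so the closed-stream case is verified to produce no RPC traffic and the open-stream case is verified to call `send(None)` before `recv()`. The sketch below is an assumption-laden illustration: `WriteStreamStub` and `run_requests_done` are hypothetical names, not part of the library or its test suite.

```python
import asyncio
from unittest.mock import AsyncMock, call


class WriteStreamStub:
    """Hypothetical stub for the write stream; not the library's class."""

    def __init__(self, rpc):
        self._is_stream_open = False
        self.socket_like_rpc = rpc

    async def requests_done(self):
        if not self._is_stream_open:
            raise ValueError("Stream is not open")
        await self.socket_like_rpc.send(None)
        await self.socket_like_rpc.recv()


def run_requests_done(is_open):
    """Returns (raised exception type or None, calls recorded on the RPC mock)."""
    rpc = AsyncMock()
    stream = WriteStreamStub(rpc)
    stream._is_stream_open = is_open
    try:
        asyncio.run(stream.requests_done())
    except ValueError as exc:
        return type(exc), rpc.mock_calls
    return None, rpc.mock_calls


closed_result = run_requests_done(is_open=False)
open_result = run_requests_done(is_open=True)
```

Comparing `rpc.mock_calls` against an expected list checks the call order as well as the arguments, which a pair of separate `assert_awaited_once` checks would not.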

@chandra-siri chandra-siri merged commit 6c16079 into main Jan 15, 2026
18 checks passed
@chandra-siri chandra-siri deleted the gracefully_close_streams branch January 15, 2026 19:48
chandra-siri added a commit that referenced this pull request Feb 2, 2026
PR created by the Librarian CLI to initialize a release. Merging this PR
will auto trigger a release.

Librarian Version: v1.0.2-0.20251119154421-36c3e21ad3ac
Language Image:
us-central1-docker.pkg.dev/cloud-sdk-librarian-prod/images-prod/python-librarian-generator@sha256:8e2c32496077054105bd06c54a59d6a6694287bc053588e24debe6da6920ad91
<details><summary>google-cloud-storage: 3.9.0</summary>

## [3.9.0](v3.8.0...v3.9.0) (2026-02-02)

### Features

* update generation for MRD (#1730)
([08bc708](08bc7082))

* add get_object method for async grpc client (#1735)
([0e5ec29](0e5ec29b))

* Add micro-benchmarks for reads comparing standard (regional) vs rapid
(zonal) buckets. (#1697)
([1917649](1917649f))

* Add support for opening via `write_handle` and fix `write_handle` type
(#1715)
([2bc15fa](2bc15fa5))

* add samples for appendable objects writes and reads
([2e1a1eb](2e1a1eb5))

* add samples for appendable objects writes and reads (#1705)
([2e1a1eb](2e1a1eb5))

* add context manager to mrd (#1724)
([5ac2808](5ac2808a))

* Move Zonal Buckets features of `_experimental` (#1728)
([74c9ecc](74c9ecc5))

* add default user agent for grpc (#1726)
([7b31946](7b319469))

* expose finalized_time in blob.py applicable for GET_OBJECT in ZB
(#1719)
([8e21a7f](8e21a7fe))

* expose `DELETE_OBJECT` in `AsyncGrpcClient` (#1718)
([c8dd7a0](c8dd7a0b))

* send `user_agent` to grpc channel (#1712)
([cdb2486](cdb2486b))

* integrate writes strategy and appendable object writer (#1695)
([dbd162b](dbd162b3))

* Add micro-benchmarks for writes comparing standard (regional) vs rapid
(zonal) buckets. (#1707)
([dbe9d8b](dbe9d8b8))

* add support for `generation=0` to avoid overwriting existing objects
and add `is_stream_open` support (#1709)
([ea0f5bf](ea0f5bf8))

* add support for `generation=0` to prevent overwriting existing objects
([ea0f5bf](ea0f5bf8))

* add `is_stream_open` property to AsyncAppendableObjectWriter for
stream status check
([ea0f5bf](ea0f5bf8))

### Bug Fixes

* receive eof while closing reads stream (#1733)
([2ef6339](2ef63396))

* update write handle on every recv() (#1716)
([5d9fafe](5d9fafe1))

* implement requests_done method to signal end of requests in async
streams. Gracefully close streams. (#1700)
([6c16079](6c160794))

* implement requests_done method to signal end of requests in async
streams. Gracefully close streams.
([6c16079](6c160794))

* instance grpc client once per process in benchmarks (#1725)
([721ea2d](721ea2dd))

* Fix formatting in setup.py dependencies list (#1713)
([cc4831d](cc4831d7))

* Change constructors of MRD and AAOW AsyncGrpcClient.grpc_client to
AsyncGrpcClient (#1727)
([e730bf5](e730bf50))

</details>