fix(rab): run async background boundary refresh on detached session (… · googleapis/google-cloud-python@56cbea8 · GitHub
Skip to content

Commit 56cbea8

Browse files
authored
fix(rab): run async background boundary refresh on detached session (#17441)
When AuthorizedSession.request() makes an API call, it runs inside a temporary aiohttp ClientSession block. If our background Regional Access Boundary (RAB) refresh worker naively shares this exact same session, a fast primary call (like an instant 401/403 or a quick CRM check) will exit its block and close the active socket mid-flight. This causes the background worker to silently fail with "RuntimeError: Session is closed" and forces the RAB manager into a 15-minute cooldown. This commit resolves the race condition and ensures safe connection lifecycle management: - Shifted the cloning block to run synchronously inside start_refresh, capturing a fresh, independent ClientSession before the foreground thread can close the source transport. - Added a _clone() method to async Request adapters (both modern and legacy) to copy proxy settings and trace configurations while enforcing connector limits. - Prevented resource leaks on task creation failures by capturing exceptions in start_refresh and closing the cloned session synchronously. - Refactored the close wrapper to inspect and await generic awaitables (such as asyncio.Future) returned by custom or third-party transports. - Aligned exception behaviors by raising a wrapped TransportError directly when calling a closed instance of the legacy aiohttp_requests adapter. - Ensured the cloned transport is cleanly closed in a finally block after the background lookup settles.
1 parent b50cf1a commit 56cbea8

9 files changed

Lines changed: 1025 additions & 4 deletions

File tree

packages/google-auth/google/auth/_regional_access_boundary_utils.py

Lines changed: 85 additions & 3 deletions

packages/google-auth/google/auth/aio/transport/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,3 +142,13 @@ async def close(self) -> None:
142142
Close the underlying session.
143143
"""
144144
raise NotImplementedError("close must be implemented.")
145+
146+
def _clone(self) -> "Request":
147+
"""Creates a copy of this request adapter.
148+
149+
The base implementation returns `self` (an identical shared instance).
150+
Transport adapters that maintain internal connection pools or stateful
151+
sessions must override this method to return an independent, detached
152+
adapter instance.
153+
"""
154+
return self

packages/google-auth/google/auth/aio/transport/aiohttp.py

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
else:
3737
try:
3838
from aiohttp import ClientTimeout
39-
except (ImportError, AttributeError):
39+
except (ImportError, AttributeError): # pragma: NO COVER
4040
ClientTimeout = None
4141

4242
_LOGGER = logging.getLogger(__name__)
@@ -203,3 +203,83 @@ async def close(self) -> None:
203203
if not self._closed and self._session:
204204
await self._session.close()
205205
self._closed = True
206+
207+
def _clone(self) -> "Request":
208+
"""Creates an independent copy of this request adapter.
209+
210+
Clones the connection settings, trace configurations, and session defaults
211+
(headers, cookies, basic auth, and timeouts).
212+
213+
Only standard `aiohttp.TCPConnector` and `aiohttp.UnixConnector` connectors
214+
are supported. The DNS resolver is not copied to avoid closing shared resolver
215+
resources.
216+
217+
Returns:
218+
google.auth.aio.transport.aiohttp.Request: A new request adapter.
219+
220+
Raises:
221+
google.auth.exceptions.TransportError: If the transport is closed, or if the
222+
session uses an unsupported connector.
223+
"""
224+
if self._closed:
225+
raise exceptions.TransportError("Cannot clone a closed transport.")
226+
227+
if not self._session:
228+
new_session = aiohttp.ClientSession(
229+
auto_decompress=False,
230+
trust_env=True,
231+
)
232+
return Request(session=new_session)
233+
234+
session_kwargs: dict = {
235+
"auto_decompress": False,
236+
"trust_env": getattr(self._session, "_trust_env", True),
237+
}
238+
239+
# Copy underlying connection pool settings (SSL context, IP bindings, limits).
240+
orig_connector = getattr(self._session, "_connector", None)
241+
if orig_connector and not orig_connector.closed:
242+
if isinstance(orig_connector, aiohttp.TCPConnector):
243+
# We explicitly do not copy the resolver. The connector
244+
# owns the resolver, and closing the cloned session would
245+
# close the shared resolver, breaking the original session.
246+
session_kwargs["connector"] = aiohttp.TCPConnector(
247+
ssl=getattr(orig_connector, "_ssl", None), # type: ignore
248+
limit=getattr(orig_connector, "_limit", 100),
249+
limit_per_host=getattr(orig_connector, "_limit_per_host", 0),
250+
force_close=getattr(orig_connector, "_force_close", False),
251+
local_addr=getattr(orig_connector, "_local_addr", None),
252+
)
253+
elif getattr(aiohttp, "UnixConnector", None) and isinstance(
254+
orig_connector, getattr(aiohttp, "UnixConnector")
255+
):
256+
path = getattr(orig_connector, "_path", None)
257+
if path:
258+
session_kwargs["connector"] = aiohttp.UnixConnector(
259+
path=path,
260+
limit=getattr(orig_connector, "_limit", 100),
261+
force_close=getattr(orig_connector, "_force_close", False),
262+
)
263+
else:
264+
raise exceptions.TransportError(
265+
f"Unsupported connector type for cloning: {type(orig_connector)}"
266+
)
267+
268+
# Preserve distributed tracing configurations.
269+
trace_configs = getattr(self._session, "_trace_configs", None)
270+
if trace_configs:
271+
session_kwargs["trace_configs"] = list(trace_configs)
272+
273+
# Copy session-level defaults (headers, cookies, auth, timeout).
274+
for attr_name, kwarg_name in [
275+
("_default_headers", "headers"),
276+
("_cookie_jar", "cookie_jar"),
277+
("_default_auth", "auth"),
278+
("_timeout", "timeout"),
279+
("_json_serialize", "json_serialize"),
280+
]:
281+
val = getattr(self._session, attr_name, None)
282+
if val is not None:
283+
session_kwargs[kwarg_name] = val
284+
285+
return Request(session=aiohttp.ClientSession(**session_kwargs)) # type: ignore

packages/google-auth/google/auth/transport/_aiohttp_requests.py

Lines changed: 90 additions & 0 deletions

0 commit comments

Comments
 (0)