feat: Add support for multiplexed sessions - read/write by currantw · Pull Request #1389 · googleapis/python-spanner · GitHub
Skip to content
This repository was archived by the owner on Jun 8, 2026. It is now read-only.
12 changes: 3 additions & 9 deletions google/cloud/spanner_v1/database_sessions_manager.py
84 changes: 37 additions & 47 deletions google/cloud/spanner_v1/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ def __init__(self, database, labels=None, database_role=None, is_multiplexed=Fal
self._database = database
self._session_id: Optional[str] = None

# TODO multiplexed - remove
self._transaction: Optional[Transaction] = None

if labels is None:
labels = {}

Expand Down Expand Up @@ -467,23 +464,18 @@ def batch(self):

return Batch(self)

def transaction(self):
def transaction(self) -> Transaction:
"""Create a transaction to perform a set of reads with shared staleness.

:rtype: :class:`~google.cloud.spanner_v1.transaction.Transaction`
:returns: a transaction bound to this session

:raises ValueError: if the session has not yet been created.
"""
if self._session_id is None:
raise ValueError("Session has not been created.")

# TODO multiplexed - remove
if self._transaction is not None:
self._transaction.rolled_back = True
self._transaction = None

txn = self._transaction = Transaction(self)
return txn
return Transaction(self)

def run_in_transaction(self, func, *args, **kw):
"""Perform a unit of work in a transaction, retrying on abort.
Expand Down Expand Up @@ -528,42 +520,43 @@ def run_in_transaction(self, func, *args, **kw):
)
isolation_level = kw.pop("isolation_level", None)

attempts = 0
database = self._database
log_commit_stats = database.log_commit_stats

observability_options = getattr(self._database, "observability_options", None)
with trace_call(
"CloudSpanner.Session.run_in_transaction",
self,
observability_options=observability_options,
observability_options=getattr(database, "observability_options", None),
) as span, MetricsCapture():
attempts: int = 0

# If a transaction using a multiplexed session is retried after an aborted
# user operation, it should include the previous transaction ID in the
# transaction options used to begin the transaction. This allows the backend
# to recognize the transaction and increase the lock order for the new
# transaction that is created.
# See :attr:`~google.cloud.spanner_v1.types.TransactionOptions.ReadWrite.multiplexed_session_previous_transaction_id`
previous_transaction_id: Optional[bytes] = None

while True:
# TODO multiplexed - remove
if self._transaction is None:
txn = self.transaction()
txn.transaction_tag = transaction_tag
txn.exclude_txn_from_change_streams = (
exclude_txn_from_change_streams
txn = self.transaction()
txn.transaction_tag = transaction_tag
txn.exclude_txn_from_change_streams = exclude_txn_from_change_streams
txn.isolation_level = isolation_level

if self.is_multiplexed:
txn._multiplexed_session_previous_transaction_id = (
previous_transaction_id
)
txn.isolation_level = isolation_level
else:
txn = self._transaction

span_attributes = dict()
attempts += 1
span_attributes = dict(attempt=attempts)

try:
attempts += 1
span_attributes["attempt"] = attempts
txn_id = getattr(txn, "_transaction_id", "") or ""
if txn_id:
span_attributes["transaction.id"] = txn_id

return_value = func(txn, *args, **kw)

# TODO multiplexed: store previous transaction ID.
except Aborted as exc:
# TODO multiplexed - remove
self._transaction = None

previous_transaction_id = txn._transaction_id
if span:
delay_seconds = _get_retry_delay(
exc.errors[0],
Expand All @@ -582,16 +575,15 @@ def run_in_transaction(self, func, *args, **kw):
exc, deadline, attempts, default_retry_delay=default_retry_delay
)
continue
except GoogleAPICallError:
# TODO multiplexed - remove
self._transaction = None

except GoogleAPICallError:
add_span_event(
span,
"User operation failed due to GoogleAPICallError, not retrying",
span_attributes,
)
raise

except Exception:
add_span_event(
span,
Expand All @@ -603,14 +595,13 @@ def run_in_transaction(self, func, *args, **kw):

try:
txn.commit(
return_commit_stats=self._database.log_commit_stats,
return_commit_stats=log_commit_stats,
request_options=commit_request_options,
max_commit_delay=max_commit_delay,
)
except Aborted as exc:
# TODO multiplexed - remove
self._transaction = None

except Aborted as exc:
previous_transaction_id = txn._transaction_id
if span:
delay_seconds = _get_retry_delay(
exc.errors[0],
Expand All @@ -621,26 +612,25 @@ def run_in_transaction(self, func, *args, **kw):
attributes.update(span_attributes)
add_span_event(
span,
"Transaction got aborted during commit, retrying afresh",
"Transaction was aborted during commit, retrying",
attributes,
)

_delay_until_retry(
exc, deadline, attempts, default_retry_delay=default_retry_delay
)
except GoogleAPICallError:
# TODO multiplexed - remove
self._transaction = None

except GoogleAPICallError:
add_span_event(
span,
"Transaction.commit failed due to GoogleAPICallError, not retrying",
span_attributes,
)
raise

else:
if self._database.log_commit_stats and txn.commit_stats:
self._database.logger.info(
if log_commit_stats and txn.commit_stats:
database.logger.info(
"CommitStats: {}".format(txn.commit_stats),
extra={"commit_stats": txn.commit_stats},
)
Expand Down
91 changes: 49 additions & 42 deletions google/cloud/spanner_v1/snapshot.py
Loading
Loading