Skip to content

Commit 00ccb7a

Browse files
authored
feat(spanner): add support for txn changstream exclusion (#1152)
* feat(spanner): add support for txn changstream exclusion * feat(spanner): add tests for txn change streams exclusion * chore(spanner): lint fix * feat(spanner): add docs * feat(spanner): add test for ILB with change stream exclusion * feat(spanner): update default value and add optional
1 parent c670ebc commit 00ccb7a

File tree

8 files changed

+346
-17
lines changed

8 files changed

+346
-17
lines changed

google/cloud/spanner_v1/batch.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,11 @@ def _check_state(self):
147147
raise ValueError("Batch already committed")
148148

149149
def commit(
150-
self, return_commit_stats=False, request_options=None, max_commit_delay=None
150+
self,
151+
return_commit_stats=False,
152+
request_options=None,
153+
max_commit_delay=None,
154+
exclude_txn_from_change_streams=False,
151155
):
152156
"""Commit mutations to the database.
153157
@@ -178,7 +182,10 @@ def commit(
178182
metadata.append(
179183
_metadata_with_leader_aware_routing(database._route_to_leader_enabled)
180184
)
181-
txn_options = TransactionOptions(read_write=TransactionOptions.ReadWrite())
185+
txn_options = TransactionOptions(
186+
read_write=TransactionOptions.ReadWrite(),
187+
exclude_txn_from_change_streams=exclude_txn_from_change_streams,
188+
)
182189
trace_attributes = {"num_mutations": len(self._mutations)}
183190

184191
if request_options is None:
@@ -270,7 +277,7 @@ def group(self):
270277
self._mutation_groups.append(mutation_group)
271278
return MutationGroup(self._session, mutation_group.mutations)
272279

273-
def batch_write(self, request_options=None):
280+
def batch_write(self, request_options=None, exclude_txn_from_change_streams=False):
274281
"""Executes batch_write.
275282
276283
:type request_options:
@@ -280,6 +287,13 @@ def batch_write(self, request_options=None):
280287
If a dict is provided, it must be of the same form as the protobuf
281288
message :class:`~google.cloud.spanner_v1.types.RequestOptions`.
282289
290+
:type exclude_txn_from_change_streams: bool
291+
:param exclude_txn_from_change_streams:
292+
(Optional) If true, instructs the transaction to be excluded from being recorded in change streams
293+
with the DDL option `allow_txn_exclusion=true`. This does not exclude the transaction from
294+
being recorded in the change streams with the DDL option `allow_txn_exclusion` being false or
295+
unset.
296+
283297
:rtype: :class:`Iterable[google.cloud.spanner_v1.types.BatchWriteResponse]`
284298
:returns: a sequence of responses for each batch.
285299
"""
@@ -302,6 +316,7 @@ def batch_write(self, request_options=None):
302316
session=self._session.name,
303317
mutation_groups=self._mutation_groups,
304318
request_options=request_options,
319+
exclude_txn_from_change_streams=exclude_txn_from_change_streams,
305320
)
306321
with trace_call("CloudSpanner.BatchWrite", self._session, trace_attributes):
307322
method = functools.partial(

google/cloud/spanner_v1/database.py

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -619,6 +619,7 @@ def execute_partitioned_dml(
619619
param_types=None,
620620
query_options=None,
621621
request_options=None,
622+
exclude_txn_from_change_streams=False,
622623
):
623624
"""Execute a partitionable DML statement.
624625
@@ -651,6 +652,13 @@ def execute_partitioned_dml(
651652
Please note, the `transactionTag` setting will be ignored as it is
652653
not supported for partitioned DML.
653654
655+
:type exclude_txn_from_change_streams: bool
656+
:param exclude_txn_from_change_streams:
657+
(Optional) If true, instructs the transaction to be excluded from being recorded in change streams
658+
with the DDL option `allow_txn_exclusion=true`. This does not exclude the transaction from
659+
being recorded in the change streams with the DDL option `allow_txn_exclusion` being false or
660+
unset.
661+
654662
:rtype: int
655663
:returns: Count of rows affected by the DML statement.
656664
"""
@@ -673,7 +681,8 @@ def execute_partitioned_dml(
673681
api = self.spanner_api
674682

675683
txn_options = TransactionOptions(
676-
partitioned_dml=TransactionOptions.PartitionedDml()
684+
partitioned_dml=TransactionOptions.PartitionedDml(),
685+
exclude_txn_from_change_streams=exclude_txn_from_change_streams,
677686
)
678687

679688
metadata = _metadata_with_prefix(self.name)
@@ -752,7 +761,12 @@ def snapshot(self, **kw):
752761
"""
753762
return SnapshotCheckout(self, **kw)
754763

755-
def batch(self, request_options=None, max_commit_delay=None):
764+
def batch(
765+
self,
766+
request_options=None,
767+
max_commit_delay=None,
768+
exclude_txn_from_change_streams=False,
769+
):
756770
"""Return an object which wraps a batch.
757771
758772
The wrapper *must* be used as a context manager, with the batch
@@ -771,10 +785,19 @@ def batch(self, request_options=None, max_commit_delay=None):
771785
in order to improve throughput. Value must be between 0ms and
772786
500ms.
773787
788+
:type exclude_txn_from_change_streams: bool
789+
:param exclude_txn_from_change_streams:
790+
(Optional) If true, instructs the transaction to be excluded from being recorded in change streams
791+
with the DDL option `allow_txn_exclusion=true`. This does not exclude the transaction from
792+
being recorded in the change streams with the DDL option `allow_txn_exclusion` being false or
793+
unset.
794+
774795
:rtype: :class:`~google.cloud.spanner_v1.database.BatchCheckout`
775796
:returns: new wrapper
776797
"""
777-
return BatchCheckout(self, request_options, max_commit_delay)
798+
return BatchCheckout(
799+
self, request_options, max_commit_delay, exclude_txn_from_change_streams
800+
)
778801

779802
def mutation_groups(self):
780803
"""Return an object which wraps a mutation_group.
@@ -840,6 +863,10 @@ def run_in_transaction(self, func, *args, **kw):
840863
"max_commit_delay" will be removed and used to set the
841864
max_commit_delay for the request. Value must be between
842865
0ms and 500ms.
866+
"exclude_txn_from_change_streams" if true, instructs the transaction to be excluded
867+
from being recorded in change streams with the DDL option `allow_txn_exclusion=true`.
868+
This does not exclude the transaction from being recorded in the change streams with
869+
the DDL option `allow_txn_exclusion` being false or unset.
843870
844871
:rtype: Any
845872
:returns: The return value of ``func``.
@@ -1103,7 +1130,13 @@ class BatchCheckout(object):
11031130
in order to improve throughput.
11041131
"""
11051132

1106-
def __init__(self, database, request_options=None, max_commit_delay=None):
1133+
def __init__(
1134+
self,
1135+
database,
1136+
request_options=None,
1137+
max_commit_delay=None,
1138+
exclude_txn_from_change_streams=False,
1139+
):
11071140
self._database = database
11081141
self._session = self._batch = None
11091142
if request_options is None:
@@ -1113,6 +1146,7 @@ def __init__(self, database, request_options=None, max_commit_delay=None):
11131146
else:
11141147
self._request_options = request_options
11151148
self._max_commit_delay = max_commit_delay
1149+
self._exclude_txn_from_change_streams = exclude_txn_from_change_streams
11161150

11171151
def __enter__(self):
11181152
"""Begin ``with`` block."""
@@ -1130,6 +1164,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
11301164
return_commit_stats=self._database.log_commit_stats,
11311165
request_options=self._request_options,
11321166
max_commit_delay=self._max_commit_delay,
1167+
exclude_txn_from_change_streams=self._exclude_txn_from_change_streams,
11331168
)
11341169
finally:
11351170
if self._database.log_commit_stats and self._batch.commit_stats:

google/cloud/spanner_v1/session.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,10 @@ def run_in_transaction(self, func, *args, **kw):
387387
request options for the commit request.
388388
"max_commit_delay" will be removed and used to set the max commit delay for the request.
389389
"transaction_tag" will be removed and used to set the transaction tag for the request.
390+
"exclude_txn_from_change_streams" if true, instructs the transaction to be excluded
391+
from being recorded in change streams with the DDL option `allow_txn_exclusion=true`.
392+
This does not exclude the transaction from being recorded in the change streams with
393+
the DDL option `allow_txn_exclusion` being false or unset.
390394
391395
:rtype: Any
392396
:returns: The return value of ``func``.
@@ -398,12 +402,16 @@ def run_in_transaction(self, func, *args, **kw):
398402
commit_request_options = kw.pop("commit_request_options", None)
399403
max_commit_delay = kw.pop("max_commit_delay", None)
400404
transaction_tag = kw.pop("transaction_tag", None)
405+
exclude_txn_from_change_streams = kw.pop(
406+
"exclude_txn_from_change_streams", None
407+
)
401408
attempts = 0
402409

403410
while True:
404411
if self._transaction is None:
405412
txn = self.transaction()
406413
txn.transaction_tag = transaction_tag
414+
txn.exclude_txn_from_change_streams = exclude_txn_from_change_streams
407415
else:
408416
txn = self._transaction
409417

google/cloud/spanner_v1/transaction.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class Transaction(_SnapshotBase, _BatchBase):
5555
_execute_sql_count = 0
5656
_lock = threading.Lock()
5757
_read_only = False
58+
exclude_txn_from_change_streams = False
5859

5960
def __init__(self, session):
6061
if session._transaction is not None:
@@ -86,7 +87,10 @@ def _make_txn_selector(self):
8687

8788
if self._transaction_id is None:
8889
return TransactionSelector(
89-
begin=TransactionOptions(read_write=TransactionOptions.ReadWrite())
90+
begin=TransactionOptions(
91+
read_write=TransactionOptions.ReadWrite(),
92+
exclude_txn_from_change_streams=self.exclude_txn_from_change_streams,
93+
)
9094
)
9195
else:
9296
return TransactionSelector(id=self._transaction_id)
@@ -137,7 +141,10 @@ def begin(self):
137141
metadata.append(
138142
_metadata_with_leader_aware_routing(database._route_to_leader_enabled)
139143
)
140-
txn_options = TransactionOptions(read_write=TransactionOptions.ReadWrite())
144+
txn_options = TransactionOptions(
145+
read_write=TransactionOptions.ReadWrite(),
146+
exclude_txn_from_change_streams=self.exclude_txn_from_change_streams,
147+
)
141148
with trace_call("CloudSpanner.BeginTransaction", self._session):
142149
method = functools.partial(
143150
api.begin_transaction,

tests/unit/test_batch.py

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,12 @@ def test_commit_ok(self):
259259
"CloudSpanner.Commit", attributes=dict(BASE_ATTRIBUTES, num_mutations=1)
260260
)
261261

262-
def _test_commit_with_options(self, request_options=None, max_commit_delay_in=None):
262+
def _test_commit_with_options(
263+
self,
264+
request_options=None,
265+
max_commit_delay_in=None,
266+
exclude_txn_from_change_streams=False,
267+
):
263268
import datetime
264269
from google.cloud.spanner_v1 import CommitResponse
265270
from google.cloud.spanner_v1 import TransactionOptions
@@ -276,7 +281,9 @@ def _test_commit_with_options(self, request_options=None, max_commit_delay_in=No
276281
batch.transaction_tag = self.TRANSACTION_TAG
277282
batch.insert(TABLE_NAME, COLUMNS, VALUES)
278283
committed = batch.commit(
279-
request_options=request_options, max_commit_delay=max_commit_delay_in
284+
request_options=request_options,
285+
max_commit_delay=max_commit_delay_in,
286+
exclude_txn_from_change_streams=exclude_txn_from_change_streams,
280287
)
281288

282289
self.assertEqual(committed, now)
@@ -301,6 +308,10 @@ def _test_commit_with_options(self, request_options=None, max_commit_delay_in=No
301308
self.assertEqual(mutations, batch._mutations)
302309
self.assertIsInstance(single_use_txn, TransactionOptions)
303310
self.assertTrue(type(single_use_txn).pb(single_use_txn).HasField("read_write"))
311+
self.assertEqual(
312+
single_use_txn.exclude_txn_from_change_streams,
313+
exclude_txn_from_change_streams,
314+
)
304315
self.assertEqual(
305316
metadata,
306317
[
@@ -355,6 +366,14 @@ def test_commit_w_max_commit_delay(self):
355366
max_commit_delay_in=datetime.timedelta(milliseconds=100),
356367
)
357368

369+
def test_commit_w_exclude_txn_from_change_streams(self):
370+
request_options = RequestOptions(
371+
request_tag="tag-1",
372+
)
373+
self._test_commit_with_options(
374+
request_options=request_options, exclude_txn_from_change_streams=True
375+
)
376+
358377
def test_context_mgr_already_committed(self):
359378
import datetime
360379
from google.cloud._helpers import UTC
@@ -499,7 +518,9 @@ def test_batch_write_grpc_error(self):
499518
attributes=dict(BASE_ATTRIBUTES, num_mutation_groups=1),
500519
)
501520

502-
def _test_batch_write_with_request_options(self, request_options=None):
521+
def _test_batch_write_with_request_options(
522+
self, request_options=None, exclude_txn_from_change_streams=False
523+
):
503524
import datetime
504525
from google.cloud.spanner_v1 import BatchWriteResponse
505526
from google.cloud._helpers import UTC
@@ -519,7 +540,10 @@ def _test_batch_write_with_request_options(self, request_options=None):
519540
group = groups.group()
520541
group.insert(TABLE_NAME, COLUMNS, VALUES)
521542

522-
response_iter = groups.batch_write(request_options)
543+
response_iter = groups.batch_write(
544+
request_options,
545+
exclude_txn_from_change_streams=exclude_txn_from_change_streams,
546+
)
523547
self.assertEqual(len(response_iter), 1)
524548
self.assertEqual(response_iter[0], response)
525549

@@ -528,6 +552,7 @@ def _test_batch_write_with_request_options(self, request_options=None):
528552
mutation_groups,
529553
actual_request_options,
530554
metadata,
555+
request_exclude_txn_from_change_streams,
531556
) = api._batch_request
532557
self.assertEqual(session, self.SESSION_NAME)
533558
self.assertEqual(mutation_groups, groups._mutation_groups)
@@ -545,6 +570,9 @@ def _test_batch_write_with_request_options(self, request_options=None):
545570
else:
546571
expected_request_options = request_options
547572
self.assertEqual(actual_request_options, expected_request_options)
573+
self.assertEqual(
574+
request_exclude_txn_from_change_streams, exclude_txn_from_change_streams
575+
)
548576

549577
self.assertSpanAttributes(
550578
"CloudSpanner.BatchWrite",
@@ -567,6 +595,11 @@ def test_batch_write_w_incorrect_tag_dictionary_error(self):
567595
with self.assertRaises(ValueError):
568596
self._test_batch_write_with_request_options({"incorrect_tag": "tag-1-1"})
569597

598+
def test_batch_write_w_exclude_txn_from_change_streams(self):
599+
self._test_batch_write_with_request_options(
600+
exclude_txn_from_change_streams=True
601+
)
602+
570603

571604
class _Session(object):
572605
def __init__(self, database=None, name=TestBatch.SESSION_NAME):
@@ -625,6 +658,7 @@ def batch_write(
625658
request.mutation_groups,
626659
request.request_options,
627660
metadata,
661+
request.exclude_txn_from_change_streams,
628662
)
629663
if self._rpc_error:
630664
raise Unknown("error")

tests/unit/test_database.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1083,6 +1083,7 @@ def _execute_partitioned_dml_helper(
10831083
query_options=None,
10841084
request_options=None,
10851085
retried=False,
1086+
exclude_txn_from_change_streams=False,
10861087
):
10871088
from google.api_core.exceptions import Aborted
10881089
from google.api_core.retry import Retry
@@ -1129,13 +1130,19 @@ def _execute_partitioned_dml_helper(
11291130
api.execute_streaming_sql.return_value = iterator
11301131

11311132
row_count = database.execute_partitioned_dml(
1132-
dml, params, param_types, query_options, request_options
1133+
dml,
1134+
params,
1135+
param_types,
1136+
query_options,
1137+
request_options,
1138+
exclude_txn_from_change_streams,
11331139
)
11341140

11351141
self.assertEqual(row_count, 2)
11361142

11371143
txn_options = TransactionOptions(
1138-
partitioned_dml=TransactionOptions.PartitionedDml()
1144+
partitioned_dml=TransactionOptions.PartitionedDml(),
1145+
exclude_txn_from_change_streams=exclude_txn_from_change_streams,
11391146
)
11401147

11411148
api.begin_transaction.assert_called_with(
@@ -1250,6 +1257,11 @@ def test_execute_partitioned_dml_w_req_tag_used(self):
12501257
def test_execute_partitioned_dml_wo_params_retry_aborted(self):
12511258
self._execute_partitioned_dml_helper(dml=DML_WO_PARAM, retried=True)
12521259

1260+
def test_execute_partitioned_dml_w_exclude_txn_from_change_streams(self):
1261+
self._execute_partitioned_dml_helper(
1262+
dml=DML_WO_PARAM, exclude_txn_from_change_streams=True
1263+
)
1264+
12531265
def test_session_factory_defaults(self):
12541266
from google.cloud.spanner_v1.session import Session
12551267

0 commit comments

Comments
 (0)