Skip to content

Commit 1a7c9d2

Browse files
authored
feat: added retry and timeout params to partition read in database and snapshot class (#278)
* feat: added retry and timeout params to partition read in database and snapshot class * feat: lint corrections * feat: added retry and timeout support in process_read_batch and process_query_batch * feat: added retry and timeout support in process_read_batch and process_query_batch * feat: changed retry to retry object in tests
1 parent 2fd0352 commit 1a7c9d2

File tree

6 files changed

+375
-23
lines changed

6 files changed

+375
-23
lines changed

google/cloud/spanner_v1/database.py

+46-4
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from google.api_core.retry import if_exception_type
2727
from google.cloud.exceptions import NotFound
2828
from google.api_core.exceptions import Aborted
29+
from google.api_core import gapic_v1
2930
import six
3031

3132
# pylint: disable=ungrouped-imports
@@ -915,6 +916,9 @@ def generate_read_batches(
915916
index="",
916917
partition_size_bytes=None,
917918
max_partitions=None,
919+
*,
920+
retry=gapic_v1.method.DEFAULT,
921+
timeout=gapic_v1.method.DEFAULT,
918922
):
919923
"""Start a partitioned batch read operation.
920924
@@ -946,6 +950,12 @@ def generate_read_batches(
946950
service uses this as a hint, the actual number of partitions may
947951
differ.
948952
953+
:type retry: :class:`~google.api_core.retry.Retry`
954+
:param retry: (Optional) The retry settings for this request.
955+
956+
:type timeout: float
957+
:param timeout: (Optional) The timeout for this request.
958+
949959
:rtype: iterable of dict
950960
:returns:
951961
mappings of information used perform actual partitioned reads via
@@ -958,6 +968,8 @@ def generate_read_batches(
958968
index=index,
959969
partition_size_bytes=partition_size_bytes,
960970
max_partitions=max_partitions,
971+
retry=retry,
972+
timeout=timeout,
961973
)
962974

963975
read_info = {
@@ -969,21 +981,32 @@ def generate_read_batches(
969981
for partition in partitions:
970982
yield {"partition": partition, "read": read_info.copy()}
971983

972-
def process_read_batch(self, batch):
984+
def process_read_batch(
985+
self, batch, *, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT,
986+
):
973987
"""Process a single, partitioned read.
974988
975989
:type batch: mapping
976990
:param batch:
977991
one of the mappings returned from an earlier call to
978992
:meth:`generate_read_batches`.
979993
994+
:type retry: :class:`~google.api_core.retry.Retry`
995+
:param retry: (Optional) The retry settings for this request.
996+
997+
:type timeout: float
998+
:param timeout: (Optional) The timeout for this request.
999+
1000+
9801001
:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
9811002
:returns: a result set instance which can be used to consume rows.
9821003
"""
9831004
kwargs = copy.deepcopy(batch["read"])
9841005
keyset_dict = kwargs.pop("keyset")
9851006
kwargs["keyset"] = KeySet._from_dict(keyset_dict)
986-
return self._get_snapshot().read(partition=batch["partition"], **kwargs)
1007+
return self._get_snapshot().read(
1008+
partition=batch["partition"], **kwargs, retry=retry, timeout=timeout
1009+
)
9871010

9881011
def generate_query_batches(
9891012
self,
@@ -993,6 +1016,9 @@ def generate_query_batches(
9931016
partition_size_bytes=None,
9941017
max_partitions=None,
9951018
query_options=None,
1019+
*,
1020+
retry=gapic_v1.method.DEFAULT,
1021+
timeout=gapic_v1.method.DEFAULT,
9961022
):
9971023
"""Start a partitioned query operation.
9981024
@@ -1036,6 +1062,12 @@ def generate_query_batches(
10361062
If a dict is provided, it must be of the same form as the protobuf
10371063
message :class:`~google.cloud.spanner_v1.types.QueryOptions`
10381064
1065+
:type retry: :class:`~google.api_core.retry.Retry`
1066+
:param retry: (Optional) The retry settings for this request.
1067+
1068+
:type timeout: float
1069+
:param timeout: (Optional) The timeout for this request.
1070+
10391071
:rtype: iterable of dict
10401072
:returns:
10411073
mappings of information used perform actual partitioned reads via
@@ -1047,6 +1079,8 @@ def generate_query_batches(
10471079
param_types=param_types,
10481080
partition_size_bytes=partition_size_bytes,
10491081
max_partitions=max_partitions,
1082+
retry=retry,
1083+
timeout=timeout,
10501084
)
10511085

10521086
query_info = {"sql": sql}
@@ -1064,19 +1098,27 @@ def generate_query_batches(
10641098
for partition in partitions:
10651099
yield {"partition": partition, "query": query_info}
10661100

1067-
def process_query_batch(self, batch):
1101+
def process_query_batch(
1102+
self, batch, *, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT,
1103+
):
10681104
"""Process a single, partitioned query.
10691105
10701106
:type batch: mapping
10711107
:param batch:
10721108
one of the mappings returned from an earlier call to
10731109
:meth:`generate_query_batches`.
10741110
1111+
:type retry: :class:`~google.api_core.retry.Retry`
1112+
:param retry: (Optional) The retry settings for this request.
1113+
1114+
:type timeout: float
1115+
:param timeout: (Optional) The timeout for this request.
1116+
10751117
:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
10761118
:returns: a result set instance which can be used to consume rows.
10771119
"""
10781120
return self._get_snapshot().execute_sql(
1079-
partition=batch["partition"], **batch["query"]
1121+
partition=batch["partition"], **batch["query"], retry=retry, timeout=timeout
10801122
)
10811123

10821124
def process(self, batch):

google/cloud/spanner_v1/session.py

+6
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,12 @@ def execute_sql(
258258
or :class:`dict`
259259
:param query_options: (Optional) Options that are provided for query plan stability.
260260
261+
:type retry: :class:`~google.api_core.retry.Retry`
262+
:param retry: (Optional) The retry settings for this request.
263+
264+
:type timeout: float
265+
:param timeout: (Optional) The timeout for this request.
266+
261267
:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
262268
:returns: a result set instance which can be used to consume rows.
263269
"""

google/cloud/spanner_v1/snapshot.py

+56-7
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
from google.api_core.exceptions import InternalServerError
2929
from google.api_core.exceptions import ServiceUnavailable
30-
import google.api_core.gapic_v1.method
30+
from google.api_core import gapic_v1
3131
from google.cloud.spanner_v1._helpers import _make_value_pb
3232
from google.cloud.spanner_v1._helpers import _merge_query_options
3333
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
@@ -109,7 +109,18 @@ def _make_txn_selector(self): # pylint: disable=redundant-returns-doc
109109
"""
110110
raise NotImplementedError
111111

112-
def read(self, table, columns, keyset, index="", limit=0, partition=None):
112+
def read(
113+
self,
114+
table,
115+
columns,
116+
keyset,
117+
index="",
118+
limit=0,
119+
partition=None,
120+
*,
121+
retry=gapic_v1.method.DEFAULT,
122+
timeout=gapic_v1.method.DEFAULT,
123+
):
113124
"""Perform a ``StreamingRead`` API request for rows in a table.
114125
115126
:type table: str
@@ -134,6 +145,12 @@ def read(self, table, columns, keyset, index="", limit=0, partition=None):
134145
from :meth:`partition_read`. Incompatible with
135146
``limit``.
136147
148+
:type retry: :class:`~google.api_core.retry.Retry`
149+
:param retry: (Optional) The retry settings for this request.
150+
151+
:type timeout: float
152+
:param timeout: (Optional) The timeout for this request.
153+
137154
:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
138155
:returns: a result set instance which can be used to consume rows.
139156
@@ -163,7 +180,11 @@ def read(self, table, columns, keyset, index="", limit=0, partition=None):
163180
partition_token=partition,
164181
)
165182
restart = functools.partial(
166-
api.streaming_read, request=request, metadata=metadata,
183+
api.streaming_read,
184+
request=request,
185+
metadata=metadata,
186+
retry=retry,
187+
timeout=timeout,
167188
)
168189

169190
trace_attributes = {"table_id": table, "columns": columns}
@@ -186,8 +207,8 @@ def execute_sql(
186207
query_mode=None,
187208
query_options=None,
188209
partition=None,
189-
retry=google.api_core.gapic_v1.method.DEFAULT,
190-
timeout=google.api_core.gapic_v1.method.DEFAULT,
210+
retry=gapic_v1.method.DEFAULT,
211+
timeout=gapic_v1.method.DEFAULT,
191212
):
192213
"""Perform an ``ExecuteStreamingSql`` API request.
193214
@@ -224,6 +245,12 @@ def execute_sql(
224245
:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
225246
:returns: a result set instance which can be used to consume rows.
226247
248+
:type retry: :class:`~google.api_core.retry.Retry`
249+
:param retry: (Optional) The retry settings for this request.
250+
251+
:type timeout: float
252+
:param timeout: (Optional) The timeout for this request.
253+
227254
:raises ValueError:
228255
for reuse of single-use snapshots, or if a transaction ID is
229256
already pending for multiple-use snapshots.
@@ -296,6 +323,9 @@ def partition_read(
296323
index="",
297324
partition_size_bytes=None,
298325
max_partitions=None,
326+
*,
327+
retry=gapic_v1.method.DEFAULT,
328+
timeout=gapic_v1.method.DEFAULT,
299329
):
300330
"""Perform a ``PartitionRead`` API request for rows in a table.
301331
@@ -323,6 +353,12 @@ def partition_read(
323353
service uses this as a hint, the actual number of partitions may
324354
differ.
325355
356+
:type retry: :class:`~google.api_core.retry.Retry`
357+
:param retry: (Optional) The retry settings for this request.
358+
359+
:type timeout: float
360+
:param timeout: (Optional) The timeout for this request.
361+
326362
:rtype: iterable of bytes
327363
:returns: a sequence of partition tokens
328364
@@ -357,7 +393,9 @@ def partition_read(
357393
with trace_call(
358394
"CloudSpanner.PartitionReadOnlyTransaction", self._session, trace_attributes
359395
):
360-
response = api.partition_read(request=request, metadata=metadata,)
396+
response = api.partition_read(
397+
request=request, metadata=metadata, retry=retry, timeout=timeout,
398+
)
361399

362400
return [partition.partition_token for partition in response.partitions]
363401

@@ -368,6 +406,9 @@ def partition_query(
368406
param_types=None,
369407
partition_size_bytes=None,
370408
max_partitions=None,
409+
*,
410+
retry=gapic_v1.method.DEFAULT,
411+
timeout=gapic_v1.method.DEFAULT,
371412
):
372413
"""Perform a ``PartitionQuery`` API request.
373414
@@ -394,6 +435,12 @@ def partition_query(
394435
service uses this as a hint, the actual number of partitions may
395436
differ.
396437
438+
:type retry: :class:`~google.api_core.retry.Retry`
439+
:param retry: (Optional) The retry settings for this request.
440+
441+
:type timeout: float
442+
:param timeout: (Optional) The timeout for this request.
443+
397444
:rtype: iterable of bytes
398445
:returns: a sequence of partition tokens
399446
@@ -438,7 +485,9 @@ def partition_query(
438485
self._session,
439486
trace_attributes,
440487
):
441-
response = api.partition_query(request=request, metadata=metadata,)
488+
response = api.partition_query(
489+
request=request, metadata=metadata, retry=retry, timeout=timeout,
490+
)
442491

443492
return [partition.partition_token for partition in response.partitions]
444493

0 commit comments

Comments
 (0)