Skip to content

Commit b0dd892

Browse files
feat: add timeout paramter to load_table_from_file and it dependent methods (#327)
1 parent 29dd573 commit b0dd892

File tree

2 files changed

+92
-24
lines changed

2 files changed

+92
-24
lines changed

google/cloud/bigquery/client.py

+59-12
Original file line numberDiff line numberDiff line change
@@ -1591,14 +1591,17 @@ def job_from_resource(self, resource):
15911591
return job.QueryJob.from_api_repr(resource, self)
15921592
return job.UnknownJob.from_api_repr(resource, self)
15931593

1594-
def create_job(self, job_config, retry=DEFAULT_RETRY):
1594+
def create_job(self, job_config, retry=DEFAULT_RETRY, timeout=None):
15951595
"""Create a new job.
15961596
Args:
15971597
job_config (dict): configuration job representation returned from the API.
15981598
15991599
Keyword Arguments:
16001600
retry (Optional[google.api_core.retry.Retry]):
16011601
How to retry the RPC.
1602+
timeout (Optional[float]):
1603+
The number of seconds to wait for the underlying HTTP transport
1604+
before using ``retry``.
16021605
16031606
Returns:
16041607
Union[ \
@@ -1617,7 +1620,11 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
16171620
destination = _get_sub_prop(job_config, ["load", "destinationTable"])
16181621
source_uris = _get_sub_prop(job_config, ["load", "sourceUris"])
16191622
return self.load_table_from_uri(
1620-
source_uris, destination, job_config=load_job_config, retry=retry
1623+
source_uris,
1624+
destination,
1625+
job_config=load_job_config,
1626+
retry=retry,
1627+
timeout=timeout,
16211628
)
16221629
elif "copy" in job_config:
16231630
copy_job_config = google.cloud.bigquery.job.CopyJobConfig.from_api_repr(
@@ -1633,7 +1640,11 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
16331640
table_ref = TableReference.from_api_repr(source_config)
16341641
sources.append(table_ref)
16351642
return self.copy_table(
1636-
sources, destination, job_config=copy_job_config, retry=retry
1643+
sources,
1644+
destination,
1645+
job_config=copy_job_config,
1646+
retry=retry,
1647+
timeout=timeout,
16371648
)
16381649
elif "extract" in job_config:
16391650
extract_job_config = google.cloud.bigquery.job.ExtractJobConfig.from_api_repr(
@@ -1650,6 +1661,7 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
16501661
destination_uris,
16511662
job_config=extract_job_config,
16521663
retry=retry,
1664+
timeout=timeout,
16531665
source_type=source_type,
16541666
)
16551667
elif "query" in job_config:
@@ -1659,7 +1671,9 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
16591671
copy_config
16601672
)
16611673
query = _get_sub_prop(copy_config, ["query", "query"])
1662-
return self.query(query, job_config=query_job_config, retry=retry)
1674+
return self.query(
1675+
query, job_config=query_job_config, retry=retry, timeout=timeout
1676+
)
16631677
else:
16641678
raise TypeError("Invalid job configuration received.")
16651679

@@ -1981,6 +1995,7 @@ def load_table_from_file(
19811995
location=None,
19821996
project=None,
19831997
job_config=None,
1998+
timeout=None,
19841999
):
19852000
"""Upload the contents of this table from a file-like object.
19862001
@@ -2020,6 +2035,9 @@ def load_table_from_file(
20202035
to the client's project.
20212036
job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]):
20222037
Extra configuration options for the job.
2038+
timeout (Optional[float]):
2039+
The number of seconds to wait for the underlying HTTP transport
2040+
before using ``retry``.
20232041
20242042
Returns:
20252043
google.cloud.bigquery.job.LoadJob: A new load job.
@@ -2058,11 +2076,11 @@ def load_table_from_file(
20582076
try:
20592077
if size is None or size >= _MAX_MULTIPART_SIZE:
20602078
response = self._do_resumable_upload(
2061-
file_obj, job_resource, num_retries
2079+
file_obj, job_resource, num_retries, timeout
20622080
)
20632081
else:
20642082
response = self._do_multipart_upload(
2065-
file_obj, job_resource, size, num_retries
2083+
file_obj, job_resource, size, num_retries, timeout
20662084
)
20672085
except resumable_media.InvalidResponse as exc:
20682086
raise exceptions.from_http_response(exc.response)
@@ -2080,6 +2098,7 @@ def load_table_from_dataframe(
20802098
project=None,
20812099
job_config=None,
20822100
parquet_compression="snappy",
2101+
timeout=None,
20832102
):
20842103
"""Upload the contents of a table from a pandas DataFrame.
20852104
@@ -2143,6 +2162,9 @@ def load_table_from_dataframe(
21432162
passed as the ``compression`` argument to the underlying
21442163
``DataFrame.to_parquet()`` method.
21452164
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet
2165+
timeout (Optional[float]):
2166+
The number of seconds to wait for the underlying HTTP transport
2167+
before using ``retry``.
21462168
21472169
Returns:
21482170
google.cloud.bigquery.job.LoadJob: A new load job.
@@ -2249,6 +2271,7 @@ def load_table_from_dataframe(
22492271
location=location,
22502272
project=project,
22512273
job_config=job_config,
2274+
timeout=timeout,
22522275
)
22532276

22542277
finally:
@@ -2264,6 +2287,7 @@ def load_table_from_json(
22642287
location=None,
22652288
project=None,
22662289
job_config=None,
2290+
timeout=None,
22672291
):
22682292
"""Upload the contents of a table from a JSON string or dict.
22692293
@@ -2313,6 +2337,9 @@ def load_table_from_json(
23132337
Extra configuration options for the job. The ``source_format``
23142338
setting is always set to
23152339
:attr:`~google.cloud.bigquery.job.SourceFormat.NEWLINE_DELIMITED_JSON`.
2340+
timeout (Optional[float]):
2341+
The number of seconds to wait for the underlying HTTP transport
2342+
before using ``retry``.
23162343
23172344
Returns:
23182345
google.cloud.bigquery.job.LoadJob: A new load job.
@@ -2357,9 +2384,10 @@ def load_table_from_json(
23572384
location=location,
23582385
project=project,
23592386
job_config=job_config,
2387+
timeout=timeout,
23602388
)
23612389

2362-
def _do_resumable_upload(self, stream, metadata, num_retries):
2390+
def _do_resumable_upload(self, stream, metadata, num_retries, timeout):
23632391
"""Perform a resumable upload.
23642392
23652393
Args:
@@ -2371,21 +2399,25 @@ def _do_resumable_upload(self, stream, metadata, num_retries):
23712399
Number of upload retries. (Deprecated: This
23722400
argument will be removed in a future release.)
23732401
2402+
timeout (float):
2403+
The number of seconds to wait for the underlying HTTP transport
2404+
before using ``retry``.
2405+
23742406
Returns:
23752407
requests.Response:
23762408
The "200 OK" response object returned after the final chunk
23772409
is uploaded.
23782410
"""
23792411
upload, transport = self._initiate_resumable_upload(
2380-
stream, metadata, num_retries
2412+
stream, metadata, num_retries, timeout
23812413
)
23822414

23832415
while not upload.finished:
23842416
response = upload.transmit_next_chunk(transport)
23852417

23862418
return response
23872419

2388-
def _initiate_resumable_upload(self, stream, metadata, num_retries):
2420+
def _initiate_resumable_upload(self, stream, metadata, num_retries, timeout):
23892421
"""Initiate a resumable upload.
23902422
23912423
Args:
@@ -2397,6 +2429,10 @@ def _initiate_resumable_upload(self, stream, metadata, num_retries):
23972429
Number of upload retries. (Deprecated: This
23982430
argument will be removed in a future release.)
23992431
2432+
timeout (float):
2433+
The number of seconds to wait for the underlying HTTP transport
2434+
before using ``retry``.
2435+
24002436
Returns:
24012437
Tuple:
24022438
Pair of
@@ -2419,12 +2455,17 @@ def _initiate_resumable_upload(self, stream, metadata, num_retries):
24192455
)
24202456

24212457
upload.initiate(
2422-
transport, stream, metadata, _GENERIC_CONTENT_TYPE, stream_final=False
2458+
transport,
2459+
stream,
2460+
metadata,
2461+
_GENERIC_CONTENT_TYPE,
2462+
stream_final=False,
2463+
timeout=timeout,
24232464
)
24242465

24252466
return upload, transport
24262467

2427-
def _do_multipart_upload(self, stream, metadata, size, num_retries):
2468+
def _do_multipart_upload(self, stream, metadata, size, num_retries, timeout):
24282469
"""Perform a multipart upload.
24292470
24302471
Args:
@@ -2441,6 +2482,10 @@ def _do_multipart_upload(self, stream, metadata, size, num_retries):
24412482
Number of upload retries. (Deprecated: This
24422483
argument will be removed in a future release.)
24432484
2485+
timeout (float):
2486+
The number of seconds to wait for the underlying HTTP transport
2487+
before using ``retry``.
2488+
24442489
Returns:
24452490
requests.Response:
24462491
The "200 OK" response object returned after the multipart
@@ -2466,7 +2511,9 @@ def _do_multipart_upload(self, stream, metadata, size, num_retries):
24662511
max_retries=num_retries
24672512
)
24682513

2469-
response = upload.transmit(self._http, data, metadata, _GENERIC_CONTENT_TYPE)
2514+
response = upload.transmit(
2515+
self._http, data, metadata, _GENERIC_CONTENT_TYPE, timeout=timeout
2516+
)
24702517

24712518
return response
24722519

0 commit comments

Comments
 (0)