Skip to content

Commit 2abdef8

Browse files
feat(bigquery): add create job method (#32)
* feat(bigquery): add create job method
* feat(bigquery): address review comments and add unit test
* feat(bigquery): make a copy of the job config for query jobs

Co-authored-by: Peter Lamut <[email protected]>
1 parent 6182cf4 commit 2abdef8

File tree

2 files changed

+236
-0
lines changed

2 files changed

+236
-0
lines changed

google/cloud/bigquery/client.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,11 @@
5151
from google.cloud import exceptions
5252
from google.cloud.client import ClientWithProject
5353

54+
from google.cloud.bigquery._helpers import _get_sub_prop
5455
from google.cloud.bigquery._helpers import _record_field_to_json
5556
from google.cloud.bigquery._helpers import _str_or_none
5657
from google.cloud.bigquery._helpers import _verify_job_config_type
58+
from google.cloud.bigquery._helpers import _del_sub_prop
5759
from google.cloud.bigquery._http import Connection
5860
from google.cloud.bigquery import _pandas_helpers
5961
from google.cloud.bigquery.dataset import Dataset
@@ -1313,6 +1315,70 @@ def job_from_resource(self, resource):
13131315
return job.QueryJob.from_api_repr(resource, self)
13141316
return job.UnknownJob.from_api_repr(resource, self)
13151317

1318+
def create_job(self, job_config, retry=DEFAULT_RETRY):
    """Create a new job.

    Arguments:
        job_config (dict): configuration job representation returned from the API.

    Keyword Arguments:
        retry (google.api_core.retry.Retry):
            (Optional) How to retry the RPC.

    Returns:
        Union[ \
            google.cloud.bigquery.job.LoadJob, \
            google.cloud.bigquery.job.CopyJob, \
            google.cloud.bigquery.job.ExtractJob, \
            google.cloud.bigquery.job.QueryJob \
        ]:
            A new job instance.
    """
    if "load" in job_config:
        # Rehydrate the typed config and delegate to the load helper.
        config = google.cloud.bigquery.job.LoadJobConfig.from_api_repr(
            job_config
        )
        dest = _get_sub_prop(job_config, ["load", "destinationTable"])
        uris = _get_sub_prop(job_config, ["load", "sourceUris"])
        return self.load_table_from_uri(
            uris, dest, job_config=config, retry=retry
        )

    if "copy" in job_config:
        config = google.cloud.bigquery.job.CopyJobConfig.from_api_repr(
            job_config
        )
        dest = _get_sub_prop(job_config, ["copy", "destinationTable"])
        table_resources = _get_sub_prop(job_config, ["copy", "sourceTables"])
        if table_resources is None:
            # A single source may be given under "sourceTable" instead of
            # the plural "sourceTables" list.
            table_resources = [_get_sub_prop(job_config, ["copy", "sourceTable"])]
        sources = [
            TableReference.from_api_repr(resource) for resource in table_resources
        ]
        return self.copy_table(sources, dest, job_config=config, retry=retry)

    if "extract" in job_config:
        config = google.cloud.bigquery.job.ExtractJobConfig.from_api_repr(
            job_config
        )
        src = _get_sub_prop(job_config, ["extract", "sourceTable"])
        dest_uris = _get_sub_prop(job_config, ["extract", "destinationUris"])
        return self.extract_table(
            src, dest_uris, job_config=config, retry=retry
        )

    if "query" in job_config:
        # Work on a deep copy: the destination table is stripped because
        # Client.query manages the destination itself.
        scrubbed = copy.deepcopy(job_config)
        _del_sub_prop(scrubbed, ["query", "destinationTable"])
        config = google.cloud.bigquery.job.QueryJobConfig.from_api_repr(
            scrubbed
        )
        sql = _get_sub_prop(scrubbed, ["query", "query"])
        return self.query(sql, job_config=config, retry=retry)

    raise TypeError("Invalid job configuration received.")
1381+
13161382
def get_job(
13171383
self, job_id, project=None, location=None, retry=DEFAULT_RETRY, timeout=None
13181384
):

tests/unit/test_client.py

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2795,6 +2795,176 @@ def test_delete_table_w_not_found_ok_true(self):
27952795

27962796
conn.api_request.assert_called_with(method="DELETE", path=path, timeout=None)
27972797

2798+
def _create_job_helper(self, job_config, client_method):
    """Call create_job with *job_config* and assert it dispatched to *client_method*."""
    creds = _make_credentials()
    http = object()
    client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
    client._connection = make_connection()

    fake_config = mock.Mock()
    config_patch = mock.patch(
        "google.cloud.bigquery.job._JobConfig.from_api_repr",
        return_value=fake_config,
    )
    method_patch = mock.patch(client_method, autospec=True)

    with method_patch as patched_method, config_patch:
        client.create_job(job_config=job_config)
        patched_method.assert_called_once()
2814+
def test_create_job_load_config(self):
    """A "load" configuration is routed to Client.load_table_from_uri."""
    destination = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "source_table",
    }
    configuration = {
        "load": {
            "destinationTable": destination,
            "sourceUris": ["gs://test_bucket/src_object*"],
        }
    }

    self._create_job_helper(
        configuration, "google.cloud.bigquery.client.Client.load_table_from_uri"
    )
2829+
2830+
def test_create_job_copy_config(self):
    """A "copy" configuration with a source-table list is routed to Client.copy_table."""
    source = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "source_table",
    }
    destination = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "destination_table",
    }
    configuration = {
        "copy": {
            "sourceTables": [source],
            "destinationTable": destination,
        }
    }

    self._create_job_helper(
        configuration, "google.cloud.bigquery.client.Client.copy_table",
    )
2851+
2852+
def test_create_job_copy_config_w_single_source(self):
    """A "copy" configuration with the singular "sourceTable" key is also accepted."""
    source = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "source_table",
    }
    destination = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "destination_table",
    }
    configuration = {
        "copy": {
            "sourceTable": source,
            "destinationTable": destination,
        }
    }

    self._create_job_helper(
        configuration, "google.cloud.bigquery.client.Client.copy_table",
    )
2871+
2872+
def test_create_job_extract_config(self):
    """An "extract" configuration is routed to Client.extract_table."""
    source = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "source_table",
    }
    configuration = {
        "extract": {
            "sourceTable": source,
            "destinationUris": ["gs://test_bucket/dst_object*"],
        }
    }

    self._create_job_helper(
        configuration, "google.cloud.bigquery.client.Client.extract_table",
    )
2886+
2887+
def test_create_job_query_config(self):
    """A "query" configuration is routed to Client.query."""
    query_section = {"query": "query", "destinationTable": {"tableId": "table_id"}}
    configuration = {"query": query_section}

    self._create_job_helper(
        configuration, "google.cloud.bigquery.client.Client.query",
    )
2894+
2895+
def test_create_job_query_config_w_rateLimitExceeded_error(self):
    """create_job retries a rate-limited query and strips the destination table.

    The first API call raises a retryable Forbidden(rateLimitExceeded); the
    second succeeds. The request body sent to the API must NOT contain the
    destination table from the input config — create_job removes it and
    lets the service assign one (returned in the job resource).
    """
    from google.cloud.exceptions import Forbidden
    from google.cloud.bigquery.retry import DEFAULT_RETRY

    query = "select count(*) from persons"
    configuration = {
        "query": {
            "query": query,
            "useLegacySql": False,
            "destinationTable": {"tableId": "table_id"},
        }
    }
    # The job resource the API "returns" on the successful (second) call.
    resource = {
        "jobReference": {"projectId": self.PROJECT, "jobId": mock.ANY},
        "configuration": {
            "query": {
                "query": query,
                "useLegacySql": False,
                "destinationTable": {
                    "projectId": self.PROJECT,
                    "datasetId": self.DS_ID,
                    "tableId": "query_destination_table",
                },
            }
        },
    }
    # What we expect the client to actually POST: no destinationTable.
    data_without_destination = {
        "jobReference": {"projectId": self.PROJECT, "jobId": mock.ANY},
        "configuration": {"query": {"query": query, "useLegacySql": False}},
    }

    creds = _make_credentials()
    http = object()
    # Retry only on Forbidden, with a short deadline to keep the test fast.
    retry = DEFAULT_RETRY.with_deadline(1).with_predicate(
        lambda exc: isinstance(exc, Forbidden)
    )
    client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)

    api_request_patcher = mock.patch.object(
        client._connection,
        "api_request",
        side_effect=[
            Forbidden("", errors=[{"reason": "rateLimitExceeded"}]),
            resource,
        ],
    )

    with api_request_patcher as fake_api_request:
        job = client.create_job(job_config=configuration, retry=retry)

    self.assertEqual(job.destination.table_id, "query_destination_table")
    self.assertEqual(len(fake_api_request.call_args_list), 2)  # was retried once
    self.assertEqual(
        fake_api_request.call_args_list[1],
        mock.call(
            method="POST",
            path="/projects/PROJECT/jobs",
            data=data_without_destination,
            timeout=None,
        ),
    )
2956+
2957+
def test_create_job_w_invalid_job_config(self):
    """An unrecognized job type in the config raises TypeError."""
    creds = _make_credentials()
    http = object()
    client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)

    with self.assertRaises(TypeError) as exc:
        client.create_job(job_config={"unknown": {}})

    self.assertIn("Invalid job configuration", exc.exception.args[0])
2967+
27982968
def test_job_from_resource_unknown_type(self):
27992969
from google.cloud.bigquery.job import UnknownJob
28002970

0 commit comments

Comments
 (0)