20
20
import warnings
21
21
22
22
import freezegun
23
+ from google .api_core import exceptions
24
+ import google .api_core .retry
23
25
import mock
24
26
import pytest
25
27
import requests
@@ -70,6 +72,12 @@ def _make_connection(*responses):
70
72
return mock_conn
71
73
72
74
75
+ def _make_retriable_exception ():
76
+ return exceptions .TooManyRequests (
77
+ "retriable exception" , errors = [{"reason" : "rateLimitExceeded" }]
78
+ )
79
+
80
+
73
81
def _make_job_resource (
74
82
creation_time_ms = 1437767599006 ,
75
83
started_time_ms = 1437767600007 ,
@@ -84,6 +92,7 @@ def _make_job_resource(
84
92
85
93
):
86
94
resource = {
95
+ "status" : {"state" : "PENDING" },
87
96
"configuration" : {job_type : {}},
88
97
"statistics" : {"creationTime" : creation_time_ms , job_type : {}},
89
98
"etag" : etag ,
@@ -97,9 +106,11 @@ def _make_job_resource(
97
106
98
107
if started or ended :
99
108
resource ["statistics" ]["startTime" ] = started_time_ms
109
+ resource ["status" ]["state" ] = "RUNNING"
100
110
101
111
if ended :
102
112
resource ["statistics" ]["endTime" ] = ended_time_ms
113
+ resource ["status" ]["state" ] = "DONE"
103
114
104
115
if job_type == "query" :
105
116
resource ["configuration" ]["query" ]["destinationTable" ] = {
@@ -555,14 +566,14 @@ def test__check_resource_config_ok(self):
555
566
def test__build_resource (self ):
556
567
client = _make_client (project = self .PROJECT )
557
568
job = self ._make_one (self .JOB_ID , client )
558
- with self . assertRaises ( NotImplementedError ):
559
- job . _build_resource ()
569
+ resource = job . _build_resource ()
570
+ assert resource [ "jobReference" ][ "jobId" ] == self . JOB_ID
560
571
561
572
def test_to_api_repr (self ):
562
573
client = _make_client (project = self .PROJECT )
563
574
job = self ._make_one (self .JOB_ID , client )
564
- with self . assertRaises ( NotImplementedError ):
565
- job . to_api_repr ()
575
+ resource = job . to_api_repr ()
576
+ assert resource [ "jobReference" ][ "jobId" ] == self . JOB_ID
566
577
567
578
def test__begin_already (self ):
568
579
job = self ._set_properties_job ()
@@ -965,43 +976,95 @@ def test_done_already(self):
965
976
966
977
self .assertTrue (job .done ())
967
978
968
- @mock .patch ("google.api_core.future.polling.PollingFuture.result" )
969
- def test_result_default_wo_state (self , result ):
970
- from google .cloud .bigquery .retry import DEFAULT_RETRY
971
-
972
- client = _make_client (project = self .PROJECT )
979
+ def test_result_default_wo_state (self ):
980
+ begun_job_resource = _make_job_resource (
981
+ job_id = self .JOB_ID , project_id = self .PROJECT , started = True
982
+ )
983
+ done_job_resource = _make_job_resource (
984
+ job_id = self .JOB_ID , project_id = self .PROJECT , started = True , ended = True
985
+ )
986
+ conn = _make_connection (
987
+ _make_retriable_exception (),
988
+ begun_job_resource ,
989
+ _make_retriable_exception (),
990
+ done_job_resource ,
991
+ )
992
+ client = _make_client (project = self .PROJECT , connection = conn )
973
993
job = self ._make_one (self .JOB_ID , client )
974
- begin = job ._begin = mock .Mock ()
975
994
976
- self .assertIs (job .result (), result . return_value )
995
+ self .assertIs (job .result (), job )
977
996
978
- begin .assert_called_once_with (retry = DEFAULT_RETRY , timeout = None )
979
- result .assert_called_once_with (timeout = None )
997
+ begin_call = mock .call (
998
+ method = "POST" ,
999
+ path = f"/projects/{ self .PROJECT } /jobs" ,
1000
+ data = {"jobReference" : {"jobId" : self .JOB_ID , "projectId" : self .PROJECT }},
1001
+ timeout = None ,
1002
+ )
1003
+ reload_call = mock .call (
1004
+ method = "GET" ,
1005
+ path = f"/projects/{ self .PROJECT } /jobs/{ self .JOB_ID } " ,
1006
+ query_params = {},
1007
+ timeout = None ,
1008
+ )
1009
+ conn .api_request .assert_has_calls (
1010
+ [begin_call , begin_call , reload_call , reload_call ]
1011
+ )
980
1012
981
- @mock .patch ("google.api_core.future.polling.PollingFuture.result" )
982
- def test_result_w_retry_wo_state (self , result ):
983
- client = _make_client (project = self .PROJECT )
1013
+ def test_result_w_retry_wo_state (self ):
1014
+ begun_job_resource = _make_job_resource (
1015
+ job_id = self .JOB_ID , project_id = self .PROJECT , started = True
1016
+ )
1017
+ done_job_resource = _make_job_resource (
1018
+ job_id = self .JOB_ID , project_id = self .PROJECT , started = True , ended = True
1019
+ )
1020
+ conn = _make_connection (
1021
+ exceptions .NotFound ("not normally retriable" ),
1022
+ begun_job_resource ,
1023
+ # The call to done() / reload() does not get the custom retry
1024
+ # policy passed to it, so we don't throw a non-retriable
1025
+ # exception here. See:
1026
+ # https://github.com/googleapis/python-bigquery/issues/24
1027
+ _make_retriable_exception (),
1028
+ done_job_resource ,
1029
+ )
1030
+ client = _make_client (project = self .PROJECT , connection = conn )
984
1031
job = self ._make_one (self .JOB_ID , client )
985
- begin = job ._begin = mock .Mock ()
986
- retry = mock .Mock ()
1032
+ custom_predicate = mock .Mock ()
1033
+ custom_predicate .return_value = True
1034
+ custom_retry = google .api_core .retry .Retry (predicate = custom_predicate )
987
1035
988
- self .assertIs (job .result (retry = retry ), result . return_value )
1036
+ self .assertIs (job .result (retry = custom_retry ), job )
989
1037
990
- begin .assert_called_once_with (retry = retry , timeout = None )
991
- result .assert_called_once_with (timeout = None )
1038
+ begin_call = mock .call (
1039
+ method = "POST" ,
1040
+ path = f"/projects/{ self .PROJECT } /jobs" ,
1041
+ data = {"jobReference" : {"jobId" : self .JOB_ID , "projectId" : self .PROJECT }},
1042
+ timeout = None ,
1043
+ )
1044
+ reload_call = mock .call (
1045
+ method = "GET" ,
1046
+ path = f"/projects/{ self .PROJECT } /jobs/{ self .JOB_ID } " ,
1047
+ query_params = {},
1048
+ timeout = None ,
1049
+ )
1050
+ conn .api_request .assert_has_calls (
1051
+ [begin_call , begin_call , reload_call , reload_call ]
1052
+ )
992
1053
993
- @ mock . patch ( "google.api_core.future.polling.PollingFuture.result" )
994
- def test_result_explicit_w_state ( self , result ):
995
- client = _make_client (project = self .PROJECT )
1054
+ def test_result_explicit_w_state ( self ):
1055
+ conn = _make_connection ()
1056
+ client = _make_client (project = self .PROJECT , connection = conn )
996
1057
job = self ._make_one (self .JOB_ID , client )
997
- job ._properties ["status" ] = {"state" : "DONE" }
998
- begin = job ._begin = mock .Mock ()
1058
+ # Use _set_properties() instead of directly modifying _properties so
1059
+ # that the result state is set properly.
1060
+ job_resource = job ._properties
1061
+ job_resource ["status" ] = {"state" : "DONE" }
1062
+ job ._set_properties (job_resource )
999
1063
timeout = 1
1000
1064
1001
- self .assertIs (job .result (timeout = timeout ), result . return_value )
1065
+ self .assertIs (job .result (timeout = timeout ), job )
1002
1066
1003
- begin .assert_not_called ()
1004
- result .assert_called_once_with (timeout = timeout )
1067
+ conn .api_request .assert_not_called ()
1005
1068
1006
1069
def test_cancelled_wo_error_result (self ):
1007
1070
client = _make_client (project = self .PROJECT )
0 commit comments