@@ -50,26 +50,35 @@ def load_data(schema_path, data_path, project_id, dataset_id, table_id):
50
50
credentials = GoogleCredentials .get_application_default ()
51
51
bigquery = discovery .build ('bigquery' , 'v2' , credentials = credentials )
52
52
53
+ # Infer the data format from the name of the data file.
54
+ source_format = 'CSV'
55
+ if data_path [- 5 :].lower () == '.json' :
56
+ source_format = 'NEWLINE_DELIMITED_JSON'
57
+
58
+ # Post to the jobs resource using the client's media upload interface. See:
59
+ # http://developers.google.com/api-client-library/python/guide/media_upload
53
60
insert_request = bigquery .jobs ().insert (
54
61
projectId = project_id ,
62
+ # Provide a configuration object. See:
63
+ # https://cloud.google.com/bigquery/docs/reference/v2/jobs#resource
55
64
body = {
56
- "configuration" : {
57
- "load" : {
58
- "schema" : {
59
- "fields" : json .load (open (schema_path , 'r' ))
65
+ 'configuration' : {
66
+ 'load' : {
67
+ 'schema' : {
68
+ 'fields' : json .load (open (schema_path , 'r' ))
69
+ },
70
+ 'destinationTable' : {
71
+ 'projectId' : project_id ,
72
+ 'datasetId' : dataset_id ,
73
+ 'tableId' : table_id
60
74
},
61
- "destinationTable" : {
62
- "projectId" : project_id ,
63
- "datasetId" : dataset_id ,
64
- "tableId" : table_id
65
- }
75
+ 'sourceFormat' : source_format ,
66
76
}
67
77
}
68
78
},
69
79
media_body = MediaFileUpload (
70
80
data_path ,
71
- mimetype = "application/octet-stream" ))
72
-
81
+ mimetype = 'application/octet-stream' ))
73
82
job = insert_request .execute ()
74
83
75
84
print ('Waiting for job to finish...' )
@@ -78,12 +87,14 @@ def load_data(schema_path, data_path, project_id, dataset_id, table_id):
78
87
projectId = job ['jobReference' ]['projectId' ],
79
88
jobId = job ['jobReference' ]['jobId' ])
80
89
90
+ # Poll the job until it finishes.
81
91
while True :
82
92
result = status_request .execute (num_retries = 2 )
83
93
84
94
if result ['status' ]['state' ] == 'DONE' :
85
- if 'errorResult' in result ['status' ]:
86
- raise RuntimeError (result ['status' ]['errorResult' ])
95
+ if result ['status' ].get ('errors' ):
96
+ raise RuntimeError ('\n ' .join (
97
+ e ['message' ] for e in result ['status' ]['errors' ]))
87
98
print ('Job complete.' )
88
99
return
89
100
0 commit comments