-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathexample6.go
93 lines (82 loc) · 2.2 KB
/
example6.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package main
import (
"io/ioutil"
"net/http"
"github.com/qedus/nds"
"golang.org/x/net/context"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/api/bigquery/v2"
"google.golang.org/appengine"
"google.golang.org/appengine/datastore"
"github.com/captaincodeman/datastore-mapper"
)
// example6 is a mapper job that streams each processed entity into BigQuery
// as a single-row insert.
type example6 struct {
	// appID is passed as the first (project) argument of the InsertAll call;
	// it is filled in by SliceStarted.
	appID string
	// bq is the BigQuery API client, created by SliceStarted.
	bq *bigquery.Service
}
func init() {
mapper.RegisterJob(&example6{})
}
// Query builds the mapper query for this job: a query over "photo"
// (presumably the datastore kind) with NamespaceEmpty applied. The incoming
// request is not consulted and the returned error is always nil.
func (x *example6) Query(r *http.Request) (*mapper.Query, error) {
	return mapper.NewQuery("photo").NamespaceEmpty(), nil
}
// SliceStarted prepares the per-slice state: it creates the BigQuery client
// and records the application ID. The id, namespace, shard and slice
// arguments are not used.
func (x *example6) SliceStarted(c context.Context, id string, namespace string, shard, slice int) {
	// This method has no error return, so a failure to build the client is
	// discarded; in that case bigqueryService yields nil and x.bq stays nil.
	svc, _ := bigqueryService(c)
	x.bq = svc

	x.appID = appengine.AppID(c)
}
// SliceCompleted is a no-op for this job: the body is empty and none of the
// arguments are used. Presumably the mapper calls it when a slice finishes —
// confirm against the mapper package.
func (x *example6) SliceCompleted(c context.Context, id string, namespace string, shard, slice int) {
}
// insertRejectedError reports a row that BigQuery refused even though the
// InsertAll call itself completed without a transport-level error.
type insertRejectedError struct {
	key    string // string form of the datastore key whose row was refused
	detail string // reason and message of the first reported error, if any
}

// Error implements the error interface.
func (e *insertRejectedError) Error() string {
	return "bigquery rejected row for " + e.key + ": " + e.detail
}

// Next processes the next item: it loads the Photo stored under key and
// streams it into BigQuery as one row, using the photo's Taken date
// (formatted as YYYYMMDD) as the template suffix.
//
// The counters argument is not used. A non-nil error is returned when the
// BigQuery client cannot be created, the entity cannot be loaded, the
// InsertAll call fails, or BigQuery reports the row as rejected in the
// response's InsertErrors.
func (x *example6) Next(c context.Context, counters mapper.Counters, key *datastore.Key) error {
	// SliceStarted drops any error from bigqueryService, which leaves x.bq
	// nil. Build the client here in that case so the failure is returned as
	// an error instead of surfacing as a nil-pointer panic below.
	if x.bq == nil {
		svc, err := bigqueryService(c)
		if err != nil {
			return err
		}
		x.bq = svc
	}

	// we need to load the entity ourselves
	photo := new(Photo)
	if err := nds.Get(c, key, photo); err != nil {
		return err
	}
	photo.ID = key.IntID()

	suffix := photo.Taken.Format("20060102")
	resp, err := x.bq.Tabledata.InsertAll(x.appID, "datasetName", "tableName", &bigquery.TableDataInsertAllRequest{
		TemplateSuffix: suffix,
		Rows: []*bigquery.TableDataInsertAllRequestRows{
			{
				Json: map[string]bigquery.JsonValue{
					"id":    photo.ID,
					"taken": photo.Taken,
					"photographer": map[string]bigquery.JsonValue{
						"id":   photo.Photographer.ID,
						"name": photo.Photographer.Name,
					},
				},
			},
		},
	}).Context(c).Do()
	if err != nil {
		return err
	}

	// A streaming insert can succeed at the HTTP level while individual rows
	// are refused; those refusals are only visible in InsertErrors.
	if resp != nil && len(resp.InsertErrors) > 0 {
		detail := "no detail returned"
		if first := resp.InsertErrors[0]; first != nil && len(first.Errors) > 0 && first.Errors[0] != nil {
			detail = first.Errors[0].Reason + ": " + first.Errors[0].Message
		}
		return &insertRejectedError{key: key.String(), detail: detail}
	}
	return nil
}
// bigqueryService returns a BigQuery API client authorised for
// bigquery.BigqueryScope.
//
// On the development server the credentials are read from the file
// "service-account.json"; otherwise the App Engine token source for the
// given context is used. Any error from reading the key file, parsing it, or
// constructing the service is returned unchanged together with a nil service.
func bigqueryService(c context.Context) (*bigquery.Service, error) {
	var httpClient *http.Client

	if !appengine.IsDevAppServer() {
		// Deployed: authenticate via the App Engine token source.
		ts := google.AppEngineTokenSource(c, bigquery.BigqueryScope)
		httpClient = oauth2.NewClient(c, ts)
	} else {
		// Local development: authenticate with a service-account key file.
		keyData, err := ioutil.ReadFile("service-account.json")
		if err != nil {
			return nil, err
		}
		jwtConf, err := google.JWTConfigFromJSON(keyData, bigquery.BigqueryScope)
		if err != nil {
			return nil, err
		}
		httpClient = jwtConf.Client(c)
	}

	svc, err := bigquery.New(httpClient)
	if err != nil {
		return nil, err
	}
	return svc, nil
}