Skip to content

Commit 6e86643

Browse files
Refactor
Add examples Simplify job interface (remove batching) Don't serialize jobSpec struct to job, just job name Implement query splitting using scatter or property range. Closes #18
1 parent 93b7ae3 commit 6e86643

27 files changed

+989
-772
lines changed

common.go

+33-18
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ import (
88
)
99

1010
type (
11+
// taskable (urgh, horrible name) is anything that can
12+
// be run as a task. It's basically all our datastore models
13+
// and used to set the private common values (context?)
1114
taskable interface {
12-
getID() string
13-
setID(id string)
14-
15-
getQueue() string
16-
setQueue(queue string)
15+
getCommon() *common
16+
setCommon(id string, jobSpec JobSpec, queue string)
1717
}
1818

1919
// common contains properties that are common across all
@@ -22,6 +22,9 @@ type (
2222
// Counters holds the task counters map
2323
Counters Counters `datastore:"-"`
2424

25+
// Query is the datastore query spec
26+
Query *Query `datastore:"-"`
27+
2528
// Active indicates if this task is still active
2629
Active bool `datastore:"active,noindex"`
2730

@@ -43,27 +46,25 @@ type (
4346
// private fields used by local instance
4447
id string
4548
queue string
49+
jobSpec JobSpec
4650
startTime time.Time
4751
}
4852
)
4953

50-
func (c *common) getID() string {
51-
return c.id
52-
}
53-
func (c *common) setID(id string) {
54-
c.id = id
54+
func (c *common) getCommon() *common {
55+
return c
5556
}
5657

57-
func (c *common) getQueue() string {
58-
return c.queue
59-
}
60-
func (c *common) setQueue(queue string) {
58+
func (c *common) setCommon(id string, jobSpec JobSpec, queue string) {
59+
c.id = id
60+
c.jobSpec = jobSpec
6161
c.queue = queue
6262
}
6363

64-
func (c *common) start() {
64+
func (c *common) start(query *Query) {
6565
c.Active = true
6666
c.Counters = NewCounters()
67+
c.Query = query
6768
c.Count = 0
6869
c.Started = getTime()
6970
c.Updated = c.Started
@@ -88,9 +89,17 @@ func (c *common) Load(props []datastore.Property) error {
8889

8990
c.Counters = make(map[string]int64)
9091
for _, prop := range props {
91-
if strings.HasPrefix(prop.Name, "counters.") {
92-
key := prop.Name[9:len(prop.Name)]
93-
c.Counters[key] = prop.Value.(int64)
92+
switch prop.Name {
93+
case "query":
94+
c.Query = &Query{}
95+
if err := c.Query.GobDecode(prop.Value.([]byte)); err != nil {
96+
return err
97+
}
98+
default:
99+
if strings.HasPrefix(prop.Name, "counters.") {
100+
key := prop.Name[9:len(prop.Name)]
101+
c.Counters[key] = prop.Value.(int64)
102+
}
94103
}
95104
}
96105

@@ -111,5 +120,11 @@ func (c *common) Save() ([]datastore.Property, error) {
111120
props = append(props, datastore.Property{Name: "counters." + key, Value: value, NoIndex: true, Multiple: false})
112121
}
113122

123+
b, err := c.Query.GobEncode()
124+
if err != nil {
125+
return nil, err
126+
}
127+
props = append(props, datastore.Property{Name: "query", Value: b, NoIndex: true, Multiple: false})
128+
114129
return props, nil
115130
}

config.go

+2
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ type (
4040

4141
// newConfig creates a new config with default values
4242
func newConfig() *Config {
43+
// TODO: use config as default, allow setting some values per-job
44+
// and prevent config changes affecting already-running tasks
4345
return &Config{
4446
Path: DefaultPath,
4547
DatastorePrefix: "MP_",

example/example1.go

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package main
2+
3+
import (
4+
"io"
5+
6+
"net/http"
7+
8+
"github.com/qedus/nds"
9+
"golang.org/x/net/context"
10+
"google.golang.org/appengine/datastore"
11+
12+
"github.com/captaincodeman/datastore-mapper"
13+
)
14+
15+
type (
16+
// simple keysonly iteration and aggregation using counters
17+
example1 struct{}
18+
)
19+
20+
func init() {
21+
mapper.RegisterJob(&example1{})
22+
}
23+
24+
func (x *example1) Query(r *http.Request) (*mapper.Query, error) {
25+
q := mapper.NewQuery("photo")
26+
q = q.Namespace("")
27+
return q, nil
28+
}
29+
30+
// Next processes the next item
31+
func (x *example1) Next(c context.Context, w io.Writer, counters mapper.Counters, key *datastore.Key) error {
32+
// we need to load the entity ourselves
33+
photo := new(Photo)
34+
if err := nds.Get(c, key, photo); err != nil {
35+
return err
36+
}
37+
38+
counters.Increment(photo.Photographer.Name, 1)
39+
return nil
40+
}

example/example2.go

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package main
2+
3+
import (
4+
"io"
5+
6+
"net/http"
7+
8+
"golang.org/x/net/context"
9+
"google.golang.org/appengine/datastore"
10+
11+
"github.com/captaincodeman/datastore-mapper"
12+
)
13+
14+
type (
15+
// simple eager iteration and aggregation using counters
16+
example2 struct {
17+
photo *Photo
18+
}
19+
)
20+
21+
func init() {
22+
mapper.RegisterJob(&example2{})
23+
}
24+
25+
func (x *example2) Query(r *http.Request) (*mapper.Query, error) {
26+
q := mapper.NewQuery("photo")
27+
q = q.Namespace("")
28+
return q, nil
29+
}
30+
31+
// Make creates the entity to load into
32+
func (x *example2) Make() interface{} {
33+
x.photo = new(Photo)
34+
return x.photo
35+
}
36+
37+
// Next processes the next item
38+
func (x *example2) Next(c context.Context, w io.Writer, counters mapper.Counters, key *datastore.Key) error {
39+
// the photo instance was loaded from the query by the mapper
40+
counters.Increment(x.photo.Photographer.Name, 1)
41+
return nil
42+
}

example/example3.go

+79
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package main
2+
3+
import (
4+
"io"
5+
"time"
6+
7+
"net/http"
8+
9+
"github.com/qedus/nds"
10+
"golang.org/x/net/context"
11+
"google.golang.org/appengine/datastore"
12+
"google.golang.org/appengine/log"
13+
14+
"github.com/captaincodeman/datastore-mapper"
15+
)
16+
17+
type (
18+
// parse request parameters to create query
19+
example3 struct {}
20+
)
21+
22+
const (
23+
dateFormat = "2006-01-02"
24+
)
25+
26+
func init() {
27+
mapper.RegisterJob(&example3{})
28+
}
29+
30+
func (x *example3) Query(r *http.Request) (*mapper.Query, error) {
31+
v := r.URL.Query()
32+
33+
fromStr := v.Get("from")
34+
toStr := v.Get("to")
35+
now := time.Now().UTC()
36+
37+
var from time.Time
38+
var to time.Time
39+
var err error
40+
41+
// default to previous day but allow any
42+
if toStr == "" {
43+
to = time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
44+
} else {
45+
to, err = time.Parse(dateFormat, toStr)
46+
if err != nil {
47+
return nil, err
48+
}
49+
}
50+
51+
if fromStr == "" {
52+
from = to.Add(time.Duration(-24) * time.Hour)
53+
} else {
54+
from, err = time.Parse(dateFormat, fromStr)
55+
if err != nil {
56+
return nil, err
57+
}
58+
}
59+
60+
q := mapper.NewQuery("photo")
61+
q = q.Namespace("")
62+
q = q.Filter("taken >=", from)
63+
q = q.Filter("taken <", to)
64+
65+
return q, nil
66+
}
67+
68+
// Next processes the next item
69+
func (x *example3) Next(c context.Context, w io.Writer, counters mapper.Counters, key *datastore.Key) error {
70+
photo := new(Photo)
71+
if err := nds.Get(c, key, photo); err != nil {
72+
log.Errorf(c, err.Error())
73+
return err
74+
}
75+
photo.ID = key.IntID()
76+
77+
counters.Increment(photo.Photographer.Name, 1)
78+
return nil
79+
}

example/example4.go

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package main
2+
3+
import (
4+
"io"
5+
6+
"net/http"
7+
8+
"golang.org/x/net/context"
9+
"google.golang.org/appengine/datastore"
10+
"google.golang.org/appengine/log"
11+
12+
"github.com/captaincodeman/datastore-mapper"
13+
)
14+
15+
type (
16+
// lifecycle notifications
17+
example4 struct{}
18+
)
19+
20+
func init() {
21+
mapper.RegisterJob(&example4{})
22+
}
23+
24+
func (x *example4) Query(r *http.Request) (*mapper.Query, error) {
25+
q := mapper.NewQuery("photo")
26+
q = q.Namespace("")
27+
return q, nil
28+
}
29+
30+
// Next processes the next item
31+
func (x *example4) Next(c context.Context, w io.Writer, counters mapper.Counters, key *datastore.Key) error {
32+
return nil
33+
}
34+
35+
// job lifecycle notifications
36+
func (x *example4) JobStarted(c context.Context, id string) {
37+
log.Debugf(c, "JobStarted %s", id)
38+
}
39+
func (x *example4) JobCompleted(c context.Context, id string) {
40+
log.Debugf(c, "JobCompleted %s", id)
41+
}
42+
func (x *example4) NamespaceStarted(c context.Context, id string, namespace string) {
43+
log.Debugf(c, "NamespaceStarted %s %s", id, namespace)
44+
}
45+
func (x *example4) NamespaceCompleted(c context.Context, id string, namespace string) {
46+
log.Debugf(c, "NamespaceCompleted %s %s", id, namespace)
47+
}
48+
func (x *example4) ShardStarted(c context.Context, id string, namespace string, shard int) {
49+
log.Debugf(c, "ShardStarted %s %s %d", id, namespace, shard)
50+
}
51+
func (x *example4) ShardCompleted(c context.Context, id string, namespace string, shard int) {
52+
log.Debugf(c, "ShardCompleted %s %s %d", id, namespace, shard)
53+
}
54+
func (x *example4) SliceStarted(c context.Context, id string, namespace string, shard, slice int) {
55+
log.Debugf(c, "SliceStarted %s %s %d %d", id, namespace, shard, slice)
56+
}
57+
func (x *example4) SliceCompleted(c context.Context, id string, namespace string, shard, slice int) {
58+
log.Debugf(c, "SliceCompleted %s %s %d %d", id, namespace, shard, slice)
59+
}

example/example5.go

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package main
2+
3+
import (
4+
"io"
5+
6+
"encoding/json"
7+
"net/http"
8+
9+
"golang.org/x/net/context"
10+
"google.golang.org/appengine/datastore"
11+
12+
"github.com/captaincodeman/datastore-mapper"
13+
)
14+
15+
type (
16+
// export custom JSON to Cloud Storage
17+
example5 struct {
18+
photo *Photo
19+
}
20+
21+
photoOutput struct {
22+
*Photo
23+
// add namespace for bigquery
24+
Namespace string `json:"namespace"`
25+
}
26+
)
27+
28+
func init() {
29+
mapper.RegisterJob(&example5{})
30+
}
31+
32+
func (x *example5) Query(r *http.Request) (*mapper.Query, error) {
33+
q := mapper.NewQuery("photo")
34+
q = q.Namespace("")
35+
return q, nil
36+
}
37+
38+
// Make creates the entity to load into
39+
func (x *example5) Make() interface{} {
40+
x.photo = new(Photo)
41+
return x.photo
42+
}
43+
44+
// Next processes the next item
45+
func (x *example5) Next(c context.Context, w io.Writer, counters mapper.Counters, key *datastore.Key) error {
46+
photo := x.photo
47+
photo.ID = key.IntID()
48+
49+
out := &photoOutput{
50+
Photo: photo,
51+
Namespace: key.Namespace(),
52+
}
53+
54+
enc := json.NewEncoder(w)
55+
enc.Encode(out)
56+
57+
return nil
58+
}

0 commit comments

Comments
 (0)