Skip to content
This repository was archived by the owner on Oct 3, 2023. It is now read-only.

Commit 706a872

Browse files
laoj2dashpolepunya
authored
Call CreateServiceTimeSeries for service time series (#294)
Co-authored-by: David Ashpole <[email protected]> Co-authored-by: Punya Biswal <[email protected]>
1 parent 0a4a840 commit 706a872

File tree

11 files changed

+615
-85
lines changed

11 files changed

+615
-85
lines changed

equivalence_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,7 @@ type fakeMetricsServer struct {
372372
monitoringpb.MetricServiceServer
373373
mu sync.RWMutex
374374
stackdriverTimeSeries []*monitoringpb.CreateTimeSeriesRequest
375+
stackdriverServiceTimeSeries []*monitoringpb.CreateTimeSeriesRequest
375376
stackdriverMetricDescriptors []*monitoringpb.CreateMetricDescriptorRequest
376377
}
377378

@@ -437,3 +438,10 @@ func (server *fakeMetricsServer) CreateTimeSeries(ctx context.Context, req *moni
437438
server.mu.Unlock()
438439
return new(empty.Empty), nil
439440
}
441+
442+
// CreateServiceTimeSeries records req on the fake server so tests can inspect
// it later. The request is appended to server.stackdriverServiceTimeSeries
// while server.mu is write-locked. ctx is not used, and the call always
// returns a fresh empty.Empty with a nil error.
func (server *fakeMetricsServer) CreateServiceTimeSeries(ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest) (*empty.Empty, error) {
443+
server.mu.Lock()
444+
server.stackdriverServiceTimeSeries = append(server.stackdriverServiceTimeSeries, req)
445+
server.mu.Unlock()
446+
return new(empty.Empty), nil
447+
}

go.mod

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,22 @@ module contrib.go.opencensus.io/exporter/stackdriver
33
go 1.15
44

55
require (
6-
cloud.google.com/go v0.75.0
6+
cloud.google.com/go v0.97.0
7+
cloud.google.com/go/monitoring v1.1.0
8+
cloud.google.com/go/trace v1.0.0
79
github.com/aws/aws-sdk-go v1.37.0
810
github.com/census-instrumentation/opencensus-proto v0.3.0
9-
github.com/golang/protobuf v1.4.3
10-
github.com/google/go-cmp v0.5.4
11+
github.com/golang/protobuf v1.5.2
12+
github.com/google/go-cmp v0.5.6
1113
github.com/jstemmer/go-junit-report v0.9.1
12-
go.opencensus.io v0.22.6
13-
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5
14-
golang.org/x/net v0.0.0-20210119194325-5f4716e94777
15-
golang.org/x/oauth2 v0.0.0-20210126194326-f9ce19ea3013
16-
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c // indirect
17-
golang.org/x/text v0.3.5 // indirect
18-
golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e
19-
google.golang.org/api v0.37.0
20-
google.golang.org/genproto v0.0.0-20210126160654-44e461bb6506
21-
google.golang.org/grpc v1.35.0
22-
google.golang.org/protobuf v1.25.0
14+
go.opencensus.io v0.23.0
15+
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616
16+
golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420
17+
golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1
18+
golang.org/x/tools v0.1.5
19+
google.golang.org/api v0.59.0
20+
google.golang.org/genproto v0.0.0-20211018162055-cf77aa76bad2
21+
google.golang.org/grpc v1.40.0
22+
google.golang.org/protobuf v1.27.1
2323
honnef.co/go/tools v0.0.1-2020.1.4
2424
)

go.sum

Lines changed: 153 additions & 0 deletions
Large diffs are not rendered by default.

metrics.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -111,11 +111,24 @@ func (se *statsExporter) uploadMetrics(metrics []*metricdata.Metric) error {
111111
end = len(allTimeSeries)
112112
}
113113
batch := allTimeSeries[start:end]
114-
ctsreql := se.combineTimeSeriesToCreateTimeSeriesRequest(batch)
115-
for _, ctsreq := range ctsreql {
116-
if err := createTimeSeries(ctx, se.c, ctsreq); err != nil {
117-
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
118-
errors = append(errors, err)
114+
serviceTsBatch, nonServiceTsBatch := splitTimeSeries(batch)
115+
116+
if len(nonServiceTsBatch) > 0 {
117+
nonServiceReql := se.combineTimeSeriesToCreateTimeSeriesRequest(nonServiceTsBatch)
118+
for _, ctsreq := range nonServiceReql {
119+
if err := createTimeSeries(ctx, se.c, ctsreq); err != nil {
120+
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
121+
errors = append(errors, err)
122+
}
123+
}
124+
}
125+
if len(serviceTsBatch) > 0 {
126+
serviceReql := se.combineTimeSeriesToCreateTimeSeriesRequest(serviceTsBatch)
127+
for _, ctsreq := range serviceReql {
128+
if err := createServiceTimeSeries(ctx, se.c, ctsreq); err != nil {
129+
span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
130+
errors = append(errors, err)
131+
}
119132
}
120133
}
121134
}

metrics_batcher.go

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -138,20 +138,36 @@ var timeSeriesErrRegex = regexp.MustCompile(`: timeSeries\[([0-9]+(?:-[0-9]+)?(?
138138

139139
// sendReq sends create time series requests to Stackdriver,
140140
// and returns the count of dropped time series and any errors encountered.
141-
func sendReq(ctx context.Context, c *monitoring.MetricClient, req *monitoringpb.CreateTimeSeriesRequest) (int, error) {
141+
func sendReq(ctx context.Context, c *monitoring.MetricClient, req *monitoringpb.CreateTimeSeriesRequest) (int, []error) {
142142
// c == nil only happens in unit tests where we don't make real calls to Stackdriver server
143143
if c == nil {
144144
return 0, nil
145145
}
146146

147-
err := createTimeSeries(ctx, c, req)
148-
if err == nil {
149-
return 0, nil
147+
dropped := 0
148+
errors := []error{}
149+
serviceReq, nonServiceReq := splitCreateTimeSeriesRequest(req)
150+
if nonServiceReq != nil {
151+
err := createTimeSeries(ctx, c, nonServiceReq)
152+
if err != nil {
153+
dropped += droppedTimeSeriesFromMonitoringAPIError(nonServiceReq, err)
154+
errors = append(errors, err)
155+
}
150156
}
157+
if serviceReq != nil {
158+
err := createServiceTimeSeries(ctx, c, serviceReq)
159+
if err != nil {
160+
dropped += droppedTimeSeriesFromMonitoringAPIError(serviceReq, err)
161+
errors = append(errors, err)
162+
}
163+
}
164+
return dropped, errors
165+
}
151166

152-
droppedTimeSeriesRangeMatches := timeSeriesErrRegex.FindAllStringSubmatch(err.Error(), -1)
153-
if !strings.HasPrefix(err.Error(), "One or more TimeSeries could not be written:") || len(droppedTimeSeriesRangeMatches) == 0 {
154-
return len(req.TimeSeries), err
167+
func droppedTimeSeriesFromMonitoringAPIError(req *monitoringpb.CreateTimeSeriesRequest, monitoringAPIerr error) int {
168+
droppedTimeSeriesRangeMatches := timeSeriesErrRegex.FindAllStringSubmatch(monitoringAPIerr.Error(), -1)
169+
if !strings.HasPrefix(monitoringAPIerr.Error(), "One or more TimeSeries could not be written:") || len(droppedTimeSeriesRangeMatches) == 0 {
170+
return len(req.TimeSeries)
155171
}
156172

157173
dropped := 0
@@ -171,7 +187,7 @@ func sendReq(ctx context.Context, c *monitoring.MetricClient, req *monitoringpb.
171187
}
172188
}
173189
}
174-
return dropped, err
190+
return dropped
175191
}
176192

177193
type worker struct {
@@ -219,10 +235,10 @@ func (w *worker) sendReqWithTimeout(req *monitoringpb.CreateTimeSeriesRequest) {
219235
w.recordDroppedTimeseries(sendReq(ctx, w.mc, req))
220236
}
221237

222-
func (w *worker) recordDroppedTimeseries(numTimeSeries int, err error) {
238+
func (w *worker) recordDroppedTimeseries(numTimeSeries int, errors []error) {
223239
w.resp.droppedTimeSeries += numTimeSeries
224-
if err != nil {
225-
w.resp.errs = append(w.resp.errs, err)
240+
if len(errors) > 0 {
241+
w.resp.errs = append(w.resp.errs, errors...)
226242
}
227243
}
228244

metrics_batcher_test.go

Lines changed: 72 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,7 @@ func TestWorkers(t *testing.T) {
4444
}
4545
m2 := newMetricsBatcher(ctx, "test", 2, c2, defaultTimeout) // batcher with 2 workers
4646

47-
tss := make([]*monitoringpb.TimeSeries, 0, 500) // make 500 time series, should be split to 3 reqs
48-
for i := 0; i < 500; i++ {
49-
tss = append(tss, makeTs(i))
50-
}
47+
tss := makeTs(500, false) // make 500 time series, should be split to 3 reqs
5148

5249
for _, ts := range tss {
5350
m1.addTimeSeries(ts)
@@ -89,69 +86,97 @@ func makeClient(addr string) (*monitoring.MetricClient, error) {
8986
return monitoring.NewMetricClient(context.Background(), option.WithGRPCConn(conn))
9087
}
9188

92-
func makeTs(i int) *monitoringpb.TimeSeries {
93-
return &monitoringpb.TimeSeries{
94-
Metric: &googlemetricpb.Metric{
95-
Type: fmt.Sprintf("custom.googleapis.com/opencensus/test/metric/%v", i),
96-
Labels: map[string]string{
97-
"key": fmt.Sprintf("test_%v", i),
89+
// makeTs returns a list of n *monitoringpb.TimeSeries. The metric type (service/non-service)
90+
// is determined by serviceMetric
91+
func makeTs(n int, serviceMetric bool) []*monitoringpb.TimeSeries {
92+
var tsl []*monitoringpb.TimeSeries
93+
for i := 0; i < n; i++ {
94+
metricType := fmt.Sprintf("custom.googleapis.com/opencensus/test/metric/%v", i)
95+
if serviceMetric {
96+
metricType = fmt.Sprintf("kubernetes.io/opencensus/test/metric/%v", i)
97+
}
98+
tsl = append(tsl, &monitoringpb.TimeSeries{
99+
Metric: &googlemetricpb.Metric{
100+
Type: metricType,
101+
Labels: map[string]string{
102+
"key": fmt.Sprintf("test_%v", i),
103+
},
98104
},
99-
},
100-
MetricKind: googlemetricpb.MetricDescriptor_CUMULATIVE,
101-
ValueType: googlemetricpb.MetricDescriptor_INT64,
102-
Points: []*monitoringpb.Point{
103-
{
104-
Value: &monitoringpb.TypedValue{
105-
Value: &monitoringpb.TypedValue_Int64Value{
106-
Int64Value: int64(i),
105+
MetricKind: googlemetricpb.MetricDescriptor_CUMULATIVE,
106+
ValueType: googlemetricpb.MetricDescriptor_INT64,
107+
Points: []*monitoringpb.Point{
108+
{
109+
Value: &monitoringpb.TypedValue{
110+
Value: &monitoringpb.TypedValue_Int64Value{
111+
Int64Value: int64(i),
112+
},
107113
},
108114
},
109115
},
110-
},
116+
})
111117
}
118+
return tsl
112119
}
113120

114121
func TestSendReqAndParseDropped(t *testing.T) {
115122
type testCase struct {
116-
name string
117-
timeseriesCount int
118-
createTimeSeriesFunc func(ctx context.Context, c *monitoring.MetricClient, ts *monitoringpb.CreateTimeSeriesRequest) error
119-
expectedErr bool
120-
expectedDropped int
123+
name string
124+
nonServiceTimeSeriesCount int
125+
serviceTimeSeriesCount int
126+
createTimeSeriesFunc func(ctx context.Context, c *monitoring.MetricClient, ts *monitoringpb.CreateTimeSeriesRequest) error
127+
createServiceTimeSeriesFunc func(ctx context.Context, c *monitoring.MetricClient, ts *monitoringpb.CreateTimeSeriesRequest) error
128+
expectedErr bool
129+
expectedDropped int
121130
}
122131

123132
testCases := []testCase{
124133
{
125-
name: "No error",
126-
timeseriesCount: 75,
134+
name: "No error",
135+
serviceTimeSeriesCount: 75,
136+
nonServiceTimeSeriesCount: 75,
127137
createTimeSeriesFunc: func(ctx context.Context, c *monitoring.MetricClient, ts *monitoringpb.CreateTimeSeriesRequest) error {
128138
return nil
129139
},
140+
createServiceTimeSeriesFunc: func(ctx context.Context, c *monitoring.MetricClient, ts *monitoringpb.CreateTimeSeriesRequest) error {
141+
return nil
142+
},
130143
expectedErr: false,
131144
expectedDropped: 0,
132145
},
133146
{
134-
name: "Partial error",
135-
timeseriesCount: 75,
147+
name: "Partial error",
148+
serviceTimeSeriesCount: 75,
149+
nonServiceTimeSeriesCount: 75,
136150
createTimeSeriesFunc: func(ctx context.Context, c *monitoring.MetricClient, ts *monitoringpb.CreateTimeSeriesRequest) error {
137151
return errors.New("One or more TimeSeries could not be written: Internal error encountered. Please retry after a few seconds. If internal errors persist, contact support at https://cloud.google.com/support/docs.: timeSeries[0-16,25-44,46-74]; Unknown metric: agent.googleapis.com/system.swap.page_faults: timeSeries[45]")
138152
},
153+
createServiceTimeSeriesFunc: func(ctx context.Context, c *monitoring.MetricClient, ts *monitoringpb.CreateTimeSeriesRequest) error {
154+
return errors.New("One or more TimeSeries could not be written: Internal error encountered. Please retry after a few seconds. If internal errors persist, contact support at https://cloud.google.com/support/docs.: timeSeries[0-16,25-44,46-74]; Unknown metric: agent.googleapis.com/system.swap.page_faults: timeSeries[45]")
155+
},
139156
expectedErr: true,
140-
expectedDropped: 67,
157+
expectedDropped: 67 * 2,
141158
},
142159
{
143-
name: "Incorrectly formatted error",
144-
timeseriesCount: 75,
160+
name: "Incorrectly formatted error",
161+
nonServiceTimeSeriesCount: 75,
162+
serviceTimeSeriesCount: 75,
145163
createTimeSeriesFunc: func(ctx context.Context, c *monitoring.MetricClient, ts *monitoringpb.CreateTimeSeriesRequest) error {
146164
return errors.New("One or more TimeSeries could not be written: Internal error encountered. Please retry after a few seconds. If internal errors persist, contact support at https://cloud.google.com/support/docs.: timeSeries[0-16,25-44,,46-74]; Unknown metric: agent.googleapis.com/system.swap.page_faults: timeSeries[45x]")
147165
},
166+
createServiceTimeSeriesFunc: func(ctx context.Context, c *monitoring.MetricClient, ts *monitoringpb.CreateTimeSeriesRequest) error {
167+
return nil
168+
},
148169
expectedErr: true,
149170
expectedDropped: 75,
150171
},
151172
{
152-
name: "Unexpected error format",
153-
timeseriesCount: 75,
173+
name: "Unexpected error format",
174+
nonServiceTimeSeriesCount: 75,
175+
serviceTimeSeriesCount: 75,
154176
createTimeSeriesFunc: func(ctx context.Context, c *monitoring.MetricClient, ts *monitoringpb.CreateTimeSeriesRequest) error {
177+
return nil
178+
},
179+
createServiceTimeSeriesFunc: func(ctx context.Context, c *monitoring.MetricClient, ts *monitoringpb.CreateTimeSeriesRequest) error {
155180
return errors.New("err1")
156181
},
157182
expectedErr: true,
@@ -162,21 +187,28 @@ func TestSendReqAndParseDropped(t *testing.T) {
162187
for _, test := range testCases {
163188
t.Run(test.name, func(t *testing.T) {
164189
persistedCreateTimeSeries := createTimeSeries
190+
persistedCreateServiceTimeSeries := createServiceTimeSeries
165191
createTimeSeries = test.createTimeSeriesFunc
192+
createServiceTimeSeries = test.createServiceTimeSeriesFunc
193+
defer func() {
194+
createTimeSeries = persistedCreateTimeSeries
195+
createServiceTimeSeries = persistedCreateServiceTimeSeries
196+
}()
166197

167198
mc, _ := monitoring.NewMetricClient(context.Background())
168-
d, err := sendReq(context.Background(), mc, &monitoringpb.CreateTimeSeriesRequest{TimeSeries: make([]*monitoringpb.TimeSeries, test.timeseriesCount)})
169-
if !test.expectedErr && err != nil {
170-
t.Fatal("Expected no err")
199+
var tsl []*monitoringpb.TimeSeries
200+
tsl = append(tsl, makeTs(test.serviceTimeSeriesCount, true)...)
201+
tsl = append(tsl, makeTs(test.nonServiceTimeSeriesCount, false)...)
202+
d, errors := sendReq(context.Background(), mc, &monitoringpb.CreateTimeSeriesRequest{TimeSeries: tsl})
203+
if !test.expectedErr && len(errors) > 0 {
204+
t.Fatalf("Expected no errors, got %v", errors)
171205
}
172-
if test.expectedErr && err == nil {
173-
t.Fatal("Expected noerr")
206+
if test.expectedErr && len(errors) == 0 {
207+
t.Fatalf("Expected errors, got %v", errors)
174208
}
175209
if d != test.expectedDropped {
176210
t.Fatalf("Want %v dropped, got %v", test.expectedDropped, d)
177211
}
178-
179-
createTimeSeries = persistedCreateTimeSeries
180212
})
181213
}
182214
}

metrics_proto_api_test.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,9 @@ func executeTestCase(t *testing.T, tc *testCases, se *sd.Exporter, server *fakeM
398398
server.forEachStackdriverTimeSeries(func(sdt *monitoringpb.CreateTimeSeriesRequest) {
399399
gotTimeSeries = append(gotTimeSeries, sdt)
400400
})
401-
401+
server.forEachServiceStackdriverTimeSeries(func(sdt *monitoringpb.CreateTimeSeriesRequest) {
402+
gotTimeSeries = append(gotTimeSeries, sdt)
403+
})
402404
if diff, idx := requireTimeSeriesRequestEqual(t, gotTimeSeries, tc.outTSR); diff != "" {
403405
t.Errorf("Name[%s], TimeSeries[%d], Error: -got +want %s\n", tc.name, idx, diff)
404406
}
@@ -413,6 +415,7 @@ func executeTestCase(t *testing.T, tc *testCases, se *sd.Exporter, server *fakeM
413415
}
414416
server.resetStackdriverMetricDescriptors()
415417
server.resetStackdriverTimeSeries()
418+
server.resetStackdriverServiceTimeSeries()
416419
}
417420

418421
func readTestCaseFromFiles(t *testing.T, filename string) *testCases {
@@ -514,6 +517,7 @@ type fakeMetricsServer struct {
514517
monitoringpb.MetricServiceServer
515518
mu sync.RWMutex
516519
stackdriverTimeSeries []*monitoringpb.CreateTimeSeriesRequest
520+
stackdriverServiceTimeSeries []*monitoringpb.CreateTimeSeriesRequest
517521
stackdriverMetricDescriptors []*monitoringpb.CreateMetricDescriptorRequest
518522
}
519523

@@ -548,6 +552,15 @@ func (server *fakeMetricsServer) forEachStackdriverTimeSeries(fn func(sdt *monit
548552
}
549553
}
550554

555+
// forEachServiceStackdriverTimeSeries calls fn once for every request stored
// in server.stackdriverServiceTimeSeries, in slice order. The read lock on
// server.mu is held for the whole iteration, so fn runs while that lock is
// held.
func (server *fakeMetricsServer) forEachServiceStackdriverTimeSeries(fn func(sdt *monitoringpb.CreateTimeSeriesRequest)) {
556+
server.mu.RLock()
557+
defer server.mu.RUnlock()
558+
559+
for _, sdt := range server.stackdriverServiceTimeSeries {
560+
fn(sdt)
561+
}
562+
}
563+
551564
func (server *fakeMetricsServer) forEachStackdriverMetricDescriptor(fn func(sdmd *monitoringpb.CreateMetricDescriptorRequest)) {
552565
server.mu.RLock()
553566
defer server.mu.RUnlock()
@@ -563,6 +576,12 @@ func (server *fakeMetricsServer) resetStackdriverTimeSeries() {
563576
server.mu.Unlock()
564577
}
565578

579+
// resetStackdriverServiceTimeSeries empties the recorded service time series
// requests while holding the write lock on server.mu.
func (server *fakeMetricsServer) resetStackdriverServiceTimeSeries() {
580+
server.mu.Lock()
581+
// Reslicing to [:0] sets the length to zero but keeps the backing array.
server.stackdriverServiceTimeSeries = server.stackdriverServiceTimeSeries[:0]
582+
server.mu.Unlock()
583+
}
584+
566585
func (server *fakeMetricsServer) resetStackdriverMetricDescriptors() {
567586
server.mu.Lock()
568587
server.stackdriverMetricDescriptors = server.stackdriverMetricDescriptors[:0]
@@ -583,6 +602,13 @@ func (server *fakeMetricsServer) CreateTimeSeries(ctx context.Context, req *moni
583602
return new(empty.Empty), nil
584603
}
585604

605+
// CreateServiceTimeSeries records req on the fake server so tests can inspect
// it later. The request is appended to server.stackdriverServiceTimeSeries
// while server.mu is write-locked. ctx is not used, and the call always
// returns a fresh empty.Empty with a nil error.
func (server *fakeMetricsServer) CreateServiceTimeSeries(ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest) (*empty.Empty, error) {
606+
server.mu.Lock()
607+
server.stackdriverServiceTimeSeries = append(server.stackdriverServiceTimeSeries, req)
608+
server.mu.Unlock()
609+
return new(empty.Empty), nil
610+
}
611+
586612
func requireTimeSeriesRequestEqual(t *testing.T, got, want []*monitoringpb.CreateTimeSeriesRequest) (string, int) {
587613
diff := ""
588614
i := 0

0 commit comments

Comments
 (0)