Skip to content

Commit 5f9eeb4

Browse files
janardhanvissa and vinothkumarr227
authored and committed
stats/opentelemetry: separate out interceptors for tracing and metrics (grpc#8063)
1 parent 541c8ce commit 5f9eeb4

File tree

6 files changed

+231
-121
lines changed

6 files changed

+231
-121
lines changed

stats/opentelemetry/client_metrics.go

Lines changed: 63 additions & 75 deletions
Original file line number | Diff line number | Diff line change
@@ -21,27 +21,23 @@ import (
2121
"sync/atomic"
2222
"time"
2323

24-
otelcodes "go.opentelemetry.io/otel/codes"
25-
"go.opentelemetry.io/otel/trace"
24+
otelattribute "go.opentelemetry.io/otel/attribute"
25+
otelmetric "go.opentelemetry.io/otel/metric"
2626
"google.golang.org/grpc"
27-
grpccodes "google.golang.org/grpc/codes"
2827
estats "google.golang.org/grpc/experimental/stats"
2928
istats "google.golang.org/grpc/internal/stats"
3029
"google.golang.org/grpc/metadata"
3130
"google.golang.org/grpc/stats"
3231
"google.golang.org/grpc/status"
33-
34-
otelattribute "go.opentelemetry.io/otel/attribute"
35-
otelmetric "go.opentelemetry.io/otel/metric"
3632
)
3733

38-
type clientStatsHandler struct {
34+
type clientMetricsHandler struct {
3935
estats.MetricsRecorder
4036
options Options
4137
clientMetrics clientMetrics
4238
}
4339

44-
func (h *clientStatsHandler) initializeMetrics() {
40+
func (h *clientMetricsHandler) initializeMetrics() {
4541
// Will set no metrics to record, logically making this stats handler a
4642
// no-op.
4743
if h.options.MetricsOptions.MeterProvider == nil {
@@ -71,12 +67,25 @@ func (h *clientStatsHandler) initializeMetrics() {
7167
rm.registerMetrics(metrics, meter)
7268
}
7369

74-
func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
75-
ci := &callInfo{
76-
target: cc.CanonicalTarget(),
77-
method: h.determineMethod(method, opts...),
70+
// getOrCreateCallInfo returns the existing callInfo from context if present,
71+
// or creates and attaches a new one.
72+
func getOrCreateCallInfo(ctx context.Context, cc *grpc.ClientConn, method string, opts ...grpc.CallOption) (context.Context, *callInfo) {
73+
ci := getCallInfo(ctx)
74+
if ci == nil {
75+
if logger.V(2) {
76+
logger.Info("Creating new CallInfo since its not present in context")
77+
}
78+
ci = &callInfo{
79+
target: cc.CanonicalTarget(),
80+
method: determineMethod(method, opts...),
81+
}
82+
ctx = setCallInfo(ctx, ci)
7883
}
79-
ctx = setCallInfo(ctx, ci)
84+
return ctx, ci
85+
}
86+
87+
func (h *clientMetricsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
88+
ctx, ci := getOrCreateCallInfo(ctx, cc, method, opts...)
8089

8190
if h.options.MetricsOptions.pluginOption != nil {
8291
md := h.options.MetricsOptions.pluginOption.GetMetadata()
@@ -88,19 +97,15 @@ func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string
8897
}
8998

9099
startTime := time.Now()
91-
var span trace.Span
92-
if h.options.isTracingEnabled() {
93-
ctx, span = h.createCallTraceSpan(ctx, method)
94-
}
95100
err := invoker(ctx, method, req, reply, cc, opts...)
96-
h.perCallTracesAndMetrics(ctx, err, startTime, ci, span)
101+
h.perCallMetrics(ctx, err, startTime, ci)
97102
return err
98103
}
99104

100105
// determineMethod determines the method to record attributes with. This will be
101106
// "other" if StaticMethod isn't specified or if a method filter is set and
102107
// specifies so; otherwise it is the method name as is.
103-
func (h *clientStatsHandler) determineMethod(method string, opts ...grpc.CallOption) string {
108+
func determineMethod(method string, opts ...grpc.CallOption) string {
104109
for _, opt := range opts {
105110
if _, ok := opt.(grpc.StaticMethodCallOption); ok {
106111
return removeLeadingSlash(method)
@@ -109,12 +114,8 @@ func (h *clientStatsHandler) determineMethod(method string, opts ...grpc.CallOpt
109114
return "other"
110115
}
111116

112-
func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
113-
ci := &callInfo{
114-
target: cc.CanonicalTarget(),
115-
method: h.determineMethod(method, opts...),
116-
}
117-
ctx = setCallInfo(ctx, ci)
117+
func (h *clientMetricsHandler) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
118+
ctx, ci := getOrCreateCallInfo(ctx, cc, method, opts...)
118119

119120
if h.options.MetricsOptions.pluginOption != nil {
120121
md := h.options.MetricsOptions.pluginOption.GetMetadata()
@@ -126,49 +127,45 @@ func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.S
126127
}
127128

128129
startTime := time.Now()
129-
var span trace.Span
130-
if h.options.isTracingEnabled() {
131-
ctx, span = h.createCallTraceSpan(ctx, method)
132-
}
133130
callback := func(err error) {
134-
h.perCallTracesAndMetrics(ctx, err, startTime, ci, span)
131+
h.perCallMetrics(ctx, err, startTime, ci)
135132
}
136133
opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...)
137134
return streamer(ctx, desc, cc, method, opts...)
138135
}
139136

140-
// perCallTracesAndMetrics records per call trace spans and metrics.
141-
func (h *clientStatsHandler) perCallTracesAndMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo, ts trace.Span) {
142-
if h.options.isTracingEnabled() {
143-
s := status.Convert(err)
144-
if s.Code() == grpccodes.OK {
145-
ts.SetStatus(otelcodes.Ok, s.Message())
146-
} else {
147-
ts.SetStatus(otelcodes.Error, s.Message())
148-
}
149-
ts.End()
150-
}
151-
if h.options.isMetricsEnabled() {
152-
callLatency := float64(time.Since(startTime)) / float64(time.Second)
153-
attrs := otelmetric.WithAttributeSet(otelattribute.NewSet(
154-
otelattribute.String("grpc.method", ci.method),
155-
otelattribute.String("grpc.target", ci.target),
156-
otelattribute.String("grpc.status", canonicalString(status.Code(err))),
157-
))
158-
h.clientMetrics.callDuration.Record(ctx, callLatency, attrs)
159-
}
137+
// perCallMetrics records per call metrics for both unary and stream calls.
138+
func (h *clientMetricsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) {
139+
callLatency := float64(time.Since(startTime)) / float64(time.Second)
140+
attrs := otelmetric.WithAttributeSet(otelattribute.NewSet(
141+
otelattribute.String("grpc.method", ci.method),
142+
otelattribute.String("grpc.target", ci.target),
143+
otelattribute.String("grpc.status", canonicalString(status.Code(err))),
144+
))
145+
h.clientMetrics.callDuration.Record(ctx, callLatency, attrs)
160146
}
161147

162148
// TagConn exists to satisfy stats.Handler.
163-
func (h *clientStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
149+
func (h *clientMetricsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
164150
return ctx
165151
}
166152

167153
// HandleConn exists to satisfy stats.Handler.
168-
func (h *clientStatsHandler) HandleConn(context.Context, stats.ConnStats) {}
154+
func (h *clientMetricsHandler) HandleConn(context.Context, stats.ConnStats) {}
155+
156+
// getOrCreateRPCAttemptInfo retrieves or creates an rpc attemptInfo object
157+
// and ensures it is set in the context along with the rpcInfo.
158+
func getOrCreateRPCAttemptInfo(ctx context.Context) (context.Context, *attemptInfo) {
159+
ri := getRPCInfo(ctx)
160+
if ri != nil {
161+
return ctx, ri.ai
162+
}
163+
ri = &rpcInfo{ai: &attemptInfo{}}
164+
return setRPCInfo(ctx, ri), ri.ai
165+
}
169166

170-
// TagRPC implements per RPC attempt context management.
171-
func (h *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
167+
// TagRPC implements per RPC attempt context management for metrics.
168+
func (h *clientMetricsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
172169
// Numerous stats handlers can be used for the same channel. The cluster
173170
// impl balancer which writes to this will only write once, thus have this
174171
// stats handler's per attempt scoped context point to the same optional
@@ -185,34 +182,25 @@ func (h *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo)
185182
}
186183
ctx = istats.SetLabels(ctx, labels)
187184
}
188-
ai := &attemptInfo{
189-
startTime: time.Now(),
190-
xdsLabels: labels.TelemetryLabels,
191-
method: removeLeadingSlash(info.FullMethodName),
192-
}
193-
if h.options.isTracingEnabled() {
194-
ctx, ai = h.traceTagRPC(ctx, ai, info.NameResolutionDelay)
195-
}
196-
return setRPCInfo(ctx, &rpcInfo{
197-
ai: ai,
198-
})
185+
ctx, ai := getOrCreateRPCAttemptInfo(ctx)
186+
ai.startTime = time.Now()
187+
ai.xdsLabels = labels.TelemetryLabels
188+
ai.method = removeLeadingSlash(info.FullMethodName)
189+
190+
return setRPCInfo(ctx, &rpcInfo{ai: ai})
199191
}
200192

201-
func (h *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
193+
// HandleRPC handles per RPC stats implementation.
194+
func (h *clientMetricsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
202195
ri := getRPCInfo(ctx)
203196
if ri == nil {
204197
logger.Error("ctx passed into client side stats handler metrics event handling has no client attempt data present")
205198
return
206199
}
207-
if h.options.isMetricsEnabled() {
208-
h.processRPCEvent(ctx, rs, ri.ai)
209-
}
210-
if h.options.isTracingEnabled() {
211-
populateSpan(rs, ri.ai)
212-
}
200+
h.processRPCEvent(ctx, rs, ri.ai)
213201
}
214202

215-
func (h *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCStats, ai *attemptInfo) {
203+
func (h *clientMetricsHandler) processRPCEvent(ctx context.Context, s stats.RPCStats, ai *attemptInfo) {
216204
switch st := s.(type) {
217205
case *stats.Begin:
218206
ci := getCallInfo(ctx)
@@ -240,7 +228,7 @@ func (h *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCSta
240228
}
241229
}
242230

243-
func (h *clientStatsHandler) setLabelsFromPluginOption(ai *attemptInfo, incomingMetadata metadata.MD) {
231+
func (h *clientMetricsHandler) setLabelsFromPluginOption(ai *attemptInfo, incomingMetadata metadata.MD) {
244232
if ai.pluginOptionLabels == nil && h.options.MetricsOptions.pluginOption != nil {
245233
labels := h.options.MetricsOptions.pluginOption.GetLabels(incomingMetadata)
246234
if labels == nil {
@@ -250,7 +238,7 @@ func (h *clientStatsHandler) setLabelsFromPluginOption(ai *attemptInfo, incoming
250238
}
251239
}
252240

253-
func (h *clientStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo, e *stats.End) {
241+
func (h *clientMetricsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo, e *stats.End) {
254242
ci := getCallInfo(ctx)
255243
if ci == nil {
256244
logger.Error("ctx passed into client side stats handler metrics event handling has no metrics data present")

stats/opentelemetry/client_tracing.go

Lines changed: 75 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -18,24 +18,72 @@ package opentelemetry
1818

1919
import (
2020
"context"
21+
"log"
2122
"strings"
2223

24+
otelcodes "go.opentelemetry.io/otel/codes"
2325
"go.opentelemetry.io/otel/trace"
2426
"google.golang.org/grpc"
27+
grpccodes "google.golang.org/grpc/codes"
28+
"google.golang.org/grpc/stats"
2529
otelinternaltracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing"
30+
"google.golang.org/grpc/status"
2631
)
2732

2833
const (
2934
delayedResolutionEventName = "Delayed name resolution complete"
3035
tracerName = "grpc-go"
3136
)
3237

38+
type clientTracingHandler struct {
39+
options Options
40+
}
41+
42+
func (h *clientTracingHandler) initializeTraces() {
43+
if h.options.TraceOptions.TracerProvider == nil {
44+
log.Printf("TracerProvider is not provided in client TraceOptions")
45+
return
46+
}
47+
}
48+
49+
func (h *clientTracingHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
50+
ctx, _ = getOrCreateCallInfo(ctx, cc, method, opts...)
51+
52+
var span trace.Span
53+
ctx, span = h.createCallTraceSpan(ctx, method)
54+
err := invoker(ctx, method, req, reply, cc, opts...)
55+
h.finishTrace(err, span)
56+
return err
57+
}
58+
59+
func (h *clientTracingHandler) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
60+
ctx, _ = getOrCreateCallInfo(ctx, cc, method, opts...)
61+
62+
var span trace.Span
63+
ctx, span = h.createCallTraceSpan(ctx, method)
64+
callback := func(err error) { h.finishTrace(err, span) }
65+
opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...)
66+
return streamer(ctx, desc, cc, method, opts...)
67+
}
68+
69+
// finishTrace sets the span status based on the RPC result and ends the span.
70+
// It is used to finalize tracing for both unary and streaming calls.
71+
func (h *clientTracingHandler) finishTrace(err error, ts trace.Span) {
72+
s := status.Convert(err)
73+
if s.Code() == grpccodes.OK {
74+
ts.SetStatus(otelcodes.Ok, s.Message())
75+
} else {
76+
ts.SetStatus(otelcodes.Error, s.Message())
77+
}
78+
ts.End()
79+
}
80+
3381
// traceTagRPC populates provided context with a new span using the
3482
// TextMapPropagator supplied in trace options and internal itracing.carrier.
3583
// It creates a new outgoing carrier which serializes information about this
3684
// span into gRPC Metadata, if TextMapPropagator is provided in the trace
3785
// options. If TextMapPropagator is not provided, it returns the context as is.
38-
func (h *clientStatsHandler) traceTagRPC(ctx context.Context, ai *attemptInfo, nameResolutionDelayed bool) (context.Context, *attemptInfo) {
86+
func (h *clientTracingHandler) traceTagRPC(ctx context.Context, ai *attemptInfo, nameResolutionDelayed bool) (context.Context, *attemptInfo) {
3987
// Add a "Delayed name resolution complete" event to the call span
4088
// if there was name resolution delay. In case of multiple retry attempts,
4189
// ensure that event is added only once.
@@ -55,13 +103,34 @@ func (h *clientStatsHandler) traceTagRPC(ctx context.Context, ai *attemptInfo, n
55103

56104
// createCallTraceSpan creates a call span to put in the provided context using
57105
// the TracerProvider from the trace options, which must be non-nil.
58-
func (h *clientStatsHandler) createCallTraceSpan(ctx context.Context, method string) (context.Context, trace.Span) {
59-
if h.options.TraceOptions.TracerProvider == nil {
60-
logger.Error("TraceProvider is not provided in trace options")
61-
return ctx, nil
62-
}
106+
func (h *clientTracingHandler) createCallTraceSpan(ctx context.Context, method string) (context.Context, trace.Span) {
63107
mn := "Sent." + strings.Replace(removeLeadingSlash(method), "/", ".", -1)
64108
tracer := h.options.TraceOptions.TracerProvider.Tracer(tracerName, trace.WithInstrumentationVersion(grpc.Version))
65109
ctx, span := tracer.Start(ctx, mn, trace.WithSpanKind(trace.SpanKindClient))
66110
return ctx, span
67111
}
112+
113+
// TagConn exists to satisfy stats.Handler for tracing.
114+
func (h *clientTracingHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
115+
return ctx
116+
}
117+
118+
// HandleConn exists to satisfy stats.Handler for tracing.
119+
func (h *clientTracingHandler) HandleConn(context.Context, stats.ConnStats) {}
120+
121+
// TagRPC implements per RPC attempt context management for traces.
122+
func (h *clientTracingHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
123+
ctx, ai := getOrCreateRPCAttemptInfo(ctx)
124+
ctx, ai = h.traceTagRPC(ctx, ai, info.NameResolutionDelay)
125+
return setRPCInfo(ctx, &rpcInfo{ai: ai})
126+
}
127+
128+
// HandleRPC handles per RPC tracing implementation.
129+
func (h *clientTracingHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
130+
ri := getRPCInfo(ctx)
131+
if ri == nil {
132+
logger.Error("ctx passed into client side tracing handler trace event handling has no client attempt data present")
133+
return
134+
}
135+
populateSpan(rs, ri.ai)
136+
}

stats/opentelemetry/metricsregistry_test.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -47,11 +47,11 @@ type metricsRecorderForTest interface {
4747
}
4848

4949
func newClientStatsHandler(options MetricsOptions) metricsRecorderForTest {
50-
return &clientStatsHandler{options: Options{MetricsOptions: options}}
50+
return &clientMetricsHandler{options: Options{MetricsOptions: options}}
5151
}
5252

5353
func newServerStatsHandler(options MetricsOptions) metricsRecorderForTest {
54-
return &serverStatsHandler{options: Options{MetricsOptions: options}}
54+
return &serverMetricsHandler{options: Options{MetricsOptions: options}}
5555
}
5656

5757
// TestMetricsRegistryMetrics tests the OpenTelemetry behavior with respect to

0 commit comments

Comments
 (0)