-
Notifications
You must be signed in to change notification settings - Fork 4.5k
stats/opentelemetry: separate out interceptors for tracing and metrics #8063
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
7ddbe46
0486355
53fa9cd
4435b8a
a413555
4e203c3
e9ad552
71804b4
4b3bd26
69df069
770f430
b97a3ca
c89f3c9
3f07e48
5e8a4a5
76e422a
8fa0b03
1f41a49
d74c61d
170eef6
7f5f539
50999a0
68b8966
8b56f0f
ac6080f
ed5506c
be72377
a5ed115
f243b43
efb738f
7b97c65
e8b0180
f70274b
5e607c5
a850532
0f39f8d
2a914e3
05b4278
105efe9
3d78d59
44fce2d
e5f50b7
ffcc0fa
67e5b24
8cf7f15
7949501
ad22a22
dd179fb
3201805
a3915ef
1ff5159
4133dd3
f8e0cac
0b89cf6
42c08a0
5bb48b3
fb8025d
cc39d1a
eb8321f
c858ed2
5c57489
cc12a84
aae5419
18a9c18
130ca22
397abf0
f4b4fd6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,54 +38,54 @@ | |
|
||
func (h *clientTracingHandler) initializeTraces() { | ||
if h.options.TraceOptions.TracerProvider == nil { | ||
log.Printf("TracerProvider is not provided in client TraceOptions") | ||
return | ||
} | ||
} | ||
|
||
func (h *clientTracingHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { | ||
ci := getCallInfo(ctx) | ||
purnesh42H marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if ci == nil { | ||
dfawley marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if logger.V(2) { | ||
logger.Info("Creating new CallInfo since its not present in context in clientTracingHandler unaryInterceptor") | ||
} | ||
ci = &callInfo{ | ||
target: cc.CanonicalTarget(), | ||
method: determineMethod(method, opts...), | ||
} | ||
ctx = setCallInfo(ctx, ci) | ||
} | ||
|
||
var span trace.Span | ||
ctx, span = h.createCallTraceSpan(ctx, method) | ||
err := invoker(ctx, method, req, reply, cc, opts...) | ||
h.perCallTraces(err, span) | ||
h.finishTrace(err, span) | ||
return err | ||
} | ||
|
||
func (h *clientTracingHandler) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { | ||
ci := getCallInfo(ctx) | ||
purnesh42H marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if ci == nil { | ||
dfawley marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if logger.V(2) { | ||
logger.Info("Creating new CallInfo since its not present in context in clientTracingHandler streamInterceptor") | ||
} | ||
ci = &callInfo{ | ||
target: cc.CanonicalTarget(), | ||
method: determineMethod(method, opts...), | ||
} | ||
ctx = setCallInfo(ctx, ci) | ||
} | ||
|
||
var span trace.Span | ||
ctx, span = h.createCallTraceSpan(ctx, method) | ||
callback := func(err error) { h.perCallTraces(err, span) } | ||
callback := func(err error) { h.finishTrace(err, span) } | ||
opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...) | ||
return streamer(ctx, desc, cc, method, opts...) | ||
} | ||
|
||
// perCallTraces sets the span status based on the RPC result and ends the span. | ||
// finishTrace sets the span status based on the RPC result and ends the span. | ||
// It is used to finalize tracing for both unary and streaming calls. | ||
func (h *clientTracingHandler) perCallTraces(err error, ts trace.Span) { | ||
func (h *clientTracingHandler) finishTrace(err error, ts trace.Span) { | ||
s := status.Convert(err) | ||
if s.Code() == grpccodes.OK { | ||
ts.SetStatus(otelcodes.Ok, s.Message()) | ||
|
@@ -130,8 +130,8 @@ | |
func (h *clientTracingHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context { | ||
ri := getRPCInfo(ctx) | ||
purnesh42H marked this conversation as resolved.
Show resolved
Hide resolved
|
||
var ai *attemptInfo | ||
if ri == nil { | ||
if ri.ai == nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think here also we can't assume ri is not nil if the order of stats handlers changes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
ai = &attemptInfo{} | ||
} else { | ||
ai = ri.ai | ||
} | ||
|
@@ -143,8 +143,8 @@ | |
func (h *clientTracingHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { | ||
ri := getRPCInfo(ctx) | ||
dfawley marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if ri == nil { | ||
purnesh42H marked this conversation as resolved.
Show resolved
Hide resolved
|
||
logger.Error("ctx passed into client side tracing handler trace event handling has no client attempt data present") | ||
return | ||
} | ||
populateSpan(rs, ri.ai) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -65,6 +65,10 @@ type Options struct { | |
TraceOptions experimental.TraceOptions | ||
} | ||
|
||
func (o *Options) isMetricsEnabled() bool { | ||
return o.MetricsOptions.MeterProvider != nil | ||
} | ||
|
||
func (o *Options) isTracingEnabled() bool { | ||
return o.TraceOptions.TracerProvider != nil | ||
} | ||
|
@@ -117,15 +121,31 @@ type MetricsOptions struct { | |
// For the traces supported by this instrumentation code, provide an | ||
// implementation of a TextMapPropagator and OpenTelemetry TracerProvider. | ||
func DialOption(o Options) grpc.DialOption { | ||
csh := &clientStatsHandler{options: o} | ||
csh.initializeMetrics() | ||
do := joinDialOptions(grpc.WithChainUnaryInterceptor(csh.unaryInterceptor), grpc.WithChainStreamInterceptor(csh.streamInterceptor), grpc.WithStatsHandler(csh)) | ||
if !o.isTracingEnabled() { | ||
return do | ||
var unaryInterceptors []grpc.UnaryClientInterceptor | ||
var streamInterceptors []grpc.StreamClientInterceptor | ||
var do []grpc.DialOption | ||
|
||
if o.isMetricsEnabled() { | ||
purnesh42H marked this conversation as resolved.
Show resolved
Hide resolved
purnesh42H marked this conversation as resolved.
Show resolved
Hide resolved
|
||
metricsHandler := &clientMetricsHandler{options: o} | ||
metricsHandler.initializeMetrics() | ||
unaryInterceptors = append(unaryInterceptors, metricsHandler.unaryInterceptor) | ||
streamInterceptors = append(streamInterceptors, metricsHandler.streamInterceptor) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is changing the current order. Let's avoid that. Keep only 2 variables metricsInterceptors and tracesInterceptors and add metricsInterceptors before traces There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
do = append(do, grpc.WithStatsHandler(metricsHandler)) | ||
} | ||
if o.isTracingEnabled() { | ||
tracingHandler := &clientTracingHandler{options: o} | ||
tracingHandler.initializeTraces() | ||
unaryInterceptors = append(unaryInterceptors, tracingHandler.unaryInterceptor) | ||
streamInterceptors = append(streamInterceptors, tracingHandler.streamInterceptor) | ||
do = append(do, grpc.WithStatsHandler(tracingHandler)) | ||
} | ||
if len(unaryInterceptors) > 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. these ifs will change to metrics and traces There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
do = append(do, grpc.WithChainUnaryInterceptor(unaryInterceptors...)) | ||
} | ||
tracingHandler := &clientTracingHandler{options: o} | ||
tracingHandler.initializeTraces() | ||
return joinDialOptions(do, grpc.WithChainUnaryInterceptor(tracingHandler.unaryInterceptor), grpc.WithChainStreamInterceptor(tracingHandler.streamInterceptor), grpc.WithStatsHandler(tracingHandler)) | ||
if len(streamInterceptors) > 0 { | ||
do = append(do, grpc.WithChainStreamInterceptor(streamInterceptors...)) | ||
} | ||
return joinDialOptions(do...) | ||
} | ||
|
||
var joinServerOptions = internal.JoinServerOptions.(func(...grpc.ServerOption) grpc.ServerOption) | ||
|
@@ -146,15 +166,19 @@ var joinServerOptions = internal.JoinServerOptions.(func(...grpc.ServerOption) g | |
// For the traces supported by this instrumentation code, provide an | ||
// implementation of a TextMapPropagator and OpenTelemetry TracerProvider. | ||
func ServerOption(o Options) grpc.ServerOption { | ||
ssh := &serverStatsHandler{options: o} | ||
ssh.initializeMetrics() | ||
so := joinServerOptions(grpc.ChainUnaryInterceptor(ssh.unaryInterceptor), grpc.ChainStreamInterceptor(ssh.streamInterceptor), grpc.StatsHandler(ssh)) | ||
if !o.isTracingEnabled() { | ||
return so | ||
var so []grpc.ServerOption | ||
|
||
if o.isMetricsEnabled() { | ||
purnesh42H marked this conversation as resolved.
Show resolved
Hide resolved
purnesh42H marked this conversation as resolved.
Show resolved
Hide resolved
|
||
metricsHandler := &serverMetricsHandler{options: o} | ||
metricsHandler.initializeMetrics() | ||
so = append(so, grpc.ChainUnaryInterceptor(metricsHandler.unaryInterceptor), grpc.ChainStreamInterceptor(metricsHandler.streamInterceptor), grpc.StatsHandler(metricsHandler)) | ||
} | ||
if o.isTracingEnabled() { | ||
dfawley marked this conversation as resolved.
Show resolved
Hide resolved
|
||
tracingHandler := &serverTracingHandler{options: o} | ||
tracingHandler.initializeTraces() | ||
so = append(so, grpc.StatsHandler(tracingHandler)) | ||
} | ||
tracingHandler := &serverTracingHandler{options: o} | ||
tracingHandler.initializeTraces() | ||
return joinServerOptions(so, grpc.ChainUnaryInterceptor(tracingHandler.unaryInterceptor), grpc.ChainStreamInterceptor(tracingHandler.streamInterceptor), grpc.StatsHandler(tracingHandler)) | ||
return joinServerOptions(so...) | ||
} | ||
|
||
// callInfo is information pertaining to the lifespan of the RPC client side. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,25 +33,17 @@ | |
|
||
func (h *serverTracingHandler) initializeTraces() { | ||
if h.options.TraceOptions.TracerProvider == nil { | ||
log.Printf("TracerProvider is not provided in server TraceOptions") | ||
return | ||
} | ||
} | ||
|
||
func (h *serverTracingHandler) unaryInterceptor(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { | ||
return handler(ctx, req) | ||
} | ||
|
||
func (h *serverTracingHandler) streamInterceptor(srv any, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error { | ||
return handler(srv, ss) | ||
} | ||
|
||
// TagRPC implements per RPC attempt context management for traces. | ||
func (h *serverTracingHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context { | ||
ri := getRPCInfo(ctx) | ||
dfawley marked this conversation as resolved.
Show resolved
Hide resolved
|
||
var ai *attemptInfo | ||
if ri == nil { | ||
if ri.ai == nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same here. We can't assume ri to be not nil here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
ai = &attemptInfo{} | ||
} else { | ||
ai = ri.ai | ||
} | ||
|
@@ -83,9 +75,9 @@ | |
func (h *serverTracingHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { | ||
ri := getRPCInfo(ctx) | ||
if ri == nil { | ||
logger.Error("ctx passed into server side tracing handler trace event handling has no server call data present") | ||
return | ||
} | ||
populateSpan(rs, ri.ai) | ||
} | ||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.