Skip to content

Commit f21ec9e

Browse files
ValentinZakharovamarzialijbachoriknikita-tkachenko-datadogjpbempel
authored
WebSocket support for Netty (#8632)
* Implemented WebSocket support for Netty 4.1 * Let propagate unhandled events and fix tests * Refactoring * Refactor netty test and fix instrumentation * Improved pipeline processing - now you can insert handler in any place * Fixed helper * Refactoring * WebSocket Server support for netty-4.0 * Missing handlers use cases for netty-4.1 * Fixed handlers for netty-4.0 * Tests for netty-4.0 * Refactoring * WebSocket Server support for netty-3.8 * Tests for netty-3.8 * Spotless * Fixed tests * Add profiler env check command to AgentCLI (#8671) * Remove dependency on bash from crash/oome uploder scripts (#8652) * Do not apply JUnit 4 instrumentation to MUnit runners (#8675) * Shutdown CI Visibility test event handlers before tracer (#8677) * Prevent double reporting of Scalatest events when using SBT with test forking (#8682) * Fix In-Product when config is empty (#8679) should not stop the product with empty config * Expand MUnit runners filter to catch munit.MUnitRunner in JUnit 4 instrumentation (#8683) * Remove unused TestEventsHandler methods (#8674) * Delete print line (#8686) * Exclude ProxyLeakTask exception from exception profiling (#8666) * Use jvmstat for JDKs 9+ programmatically (#8641) * Update test.retry_reason to use full name of the feature (#8689) * Allow dogstatsd port to be configurable with DD_DOGSTATSD_PORT (#8693) * configurable dogstatsd port * wait the client handshake * move netty ws client to interested modules * Added WebSocket tracing check --------- Co-authored-by: Andrea Marziali <[email protected]> Co-authored-by: Jaroslav Bachorik <[email protected]> Co-authored-by: Nikita Tkachenko <[email protected]> Co-authored-by: Jean-Philippe Bempel <[email protected]> Co-authored-by: Daniel Mohedano <[email protected]> Co-authored-by: Sarah Chen <[email protected]> Co-authored-by: Matt <[email protected]> Co-authored-by: Laplie Anderson <[email protected]>
1 parent 933cb97 commit f21ec9e

29 files changed

+1693
-116
lines changed

dd-java-agent/instrumentation/netty-3.8/src/main/java/datadog/trace/instrumentation/netty38/ChannelPipelineAdviceUtil.java

+43
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package datadog.trace.instrumentation.netty38;
22

3+
import datadog.trace.api.InstrumenterConfig;
34
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
45
import datadog.trace.bootstrap.ContextStore;
56
import datadog.trace.instrumentation.netty38.client.HttpClientRequestTracingHandler;
@@ -9,6 +10,9 @@
910
import datadog.trace.instrumentation.netty38.server.HttpServerResponseTracingHandler;
1011
import datadog.trace.instrumentation.netty38.server.HttpServerTracingHandler;
1112
import datadog.trace.instrumentation.netty38.server.MaybeBlockResponseHandler;
13+
import datadog.trace.instrumentation.netty38.server.websocket.WebSocketServerRequestTracingHandler;
14+
import datadog.trace.instrumentation.netty38.server.websocket.WebSocketServerResponseTracingHandler;
15+
import datadog.trace.instrumentation.netty38.server.websocket.WebSocketServerTracingHandler;
1216
import org.jboss.netty.channel.Channel;
1317
import org.jboss.netty.channel.ChannelHandler;
1418
import org.jboss.netty.channel.ChannelPipeline;
@@ -18,6 +22,9 @@
1822
import org.jboss.netty.handler.codec.http.HttpResponseDecoder;
1923
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
2024
import org.jboss.netty.handler.codec.http.HttpServerCodec;
25+
import org.jboss.netty.handler.codec.http.websocketx.WebSocket13FrameDecoder;
26+
import org.jboss.netty.handler.codec.http.websocketx.WebSocket13FrameEncoder;
27+
import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
2128

2229
/**
2330
* When certain handlers are added to the pipeline, we want to add our corresponding tracing
@@ -46,6 +53,33 @@ public static void wrapHandler(
4653
new HttpServerResponseTracingHandler(contextStore));
4754
pipeline.addLast(
4855
MaybeBlockResponseHandler.class.getName(), new MaybeBlockResponseHandler(contextStore));
56+
} else if (handler instanceof WebSocketServerProtocolHandler) {
57+
if (InstrumenterConfig.get().isWebsocketTracingEnabled()) {
58+
if (pipeline.get(HttpServerTracingHandler.class) != null) {
59+
addHandlerAfter(
60+
pipeline,
61+
"datadog.trace.instrumentation.netty38.server.HttpServerTracingHandler",
62+
new WebSocketServerTracingHandler(contextStore));
63+
}
64+
}
65+
} else if (handler instanceof WebSocket13FrameEncoder) {
66+
if (InstrumenterConfig.get().isWebsocketTracingEnabled()) {
67+
if (pipeline.get(HttpServerRequestTracingHandler.class) != null) {
68+
addHandlerAfter(
69+
pipeline,
70+
"datadog.trace.instrumentation.netty38.server.HttpServerRequestTracingHandler",
71+
new WebSocketServerRequestTracingHandler(contextStore));
72+
}
73+
}
74+
} else if (handler instanceof WebSocket13FrameDecoder) {
75+
if (InstrumenterConfig.get().isWebsocketTracingEnabled()) {
76+
if (pipeline.get(HttpServerResponseTracingHandler.class) != null) {
77+
addHandlerAfter(
78+
pipeline,
79+
"datadog.trace.instrumentation.netty38.server.HttpServerResponseTracingHandler",
80+
new WebSocketServerResponseTracingHandler(contextStore));
81+
}
82+
}
4983
} else
5084
// Client pipeline handlers
5185
if (handler instanceof HttpClientCodec) {
@@ -64,4 +98,13 @@ public static void wrapHandler(
6498
CallDepthThreadLocalMap.reset(ChannelPipeline.class);
6599
}
66100
}
101+
102+
private static void addHandlerAfter(
103+
final ChannelPipeline pipeline, final String name, final ChannelHandler handler) {
104+
ChannelHandler existing = pipeline.get(handler.getClass());
105+
if (existing != null) {
106+
pipeline.remove(existing);
107+
}
108+
pipeline.addAfter(name, handler.getClass().getName(), handler);
109+
}
67110
}

dd-java-agent/instrumentation/netty-3.8/src/main/java/datadog/trace/instrumentation/netty38/ChannelTraceContext.java

+20
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import datadog.trace.bootstrap.ContextStore;
44
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
55
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
6+
import datadog.trace.bootstrap.instrumentation.websocket.HandlerContext;
67
import org.jboss.netty.handler.codec.http.HttpHeaders;
78

89
public class ChannelTraceContext {
@@ -23,6 +24,9 @@ public ChannelTraceContext create() {
2324
boolean analyzedResponse;
2425
boolean blockedResponse;
2526

27+
HandlerContext.Sender senderHandlerContext;
28+
HandlerContext.Receiver receiverHandlerContext;
29+
2630
public void reset() {
2731
this.connectionContinuation = null;
2832
this.serverSpan = null;
@@ -88,4 +92,20 @@ public void setClientSpan(AgentSpan clientSpan) {
8892
public void setClientParentSpan(AgentSpan clientParentSpan) {
8993
this.clientParentSpan = clientParentSpan;
9094
}
95+
96+
public HandlerContext.Sender getSenderHandlerContext() {
97+
return senderHandlerContext;
98+
}
99+
100+
public void setSenderHandlerContext(HandlerContext.Sender senderHandlerContext) {
101+
this.senderHandlerContext = senderHandlerContext;
102+
}
103+
104+
public HandlerContext.Receiver getReceiverHandlerContext() {
105+
return receiverHandlerContext;
106+
}
107+
108+
public void setReceiverHandlerContext(HandlerContext.Receiver receiverHandlerContext) {
109+
this.receiverHandlerContext = receiverHandlerContext;
110+
}
91111
}

dd-java-agent/instrumentation/netty-3.8/src/main/java/datadog/trace/instrumentation/netty38/NettyChannelPipelineInstrumentation.java

+3
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ public String[] helperClassNames() {
6868
packageName + ".server.HttpServerResponseTracingHandler",
6969
packageName + ".server.HttpServerTracingHandler",
7070
packageName + ".server.MaybeBlockResponseHandler",
71+
packageName + ".server.websocket.WebSocketServerTracingHandler",
72+
packageName + ".server.websocket.WebSocketServerRequestTracingHandler",
73+
packageName + ".server.websocket.WebSocketServerResponseTracingHandler",
7174
};
7275
}
7376

dd-java-agent/instrumentation/netty-3.8/src/main/java/datadog/trace/instrumentation/netty38/server/HttpServerResponseTracingHandler.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import datadog.trace.bootstrap.ContextStore;
77
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
88
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
9+
import datadog.trace.bootstrap.instrumentation.websocket.HandlerContext;
910
import datadog.trace.instrumentation.netty38.ChannelTraceContext;
1011
import org.jboss.netty.channel.Channel;
1112
import org.jboss.netty.channel.ChannelHandlerContext;
@@ -17,6 +18,7 @@
1718
public class HttpServerResponseTracingHandler extends SimpleChannelDownstreamHandler {
1819

1920
private final ContextStore<Channel, ChannelTraceContext> contextStore;
21+
private static final String UPGRADE_HEADER = "upgrade";
2022

2123
public HttpServerResponseTracingHandler(
2224
final ContextStore<Channel, ChannelTraceContext> contextStore) {
@@ -45,7 +47,16 @@ public void writeRequested(final ChannelHandlerContext ctx, final MessageEvent m
4547
span.finish(); // Finish the span manually since finishSpanOnClose was false
4648
throw throwable;
4749
}
48-
if (response.getStatus() != HttpResponseStatus.CONTINUE) {
50+
final boolean isWebsocketUpgrade =
51+
response.getStatus() == HttpResponseStatus.SWITCHING_PROTOCOLS
52+
&& "websocket".equals(response.headers().get(UPGRADE_HEADER));
53+
if (isWebsocketUpgrade) {
54+
String channelId = ctx.getChannel().getId().toString();
55+
channelTraceContext.setSenderHandlerContext(new HandlerContext.Sender(span, channelId));
56+
}
57+
if (response.getStatus() != HttpResponseStatus.CONTINUE
58+
&& (response.getStatus() != HttpResponseStatus.SWITCHING_PROTOCOLS
59+
|| isWebsocketUpgrade)) {
4960
DECORATE.onResponse(span, response);
5061
DECORATE.beforeFinish(span);
5162
span.finish(); // Finish the span manually since finishSpanOnClose was false
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package datadog.trace.instrumentation.netty38.server.websocket;
2+
3+
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
4+
import static datadog.trace.bootstrap.instrumentation.decorator.WebsocketDecorator.DECORATE;
5+
import static datadog.trace.bootstrap.instrumentation.websocket.HandlersExtractor.MESSAGE_TYPE_TEXT;
6+
7+
import datadog.trace.bootstrap.ContextStore;
8+
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
9+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
10+
import datadog.trace.bootstrap.instrumentation.websocket.HandlerContext;
11+
import datadog.trace.instrumentation.netty38.ChannelTraceContext;
12+
import org.jboss.netty.channel.Channel;
13+
import org.jboss.netty.channel.ChannelHandlerContext;
14+
import org.jboss.netty.channel.MessageEvent;
15+
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
16+
import org.jboss.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
17+
import org.jboss.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
18+
import org.jboss.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;
19+
import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame;
20+
import org.jboss.netty.handler.codec.http.websocketx.WebSocketFrame;
21+
22+
public class WebSocketServerRequestTracingHandler extends SimpleChannelUpstreamHandler {
23+
24+
private final ContextStore<Channel, ChannelTraceContext> contextStore;
25+
26+
public WebSocketServerRequestTracingHandler(
27+
final ContextStore<Channel, ChannelTraceContext> contextStore) {
28+
this.contextStore = contextStore;
29+
}
30+
31+
@Override
32+
public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
33+
Object frame = event.getMessage();
34+
if (frame instanceof WebSocketFrame) {
35+
Channel channel = ctx.getChannel();
36+
37+
ChannelTraceContext traceContext = this.contextStore.get(channel);
38+
if (traceContext != null) {
39+
40+
HandlerContext.Receiver receiverContext = traceContext.getReceiverHandlerContext();
41+
if (receiverContext == null) {
42+
HandlerContext.Sender sessionState = traceContext.getSenderHandlerContext();
43+
if (sessionState != null) {
44+
receiverContext =
45+
new HandlerContext.Receiver(
46+
sessionState.getHandshakeSpan(), channel.getId().toString());
47+
traceContext.setReceiverHandlerContext(receiverContext);
48+
}
49+
}
50+
if (receiverContext != null) {
51+
if (frame instanceof TextWebSocketFrame) {
52+
// WebSocket Read Text Start
53+
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
54+
55+
final AgentSpan span =
56+
DECORATE.onReceiveFrameStart(
57+
receiverContext, textFrame.getText(), textFrame.isFinalFragment());
58+
try (final AgentScope scope = activateSpan(span)) {
59+
ctx.sendUpstream(event);
60+
// WebSocket Read Text Start
61+
} finally {
62+
if (textFrame.isFinalFragment()) {
63+
traceContext.setReceiverHandlerContext(null);
64+
DECORATE.onFrameEnd(receiverContext);
65+
}
66+
}
67+
return;
68+
}
69+
70+
if (frame instanceof BinaryWebSocketFrame) {
71+
// WebSocket Read Binary Start
72+
BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame;
73+
final AgentSpan span =
74+
DECORATE.onReceiveFrameStart(
75+
receiverContext,
76+
binaryFrame.getBinaryData().array(),
77+
binaryFrame.isFinalFragment());
78+
try (final AgentScope scope = activateSpan(span)) {
79+
ctx.sendUpstream(event);
80+
} finally {
81+
// WebSocket Read Binary End
82+
if (binaryFrame.isFinalFragment()) {
83+
traceContext.setReceiverHandlerContext(null);
84+
DECORATE.onFrameEnd(receiverContext);
85+
}
86+
}
87+
88+
return;
89+
}
90+
91+
if (frame instanceof ContinuationWebSocketFrame) {
92+
ContinuationWebSocketFrame continuationWebSocketFrame =
93+
(ContinuationWebSocketFrame) frame;
94+
final AgentSpan span =
95+
DECORATE.onReceiveFrameStart(
96+
receiverContext,
97+
MESSAGE_TYPE_TEXT.equals(receiverContext.getMessageType())
98+
? continuationWebSocketFrame.getText()
99+
: continuationWebSocketFrame.getBinaryData().array(),
100+
continuationWebSocketFrame.isFinalFragment());
101+
try (final AgentScope scope = activateSpan(span)) {
102+
ctx.sendUpstream(event);
103+
} finally {
104+
if (continuationWebSocketFrame.isFinalFragment()) {
105+
traceContext.setReceiverHandlerContext(null);
106+
DECORATE.onFrameEnd(receiverContext);
107+
}
108+
}
109+
return;
110+
}
111+
112+
if (frame instanceof CloseWebSocketFrame) {
113+
// WebSocket Closed by client
114+
CloseWebSocketFrame closeFrame = (CloseWebSocketFrame) frame;
115+
int statusCode = closeFrame.getStatusCode();
116+
String reasonText = closeFrame.getReasonText();
117+
traceContext.setSenderHandlerContext(null);
118+
traceContext.setReceiverHandlerContext(null);
119+
final AgentSpan span =
120+
DECORATE.onSessionCloseReceived(receiverContext, reasonText, statusCode);
121+
try (final AgentScope scope = activateSpan(span)) {
122+
ctx.sendUpstream(event);
123+
if (closeFrame.isFinalFragment()) {
124+
DECORATE.onFrameEnd(receiverContext);
125+
}
126+
}
127+
return;
128+
}
129+
}
130+
}
131+
}
132+
133+
ctx.sendUpstream(event); // superclass does not throw
134+
}
135+
}

0 commit comments

Comments
 (0)