Skip to content

Commit a464b9b

Browse files
authored
Instrument Java Websocket API (JSR356) (#8440)
* Instrument Java Websocket API (JSR356) * Port span link related tests * Handle route for undertow upgrades * fix jetty11 test * make sure instrumentation is applied in spring boot test * add coverage for spanlinkattributes * spotless * fix undertow test * fix muzzle for tomcat * codenarc * fix forbidden api * apply review * change linkassert signature * Add suggestions * port dynamodb span link tests
1 parent 01d3277 commit a464b9b

File tree

95 files changed

+5928
-290
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

95 files changed

+5928
-290
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
package datadog.trace.bootstrap.instrumentation.decorator;
2+
3+
import static datadog.trace.api.DDTags.DECISION_MAKER_INHERITED;
4+
import static datadog.trace.api.DDTags.DECISION_MAKER_RESOURCE;
5+
import static datadog.trace.api.DDTags.DECISION_MAKER_SERVICE;
6+
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
7+
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.WEBSOCKET_CLOSE_CODE;
8+
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.WEBSOCKET_CLOSE_REASON;
9+
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.WEBSOCKET_MESSAGE_FRAMES;
10+
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.WEBSOCKET_MESSAGE_LENGTH;
11+
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.WEBSOCKET_MESSAGE_RECEIVE_TIME;
12+
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.WEBSOCKET_MESSAGE_TYPE;
13+
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.WEBSOCKET_SESSION_ID;
14+
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND;
15+
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CONSUMER;
16+
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_PRODUCER;
17+
18+
import datadog.trace.api.Config;
19+
import datadog.trace.api.time.SystemTimeSource;
20+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
21+
import datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes;
22+
import datadog.trace.bootstrap.instrumentation.api.NotSampledSpanContext;
23+
import datadog.trace.bootstrap.instrumentation.api.SpanAttributes;
24+
import datadog.trace.bootstrap.instrumentation.api.SpanLink;
25+
import datadog.trace.bootstrap.instrumentation.api.Tags;
26+
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
27+
import datadog.trace.bootstrap.instrumentation.websocket.HandlerContext;
28+
import javax.annotation.Nonnull;
29+
30+
public class WebsocketDecorator extends BaseDecorator {
31+
private static final CharSequence WEBSOCKET = UTF8BytesString.create("websocket");
32+
private static final String[] INSTRUMENTATION_NAMES = {WEBSOCKET.toString()};
33+
private static final CharSequence WEBSOCKET_RECEIVE = UTF8BytesString.create("websocket.receive");
34+
private static final CharSequence WEBSOCKET_SEND = UTF8BytesString.create("websocket.send");
35+
private static final CharSequence WEBSOCKET_CLOSE = UTF8BytesString.create("websocket.close");
36+
37+
private static final SpanAttributes SPAN_ATTRIBUTES_RECEIVE =
38+
SpanAttributes.builder().put("dd.kind", "executed_from").build();
39+
private static final SpanAttributes SPAN_ATTRIBUTES_SEND =
40+
SpanAttributes.builder().put("dd.kind", "resuming").build();
41+
42+
public static final WebsocketDecorator DECORATE = new WebsocketDecorator();
43+
44+
@Override
45+
protected String[] instrumentationNames() {
46+
return INSTRUMENTATION_NAMES;
47+
}
48+
49+
@Override
50+
protected CharSequence spanType() {
51+
return InternalSpanTypes.WEBSOCKET;
52+
}
53+
54+
@Override
55+
protected CharSequence component() {
56+
return WEBSOCKET;
57+
}
58+
59+
@Override
60+
public AgentSpan afterStart(AgentSpan span) {
61+
return super.afterStart(span).setMeasured(true);
62+
}
63+
64+
@Nonnull
65+
public AgentSpan onReceiveFrameStart(
66+
final HandlerContext.Receiver handlerContext, final Object data, boolean partialDelivery) {
67+
handlerContext.recordChunkData(data, partialDelivery);
68+
return onFrameStart(
69+
WEBSOCKET_RECEIVE, SPAN_KIND_CONSUMER, handlerContext, SPAN_ATTRIBUTES_RECEIVE, true);
70+
}
71+
72+
@Nonnull
73+
public AgentSpan onSessionCloseIssued(
74+
final HandlerContext.Sender handlerContext, CharSequence closeReason, int closeCode) {
75+
return onFrameStart(
76+
WEBSOCKET_CLOSE, SPAN_KIND_PRODUCER, handlerContext, SPAN_ATTRIBUTES_SEND, false)
77+
.setTag(WEBSOCKET_CLOSE_CODE, closeCode)
78+
.setTag(WEBSOCKET_CLOSE_REASON, closeReason);
79+
}
80+
81+
@Nonnull
82+
public AgentSpan onSessionCloseReceived(
83+
final HandlerContext.Receiver handlerContext, CharSequence closeReason, int closeCode) {
84+
return onFrameStart(
85+
WEBSOCKET_CLOSE, SPAN_KIND_CONSUMER, handlerContext, SPAN_ATTRIBUTES_RECEIVE, true)
86+
.setTag(WEBSOCKET_CLOSE_CODE, closeCode)
87+
.setTag(WEBSOCKET_CLOSE_REASON, closeReason);
88+
}
89+
90+
@Nonnull
91+
public AgentSpan onSendFrameStart(
92+
final HandlerContext.Sender handlerContext, final CharSequence msgType, final int msgSize) {
93+
handlerContext.recordChunkData(msgType, msgSize);
94+
return onFrameStart(
95+
WEBSOCKET_SEND, SPAN_KIND_PRODUCER, handlerContext, SPAN_ATTRIBUTES_SEND, false);
96+
}
97+
98+
public void onFrameEnd(final HandlerContext handlerContext) {
99+
if (handlerContext == null) {
100+
return;
101+
}
102+
final AgentSpan wsSpan = handlerContext.getWebsocketSpan();
103+
if (wsSpan == null) {
104+
return;
105+
}
106+
try {
107+
final long startTime = handlerContext.getFirstFrameTimestamp();
108+
if (startTime > 0) {
109+
wsSpan.setTag(
110+
WEBSOCKET_MESSAGE_RECEIVE_TIME,
111+
SystemTimeSource.INSTANCE.getCurrentTimeNanos() - startTime);
112+
}
113+
final long chunks = handlerContext.getMsgChunks();
114+
if (chunks > 0) {
115+
wsSpan.setTag(WEBSOCKET_MESSAGE_FRAMES, chunks);
116+
wsSpan.setTag(WEBSOCKET_MESSAGE_LENGTH, handlerContext.getMsgSize());
117+
wsSpan.setTag(WEBSOCKET_MESSAGE_TYPE, handlerContext.getMessageType());
118+
}
119+
(beforeFinish(wsSpan)).finish();
120+
} finally {
121+
handlerContext.reset();
122+
}
123+
}
124+
125+
private AgentSpan onFrameStart(
126+
final CharSequence operationName,
127+
final CharSequence spanKind,
128+
final HandlerContext handlerContext,
129+
final SpanAttributes linkAttributes,
130+
boolean traceStarter) {
131+
AgentSpan wsSpan = handlerContext.getWebsocketSpan();
132+
if (wsSpan == null) {
133+
final Config config = Config.get();
134+
final AgentSpan handshakeSpan = handlerContext.getHandshakeSpan();
135+
boolean inheritSampling = config.isWebsocketMessagesInheritSampling();
136+
boolean useDedicatedTraces = config.isWebsocketMessagesSeparateTraces();
137+
if (traceStarter) {
138+
if (useDedicatedTraces) {
139+
wsSpan = startSpan(WEBSOCKET.toString(), operationName, null);
140+
if (inheritSampling) {
141+
wsSpan.setTag(DECISION_MAKER_INHERITED, 1);
142+
wsSpan.setTag(DECISION_MAKER_SERVICE, handshakeSpan.getServiceName());
143+
wsSpan.setTag(DECISION_MAKER_RESOURCE, handshakeSpan.getResourceName());
144+
}
145+
} else {
146+
wsSpan = startSpan(WEBSOCKET.toString(), operationName, handshakeSpan.context());
147+
}
148+
} else {
149+
wsSpan = startSpan(WEBSOCKET.toString(), operationName);
150+
}
151+
handlerContext.setWebsocketSpan(wsSpan);
152+
afterStart(wsSpan);
153+
wsSpan.setTag(SPAN_KIND, spanKind);
154+
wsSpan.setResourceName(handlerContext.getWsResourceName());
155+
// carry over peer information for inferred services
156+
final String handshakePeerAddress = (String) handshakeSpan.getTag(Tags.PEER_HOSTNAME);
157+
if (handshakePeerAddress != null) {
158+
wsSpan.setTag(Tags.PEER_HOSTNAME, handshakePeerAddress);
159+
}
160+
if (config.isWebsocketTagSessionId()) {
161+
wsSpan.setTag(WEBSOCKET_SESSION_ID, handlerContext.getSessionId());
162+
}
163+
if (useDedicatedTraces || !traceStarter) {
164+
// the link is not added if the user wants to have receive frames on the same trace as the
165+
// handshake
166+
wsSpan.addLink(
167+
SpanLink.from(
168+
inheritSampling
169+
? handshakeSpan.context()
170+
: new NotSampledSpanContext(handshakeSpan.context()),
171+
SpanLink.DEFAULT_FLAGS,
172+
"",
173+
linkAttributes));
174+
}
175+
}
176+
return wsSpan;
177+
}
178+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package datadog.trace.bootstrap.instrumentation.websocket;
2+
3+
import datadog.trace.api.time.SystemTimeSource;
4+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
8+
public abstract class HandlerContext {
9+
final Logger LOGGER = LoggerFactory.getLogger(HandlerContext.class);
10+
11+
private final AgentSpan handshakeSpan;
12+
private AgentSpan websocketSpan;
13+
private final String sessionId;
14+
protected long msgChunks = 0;
15+
protected long msgSize = 0;
16+
private final CharSequence wsResourceName;
17+
protected long firstFrameTimestamp;
18+
19+
public HandlerContext(AgentSpan handshakeSpan, String sessionId) {
20+
this.handshakeSpan = handshakeSpan;
21+
this.sessionId = sessionId;
22+
wsResourceName = ResourceNameExtractor.extractResourceName(handshakeSpan.getResourceName());
23+
}
24+
25+
public AgentSpan getHandshakeSpan() {
26+
return handshakeSpan;
27+
}
28+
29+
public AgentSpan getWebsocketSpan() {
30+
return websocketSpan;
31+
}
32+
33+
public void setWebsocketSpan(AgentSpan websocketSpan) {
34+
this.websocketSpan = websocketSpan;
35+
}
36+
37+
public String getSessionId() {
38+
return sessionId;
39+
}
40+
41+
public long getMsgChunks() {
42+
return msgChunks;
43+
}
44+
45+
public long getMsgSize() {
46+
return msgSize;
47+
}
48+
49+
public CharSequence getWsResourceName() {
50+
return wsResourceName;
51+
}
52+
53+
public long getFirstFrameTimestamp() {
54+
return firstFrameTimestamp;
55+
}
56+
57+
public abstract CharSequence getMessageType();
58+
59+
public void reset() {
60+
msgChunks = 0;
61+
websocketSpan = null;
62+
msgSize = 0;
63+
firstFrameTimestamp = 0;
64+
}
65+
66+
public static class Receiver extends HandlerContext {
67+
private boolean msgSizeExtractorInitialized = false;
68+
private HandlersExtractor.SizeCalculator msgSizeCalculator;
69+
70+
public Receiver(AgentSpan handshakeSpan, String sessionId) {
71+
super(handshakeSpan, sessionId);
72+
}
73+
74+
@Override
75+
public CharSequence getMessageType() {
76+
return msgSizeCalculator != null ? msgSizeCalculator.getFormat() : null;
77+
}
78+
79+
public void recordChunkData(Object data, boolean partialDelivery) {
80+
if (msgChunks++ == 0 && partialDelivery) {
81+
firstFrameTimestamp = SystemTimeSource.INSTANCE.getCurrentTimeNanos();
82+
}
83+
if (data == null) {
84+
return;
85+
}
86+
if (!msgSizeExtractorInitialized) {
87+
msgSizeExtractorInitialized = true;
88+
msgSizeCalculator = HandlersExtractor.getSizeCalculator(data);
89+
}
90+
91+
if (msgSizeCalculator != null) {
92+
try {
93+
int sz = msgSizeCalculator.getLengthFunction().applyAsInt(data);
94+
msgSize += sz;
95+
if (partialDelivery && sz == 0) {
96+
msgChunks--; // if we receive an empty frame with the fin bit don't count it as a chunk
97+
}
98+
} catch (Throwable t) {
99+
LOGGER.debug(
100+
"Unable to calculate websocket message size for data type {}",
101+
data.getClass().getName(),
102+
t);
103+
}
104+
}
105+
}
106+
}
107+
108+
public static class Sender extends HandlerContext {
109+
private CharSequence msgType;
110+
111+
public Sender(AgentSpan handshakeSpan, String sessionId) {
112+
super(handshakeSpan, sessionId);
113+
}
114+
115+
@Override
116+
public CharSequence getMessageType() {
117+
return msgType;
118+
}
119+
120+
@Override
121+
public void reset() {
122+
super.reset();
123+
msgType = null;
124+
}
125+
126+
public void recordChunkData(CharSequence type, int size) {
127+
msgChunks++;
128+
if (msgType == null) {
129+
msgType = type;
130+
}
131+
msgSize += size;
132+
}
133+
}
134+
}

0 commit comments

Comments
 (0)