Skip to content

Commit b017c7b

Browse files
committed
Instrument Java Websocket API (JSR356)
1 parent 2d7d791 commit b017c7b

File tree

86 files changed

+5760
-196
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

86 files changed

+5760
-196
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
package datadog.trace.bootstrap.instrumentation.decorator;
2+
3+
import static datadog.trace.api.DDTags.DECISION_MAKER_INHERITED;
4+
import static datadog.trace.api.DDTags.DECISION_MAKER_RESOURCE;
5+
import static datadog.trace.api.DDTags.DECISION_MAKER_SERVICE;
6+
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
7+
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.WEBSOCKET_CLOSE_CODE;
8+
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.WEBSOCKET_CLOSE_REASON;
9+
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.WEBSOCKET_MESSAGE_FRAMES;
10+
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.WEBSOCKET_MESSAGE_LENGTH;
11+
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.WEBSOCKET_MESSAGE_RECEIVE_TIME;
12+
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.WEBSOCKET_MESSAGE_TYPE;
13+
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.WEBSOCKET_SESSION_ID;
14+
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND;
15+
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CONSUMER;
16+
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_PRODUCER;
17+
18+
import datadog.trace.api.Config;
19+
import datadog.trace.api.time.SystemTimeSource;
20+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
21+
import datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes;
22+
import datadog.trace.bootstrap.instrumentation.api.SpanAttributes;
23+
import datadog.trace.bootstrap.instrumentation.api.SpanLink;
24+
import datadog.trace.bootstrap.instrumentation.api.Tags;
25+
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
26+
import datadog.trace.bootstrap.instrumentation.websocket.HandlerContext;
27+
import javax.annotation.Nonnull;
28+
29+
public class WebsocketDecorator extends BaseDecorator {
30+
private static final CharSequence WEBSOCKET = UTF8BytesString.create("websocket");
31+
private static final String[] INSTRUMENTATION_NAMES = {WEBSOCKET.toString()};
32+
private static final CharSequence WEBSOCKET_RECEIVE = UTF8BytesString.create("websocket.receive");
33+
private static final CharSequence WEBSOCKET_SEND = UTF8BytesString.create("websocket.send");
34+
private static final CharSequence WEBSOCKET_CLOSE = UTF8BytesString.create("websocket.close");
35+
36+
private static final SpanAttributes SPAN_ATTRIBUTES_RECEIVE =
37+
SpanAttributes.builder().put("dd.kind", "executed_from").build();
38+
private static final SpanAttributes SPAN_ATTRIBUTES_SEND =
39+
SpanAttributes.builder().put("dd.kind", "resuming").build();
40+
41+
public static final WebsocketDecorator DECORATE = new WebsocketDecorator();
42+
43+
@Override
44+
protected String[] instrumentationNames() {
45+
return INSTRUMENTATION_NAMES;
46+
}
47+
48+
@Override
49+
protected CharSequence spanType() {
50+
return InternalSpanTypes.WEBSOCKET;
51+
}
52+
53+
@Override
54+
protected CharSequence component() {
55+
return WEBSOCKET;
56+
}
57+
58+
@Override
59+
public AgentSpan afterStart(AgentSpan span) {
60+
return super.afterStart(span).setMeasured(true);
61+
}
62+
63+
@Nonnull
64+
public AgentSpan onReceiveFrameStart(
65+
final HandlerContext.Receiver handlerContext, final Object data, boolean partialDelivery) {
66+
handlerContext.recordChunkData(data, partialDelivery);
67+
return onFrameStart(
68+
WEBSOCKET_RECEIVE, SPAN_KIND_CONSUMER, handlerContext, SPAN_ATTRIBUTES_RECEIVE, true);
69+
}
70+
71+
@Nonnull
72+
public AgentSpan onSessionCloseIssued(
73+
final HandlerContext.Sender handlerContext, CharSequence closeReason, int closeCode) {
74+
return onFrameStart(
75+
WEBSOCKET_CLOSE, SPAN_KIND_PRODUCER, handlerContext, SPAN_ATTRIBUTES_SEND, false)
76+
.setTag(WEBSOCKET_CLOSE_CODE, closeCode)
77+
.setTag(WEBSOCKET_CLOSE_REASON, closeReason);
78+
}
79+
80+
@Nonnull
81+
public AgentSpan onSessionCloseReceived(
82+
final HandlerContext.Receiver handlerContext, CharSequence closeReason, int closeCode) {
83+
return onFrameStart(
84+
WEBSOCKET_CLOSE, SPAN_KIND_CONSUMER, handlerContext, SPAN_ATTRIBUTES_RECEIVE, true)
85+
.setTag(WEBSOCKET_CLOSE_CODE, closeCode)
86+
.setTag(WEBSOCKET_CLOSE_REASON, closeReason);
87+
}
88+
89+
@Nonnull
90+
public AgentSpan onSendFrameStart(
91+
final HandlerContext.Sender handlerContext, final CharSequence msgType, final int msgSize) {
92+
handlerContext.recordChunkData(msgType, msgSize);
93+
return onFrameStart(
94+
WEBSOCKET_SEND, SPAN_KIND_PRODUCER, handlerContext, SPAN_ATTRIBUTES_SEND, false);
95+
}
96+
97+
public void onFrameEnd(final HandlerContext handlerContext) {
98+
if (handlerContext == null) {
99+
return;
100+
}
101+
final AgentSpan wsSpan = handlerContext.getWebsocketSpan();
102+
try {
103+
final long startTime = handlerContext.getFirstFrameTimestamp();
104+
if (startTime > 0) {
105+
wsSpan.setTag(
106+
WEBSOCKET_MESSAGE_RECEIVE_TIME,
107+
SystemTimeSource.INSTANCE.getCurrentTimeNanos() - startTime);
108+
}
109+
final long chunks = handlerContext.getMsgChunks();
110+
if (chunks > 0) {
111+
wsSpan.setTag(WEBSOCKET_MESSAGE_FRAMES, chunks);
112+
wsSpan.setTag(WEBSOCKET_MESSAGE_LENGTH, handlerContext.getMsgSize());
113+
wsSpan.setTag(WEBSOCKET_MESSAGE_TYPE, handlerContext.getMessageType());
114+
}
115+
(beforeFinish(wsSpan)).finish();
116+
} finally {
117+
handlerContext.reset();
118+
}
119+
}
120+
121+
private AgentSpan onFrameStart(
122+
final CharSequence operationName,
123+
final CharSequence spanKind,
124+
final HandlerContext handlerContext,
125+
final SpanAttributes linkAttributes,
126+
boolean traceStarter) {
127+
AgentSpan wsSpan = handlerContext.getWebsocketSpan();
128+
if (wsSpan == null) {
129+
final Config config = Config.get();
130+
final AgentSpan handshakeSpan = handlerContext.getHandshakeSpan();
131+
boolean inheritSampling = config.isWebsocketMessagesInheritSampling();
132+
boolean useDedicatedTraces = config.isWebsocketMessagesSeparateTraces();
133+
if (traceStarter) {
134+
if (useDedicatedTraces) {
135+
wsSpan = startSpan(WEBSOCKET.toString(), operationName, null);
136+
if (inheritSampling) {
137+
wsSpan.setTag(DECISION_MAKER_INHERITED, 1);
138+
wsSpan.setTag(DECISION_MAKER_SERVICE, handshakeSpan.getServiceName());
139+
wsSpan.setTag(DECISION_MAKER_RESOURCE, handshakeSpan.getResourceName());
140+
}
141+
} else {
142+
wsSpan = startSpan(WEBSOCKET.toString(), operationName, handshakeSpan.context());
143+
}
144+
} else {
145+
wsSpan = startSpan(WEBSOCKET.toString(), operationName);
146+
}
147+
handlerContext.setWebsocketSpan(wsSpan);
148+
afterStart(wsSpan);
149+
wsSpan.setTag(SPAN_KIND, spanKind);
150+
wsSpan.setResourceName(handlerContext.getWsResourceName());
151+
// carry over peer information for inferred services
152+
final String handshakePeerAddress = (String) handshakeSpan.getTag(Tags.PEER_HOSTNAME);
153+
if (handshakePeerAddress != null) {
154+
wsSpan.setTag(Tags.PEER_HOSTNAME, handshakePeerAddress);
155+
}
156+
if (config.isWebsocketTagSessionId()) {
157+
wsSpan.setTag(WEBSOCKET_SESSION_ID, handlerContext.getSessionId());
158+
}
159+
if (useDedicatedTraces || !traceStarter) {
160+
// the link is not added if the user wants to have receive frames on the same trace as the
161+
// handshake
162+
wsSpan.addLink(
163+
SpanLink.from(handshakeSpan.context(), (byte) 0, "", linkAttributes, inheritSampling));
164+
}
165+
}
166+
return wsSpan;
167+
}
168+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
package datadog.trace.bootstrap.instrumentation.websocket;
2+
3+
import datadog.trace.api.time.SystemTimeSource;
4+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
5+
6+
public abstract class HandlerContext {
7+
8+
private final AgentSpan handshakeSpan;
9+
private AgentSpan websocketSpan;
10+
private final String sessionId;
11+
protected long msgChunks = 0;
12+
protected long msgSize = 0;
13+
private final CharSequence wsResourceName;
14+
protected long firstFrameTimestamp;
15+
16+
public HandlerContext(AgentSpan handshakeSpan, String sessionId) {
17+
this.handshakeSpan = handshakeSpan;
18+
this.sessionId = sessionId;
19+
wsResourceName = ResourceNameExtractor.extractResourceName(handshakeSpan.getResourceName());
20+
}
21+
22+
public AgentSpan getHandshakeSpan() {
23+
return handshakeSpan;
24+
}
25+
26+
public AgentSpan getWebsocketSpan() {
27+
return websocketSpan;
28+
}
29+
30+
public void setWebsocketSpan(AgentSpan websocketSpan) {
31+
this.websocketSpan = websocketSpan;
32+
}
33+
34+
public String getSessionId() {
35+
return sessionId;
36+
}
37+
38+
public long getMsgChunks() {
39+
return msgChunks;
40+
}
41+
42+
public long getMsgSize() {
43+
return msgSize;
44+
}
45+
46+
public CharSequence getWsResourceName() {
47+
return wsResourceName;
48+
}
49+
50+
public long getFirstFrameTimestamp() {
51+
return firstFrameTimestamp;
52+
}
53+
54+
public abstract CharSequence getMessageType();
55+
56+
public void reset() {
57+
msgChunks = 0;
58+
websocketSpan = null;
59+
msgSize = 0;
60+
firstFrameTimestamp = 0;
61+
}
62+
63+
public static class Receiver extends HandlerContext {
64+
private boolean msgSizeExtractorInitialized = false;
65+
private HandlersExtractor.SizeCalculator msgSizeCalculator;
66+
67+
public Receiver(AgentSpan handshakeSpan, String sessionId) {
68+
super(handshakeSpan, sessionId);
69+
}
70+
71+
@Override
72+
public CharSequence getMessageType() {
73+
return msgSizeCalculator != null ? msgSizeCalculator.getFormat() : null;
74+
}
75+
76+
public void recordChunkData(Object data, boolean partialDelivery) {
77+
if (msgChunks++ == 0 && partialDelivery) {
78+
firstFrameTimestamp = SystemTimeSource.INSTANCE.getCurrentTimeNanos();
79+
}
80+
if (data == null) {
81+
return;
82+
}
83+
if (!msgSizeExtractorInitialized) {
84+
msgSizeExtractorInitialized = true;
85+
msgSizeCalculator = HandlersExtractor.getSizeCalculator(data);
86+
}
87+
88+
if (msgSizeCalculator != null) {
89+
try {
90+
int sz = msgSizeCalculator.getLengthFunction().applyAsInt(data);
91+
msgSize += sz;
92+
if (partialDelivery && sz == 0) {
93+
msgChunks--; // if we receive an empty frame with the fin bit don't count it as a chunk
94+
}
95+
} catch (Throwable ignored) {
96+
}
97+
}
98+
}
99+
}
100+
101+
public static class Sender extends HandlerContext {
102+
private CharSequence msgType;
103+
104+
public Sender(AgentSpan handshakeSpan, String sessionId) {
105+
super(handshakeSpan, sessionId);
106+
}
107+
108+
@Override
109+
public CharSequence getMessageType() {
110+
return msgType;
111+
}
112+
113+
@Override
114+
public void reset() {
115+
super.reset();
116+
msgType = null;
117+
}
118+
119+
public void recordChunkData(CharSequence type, int size) {
120+
msgChunks++;
121+
if (msgType == null) {
122+
msgType = type;
123+
}
124+
msgSize += size;
125+
}
126+
}
127+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package datadog.trace.bootstrap.instrumentation.websocket;
2+
3+
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
4+
import java.io.InputStream;
5+
import java.io.Reader;
6+
import java.nio.ByteBuffer;
7+
import java.util.function.ToIntFunction;
8+
import javax.annotation.Nonnull;
9+
10+
public class HandlersExtractor {
11+
public static final CharSequence MESSAGE_TYPE_TEXT = UTF8BytesString.create("text");
12+
public static final CharSequence MESSAGE_TYPE_BINARY = UTF8BytesString.create("binary");
13+
14+
static class CharSequenceLenFunction implements ToIntFunction<CharSequence> {
15+
@Override
16+
public int applyAsInt(CharSequence value) {
17+
return value != null ? value.length() : 0;
18+
}
19+
}
20+
21+
static class ByteArrayLenFunction implements ToIntFunction<byte[]> {
22+
@Override
23+
public int applyAsInt(byte[] value) {
24+
return value != null ? value.length : 0;
25+
}
26+
}
27+
28+
static class ByteBufferLenFunction implements ToIntFunction<ByteBuffer> {
29+
@Override
30+
public int applyAsInt(ByteBuffer value) {
31+
return value != null ? value.remaining() : 0;
32+
}
33+
}
34+
35+
static class NoopLenFunction implements ToIntFunction<Object> {
36+
@Override
37+
public int applyAsInt(Object ignored) {
38+
return 0;
39+
}
40+
}
41+
42+
public static class SizeCalculator<T> {
43+
@Nonnull private final ToIntFunction<T> lengthFunction;
44+
@Nonnull private final CharSequence format;
45+
46+
SizeCalculator(@Nonnull ToIntFunction<T> lengthFunction, @Nonnull CharSequence format) {
47+
this.lengthFunction = lengthFunction;
48+
this.format = format;
49+
}
50+
51+
@Nonnull
52+
public ToIntFunction<T> getLengthFunction() {
53+
return lengthFunction;
54+
}
55+
56+
@Nonnull
57+
public CharSequence getFormat() {
58+
return format;
59+
}
60+
}
61+
62+
public static final SizeCalculator<CharSequence> CHAR_SEQUENCE_SIZE_CALCULATOR =
63+
new SizeCalculator<>(new CharSequenceLenFunction(), MESSAGE_TYPE_TEXT);
64+
public static final SizeCalculator<byte[]> BYTES_SIZE_CALCULATOR =
65+
new SizeCalculator<>(new ByteArrayLenFunction(), MESSAGE_TYPE_BINARY);
66+
public static final SizeCalculator<ByteBuffer> BYTE_BUFFER_SIZE_CALCULATOR =
67+
new SizeCalculator<>(new ByteBufferLenFunction(), MESSAGE_TYPE_BINARY);
68+
public static final SizeCalculator<Object> TEXT_STREAM_SIZE_CALCULATOR =
69+
new SizeCalculator<>(new NoopLenFunction(), MESSAGE_TYPE_TEXT);
70+
public static final SizeCalculator<Object> BYTE_STREAM_SIZE_CALCULATOR =
71+
new SizeCalculator<>(new NoopLenFunction(), MESSAGE_TYPE_BINARY);
72+
73+
public static SizeCalculator getSizeCalculator(Object data) {
74+
// we only extract "safely" the message size from byte[], ByteBuffer and String.
75+
// Other types will contain streaming data (i.e. InputStream, Reader)
76+
if (data instanceof CharSequence) {
77+
return CHAR_SEQUENCE_SIZE_CALCULATOR;
78+
} else if (data instanceof byte[]) {
79+
return BYTES_SIZE_CALCULATOR;
80+
} else if (data instanceof ByteBuffer) {
81+
return BYTE_BUFFER_SIZE_CALCULATOR;
82+
} else if (data instanceof Reader) {
83+
return TEXT_STREAM_SIZE_CALCULATOR;
84+
} else if (data instanceof InputStream) {
85+
return BYTE_STREAM_SIZE_CALCULATOR;
86+
}
87+
return null;
88+
}
89+
90+
private HandlersExtractor() {}
91+
}

0 commit comments

Comments
 (0)