Skip to content

Instrument Java Websocket API (JSR356) #8440

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Mar 10, 2025
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package datadog.trace.bootstrap.instrumentation.decorator;

import static datadog.trace.api.DDTags.DECISION_MAKER_INHERITED;
import static datadog.trace.api.DDTags.DECISION_MAKER_RESOURCE;
import static datadog.trace.api.DDTags.DECISION_MAKER_SERVICE;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.WEBSOCKET_CLOSE_CODE;
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.WEBSOCKET_CLOSE_REASON;
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.WEBSOCKET_MESSAGE_FRAMES;
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.WEBSOCKET_MESSAGE_LENGTH;
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.WEBSOCKET_MESSAGE_RECEIVE_TIME;
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.WEBSOCKET_MESSAGE_TYPE;
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.WEBSOCKET_SESSION_ID;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CONSUMER;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_PRODUCER;

import datadog.trace.api.Config;
import datadog.trace.api.time.SystemTimeSource;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes;
import datadog.trace.bootstrap.instrumentation.api.NotSampledSpanContext;
import datadog.trace.bootstrap.instrumentation.api.SpanAttributes;
import datadog.trace.bootstrap.instrumentation.api.SpanLink;
import datadog.trace.bootstrap.instrumentation.api.Tags;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import datadog.trace.bootstrap.instrumentation.websocket.HandlerContext;
import javax.annotation.Nonnull;

public class WebsocketDecorator extends BaseDecorator {
private static final CharSequence WEBSOCKET = UTF8BytesString.create("websocket");
private static final String[] INSTRUMENTATION_NAMES = {WEBSOCKET.toString()};
private static final CharSequence WEBSOCKET_RECEIVE = UTF8BytesString.create("websocket.receive");
private static final CharSequence WEBSOCKET_SEND = UTF8BytesString.create("websocket.send");
private static final CharSequence WEBSOCKET_CLOSE = UTF8BytesString.create("websocket.close");

private static final SpanAttributes SPAN_ATTRIBUTES_RECEIVE =
SpanAttributes.builder().put("dd.kind", "executed_from").build();
private static final SpanAttributes SPAN_ATTRIBUTES_SEND =
SpanAttributes.builder().put("dd.kind", "resuming").build();

public static final WebsocketDecorator DECORATE = new WebsocketDecorator();

@Override
protected String[] instrumentationNames() {
return INSTRUMENTATION_NAMES;
}

@Override
protected CharSequence spanType() {
return InternalSpanTypes.WEBSOCKET;
}

@Override
protected CharSequence component() {
return WEBSOCKET;
}

@Override
public AgentSpan afterStart(AgentSpan span) {
return super.afterStart(span).setMeasured(true);
}

@Nonnull
public AgentSpan onReceiveFrameStart(
final HandlerContext.Receiver handlerContext, final Object data, boolean partialDelivery) {
handlerContext.recordChunkData(data, partialDelivery);
return onFrameStart(
WEBSOCKET_RECEIVE, SPAN_KIND_CONSUMER, handlerContext, SPAN_ATTRIBUTES_RECEIVE, true);
}

@Nonnull
public AgentSpan onSessionCloseIssued(
final HandlerContext.Sender handlerContext, CharSequence closeReason, int closeCode) {
return onFrameStart(
WEBSOCKET_CLOSE, SPAN_KIND_PRODUCER, handlerContext, SPAN_ATTRIBUTES_SEND, false)
.setTag(WEBSOCKET_CLOSE_CODE, closeCode)
.setTag(WEBSOCKET_CLOSE_REASON, closeReason);
}

@Nonnull
public AgentSpan onSessionCloseReceived(
final HandlerContext.Receiver handlerContext, CharSequence closeReason, int closeCode) {
return onFrameStart(
WEBSOCKET_CLOSE, SPAN_KIND_CONSUMER, handlerContext, SPAN_ATTRIBUTES_RECEIVE, true)
.setTag(WEBSOCKET_CLOSE_CODE, closeCode)
.setTag(WEBSOCKET_CLOSE_REASON, closeReason);
}

@Nonnull
public AgentSpan onSendFrameStart(
final HandlerContext.Sender handlerContext, final CharSequence msgType, final int msgSize) {
handlerContext.recordChunkData(msgType, msgSize);
return onFrameStart(
WEBSOCKET_SEND, SPAN_KIND_PRODUCER, handlerContext, SPAN_ATTRIBUTES_SEND, false);
}

public void onFrameEnd(final HandlerContext handlerContext) {
if (handlerContext == null) {
return;
}
final AgentSpan wsSpan = handlerContext.getWebsocketSpan();
if (wsSpan == null) {
return;
}
try {
final long startTime = handlerContext.getFirstFrameTimestamp();
if (startTime > 0) {
wsSpan.setTag(
WEBSOCKET_MESSAGE_RECEIVE_TIME,
SystemTimeSource.INSTANCE.getCurrentTimeNanos() - startTime);
}
final long chunks = handlerContext.getMsgChunks();
if (chunks > 0) {
wsSpan.setTag(WEBSOCKET_MESSAGE_FRAMES, chunks);
wsSpan.setTag(WEBSOCKET_MESSAGE_LENGTH, handlerContext.getMsgSize());
wsSpan.setTag(WEBSOCKET_MESSAGE_TYPE, handlerContext.getMessageType());
}
(beforeFinish(wsSpan)).finish();
} finally {
handlerContext.reset();
}
}

private AgentSpan onFrameStart(
final CharSequence operationName,
final CharSequence spanKind,
final HandlerContext handlerContext,
final SpanAttributes linkAttributes,
boolean traceStarter) {
AgentSpan wsSpan = handlerContext.getWebsocketSpan();
if (wsSpan == null) {
final Config config = Config.get();
final AgentSpan handshakeSpan = handlerContext.getHandshakeSpan();
boolean inheritSampling = config.isWebsocketMessagesInheritSampling();
boolean useDedicatedTraces = config.isWebsocketMessagesSeparateTraces();
if (traceStarter) {
if (useDedicatedTraces) {
wsSpan = startSpan(WEBSOCKET.toString(), operationName, null);
if (inheritSampling) {
wsSpan.setTag(DECISION_MAKER_INHERITED, 1);
wsSpan.setTag(DECISION_MAKER_SERVICE, handshakeSpan.getServiceName());
wsSpan.setTag(DECISION_MAKER_RESOURCE, handshakeSpan.getResourceName());
}
} else {
wsSpan = startSpan(WEBSOCKET.toString(), operationName, handshakeSpan.context());
}
} else {
wsSpan = startSpan(WEBSOCKET.toString(), operationName);
}
handlerContext.setWebsocketSpan(wsSpan);
afterStart(wsSpan);
wsSpan.setTag(SPAN_KIND, spanKind);
wsSpan.setResourceName(handlerContext.getWsResourceName());
// carry over peer information for inferred services
final String handshakePeerAddress = (String) handshakeSpan.getTag(Tags.PEER_HOSTNAME);
if (handshakePeerAddress != null) {
wsSpan.setTag(Tags.PEER_HOSTNAME, handshakePeerAddress);
}
if (config.isWebsocketTagSessionId()) {
wsSpan.setTag(WEBSOCKET_SESSION_ID, handlerContext.getSessionId());
}
if (useDedicatedTraces || !traceStarter) {
// the link is not added if the user wants to have receive frames on the same trace as the
// handshake
wsSpan.addLink(
SpanLink.from(
inheritSampling
? handshakeSpan.context()
: new NotSampledSpanContext(handshakeSpan.context()),
SpanLink.DEFAULT_FLAGS,
"",
linkAttributes));
}
}
return wsSpan;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package datadog.trace.bootstrap.instrumentation.websocket;

import datadog.trace.api.time.SystemTimeSource;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class HandlerContext {
final Logger LOGGER = LoggerFactory.getLogger(HandlerContext.class);

private final AgentSpan handshakeSpan;
private AgentSpan websocketSpan;
private final String sessionId;
protected long msgChunks = 0;
protected long msgSize = 0;
private final CharSequence wsResourceName;
protected long firstFrameTimestamp;

public HandlerContext(AgentSpan handshakeSpan, String sessionId) {
this.handshakeSpan = handshakeSpan;
this.sessionId = sessionId;
wsResourceName = ResourceNameExtractor.extractResourceName(handshakeSpan.getResourceName());
}

public AgentSpan getHandshakeSpan() {
return handshakeSpan;
}

public AgentSpan getWebsocketSpan() {
return websocketSpan;
}

public void setWebsocketSpan(AgentSpan websocketSpan) {
this.websocketSpan = websocketSpan;
}

public String getSessionId() {
return sessionId;
}

public long getMsgChunks() {
return msgChunks;
}

public long getMsgSize() {
return msgSize;
}

public CharSequence getWsResourceName() {
return wsResourceName;
}

public long getFirstFrameTimestamp() {
return firstFrameTimestamp;
}

public abstract CharSequence getMessageType();

public void reset() {
msgChunks = 0;
websocketSpan = null;
msgSize = 0;
firstFrameTimestamp = 0;
}

public static class Receiver extends HandlerContext {
private boolean msgSizeExtractorInitialized = false;
private HandlersExtractor.SizeCalculator msgSizeCalculator;

public Receiver(AgentSpan handshakeSpan, String sessionId) {
super(handshakeSpan, sessionId);
}

@Override
public CharSequence getMessageType() {
return msgSizeCalculator != null ? msgSizeCalculator.getFormat() : null;
}

public void recordChunkData(Object data, boolean partialDelivery) {
if (msgChunks++ == 0 && partialDelivery) {
firstFrameTimestamp = SystemTimeSource.INSTANCE.getCurrentTimeNanos();
}
if (data == null) {
return;
}
if (!msgSizeExtractorInitialized) {
msgSizeExtractorInitialized = true;
msgSizeCalculator = HandlersExtractor.getSizeCalculator(data);
}

if (msgSizeCalculator != null) {
try {
int sz = msgSizeCalculator.getLengthFunction().applyAsInt(data);
msgSize += sz;
if (partialDelivery && sz == 0) {
msgChunks--; // if we receive an empty frame with the fin bit don't count it as a chunk
}
} catch (Throwable t) {
LOGGER.debug(
"Unable to calculate websocket message size for data type {}",
data.getClass().getName(),
t);
}
}
}
}

public static class Sender extends HandlerContext {
private CharSequence msgType;

public Sender(AgentSpan handshakeSpan, String sessionId) {
super(handshakeSpan, sessionId);
}

@Override
public CharSequence getMessageType() {
return msgType;
}

@Override
public void reset() {
super.reset();
msgType = null;
}

public void recordChunkData(CharSequence type, int size) {
msgChunks++;
if (msgType == null) {
msgType = type;
}
msgSize += size;
}
}
}
Loading