1
1
package datadog .trace .core .datastreams ;
2
2
3
+ import static datadog .trace .api .DDTags .PATHWAY_HASH ;
4
+ import static datadog .trace .bootstrap .instrumentation .api .PathwayContext .PROPAGATION_KEY_BASE64 ;
5
+
3
6
import datadog .context .Context ;
4
7
import datadog .context .propagation .CarrierSetter ;
5
8
import datadog .context .propagation .CarrierVisitor ;
10
13
import datadog .trace .bootstrap .instrumentation .api .AgentSpanContext ;
11
14
import datadog .trace .bootstrap .instrumentation .api .PathwayContext ;
12
15
import datadog .trace .bootstrap .instrumentation .api .TagContext ;
16
+ import java .io .IOException ;
13
17
import java .util .function .Supplier ;
14
18
import javax .annotation .Nullable ;
15
19
import javax .annotation .ParametersAreNonnullByDefault ;
16
20
17
21
// TODO Javadoc
18
22
@ ParametersAreNonnullByDefault
19
23
public class DataStreamPropagator implements Propagator {
24
+ private final DataStreamsMonitoring dataStreamsMonitoring ;
20
25
private final Supplier <TraceConfig > traceConfigSupplier ;
21
26
private final TimeSource timeSource ;
22
27
private final long hashOfKnownTags ;
23
28
private final String serviceNameOverride ;
24
29
25
30
public DataStreamPropagator (
31
+ DataStreamsMonitoring dataStreamsMonitoring ,
26
32
Supplier <TraceConfig > traceConfigSupplier ,
27
33
TimeSource timeSource ,
28
34
long hashOfKnownTags ,
29
35
String serviceNameOverride ) {
36
+ this .dataStreamsMonitoring = dataStreamsMonitoring ;
30
37
this .traceConfigSupplier = traceConfigSupplier ;
31
38
this .timeSource = timeSource ;
32
39
this .hashOfKnownTags = hashOfKnownTags ;
@@ -35,12 +42,48 @@ public DataStreamPropagator(
35
42
36
43
@ Override
37
44
public <C > void inject (Context context , C carrier , CarrierSetter <C > setter ) {
38
- // TODO Still in CorePropagation, not migrated yet
45
+ // TODO Pathway context needs to be stored into its own context element instead of span context
46
+ AgentSpan span = AgentSpan .fromContext (context );
47
+ DataStreamContext dsmContext = DataStreamContext .fromContext (context );
48
+ PathwayContext pathwayContext ;
49
+ if (span == null
50
+ || (pathwayContext = span .context ().getPathwayContext ()) == null
51
+ || dsmContext == null ) {
52
+ return ;
53
+ }
54
+
55
+ // TODO Allow set checkpoint to use DsmContext as parameter?
56
+ pathwayContext .setCheckpoint (
57
+ dsmContext .sortedTags ,
58
+ dsmContext .sendCheckpoint ? dataStreamsMonitoring ::add : pathwayContext ::saveStats ,
59
+ dsmContext .defaultTimestamp ,
60
+ dsmContext .payloadSizeBytes );
61
+
62
+ boolean injected = injectPathwayContext (pathwayContext , carrier , setter );
63
+
64
+ if (injected && pathwayContext .getHash () != 0 ) {
65
+ span .setTag (PATHWAY_HASH , Long .toUnsignedString (pathwayContext .getHash ()));
66
+ }
67
+ }
68
+
69
+ private <C > boolean injectPathwayContext (
70
+ PathwayContext pathwayContext , C carrier , CarrierSetter <C > setter ) {
71
+ try {
72
+ String encodedContext = pathwayContext .encode ();
73
+ if (encodedContext != null ) {
74
+ // LOGGER.debug("Injecting pathway context {}", pathwayContext);
75
+ setter .set (carrier , PROPAGATION_KEY_BASE64 , encodedContext );
76
+ return true ;
77
+ }
78
+ } catch (IOException e ) {
79
+ // LOGGER.debug("Unable to set encode pathway context", e);
80
+ }
81
+ return false ;
39
82
}
40
83
41
84
@ Override
42
85
public <C > Context extract (Context context , C carrier , CarrierVisitor <C > visitor ) {
43
- // TODO Pathway context needs to be stored into its own context element
86
+ // TODO Pathway context needs to be stored into its own context element instead of span context
44
87
// Get span context to store pathway context into
45
88
TagContext spanContext = getSpanContextOrNull (context );
46
89
PathwayContext pathwayContext ;
0 commit comments