Skip to content

Commit 833e811

Browse files
authored
Merge pull request #1 from DataDog/master
from original
2 parents a13e4a0 + 65c6b1f commit 833e811

File tree

11 files changed

+288
-97
lines changed

11 files changed

+288
-97
lines changed

dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java

+22-6
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import datadog.trace.agent.tooling.Instrumenter;
1313
import java.util.HashMap;
1414
import java.util.Iterator;
15+
import java.util.List;
1516
import java.util.Map;
1617
import net.bytebuddy.asm.Advice;
1718
import net.bytebuddy.description.method.MethodDescription;
@@ -41,7 +42,8 @@ public String[] helperClassNames() {
4142
packageName + ".KafkaDecorator$2",
4243
packageName + ".TextMapExtractAdapter",
4344
packageName + ".TracingIterable",
44-
packageName + ".TracingIterable$TracingIterator",
45+
packageName + ".TracingIterator",
46+
packageName + ".TracingList",
4547
};
4648
}
4749

@@ -52,11 +54,16 @@ public Map<? extends ElementMatcher<? super MethodDescription>, String> transfor
5254
isMethod()
5355
.and(isPublic())
5456
.and(named("records"))
55-
.and(
56-
takesArgument(0, String.class)
57-
.or(takesArgument(0, named("org.apache.kafka.common.TopicPartition"))))
57+
.and(takesArgument(0, String.class))
5858
.and(returns(Iterable.class)),
5959
IterableAdvice.class.getName());
60+
transformers.put(
61+
isMethod()
62+
.and(isPublic())
63+
.and(named("records"))
64+
.and(takesArgument(0, named("org.apache.kafka.common.TopicPartition")))
65+
.and(returns(List.class)),
66+
ListAdvice.class.getName());
6067
transformers.put(
6168
isMethod()
6269
.and(isPublic())
@@ -77,13 +84,22 @@ public static void wrap(@Advice.Return(readOnly = false) Iterable<ConsumerRecord
7784
}
7885
}
7986

87+
public static class ListAdvice {
88+
89+
@Advice.OnMethodExit(suppress = Throwable.class)
90+
public static void wrap(@Advice.Return(readOnly = false) List<ConsumerRecord> iterable) {
91+
if (iterable != null) {
92+
iterable = new TracingList(iterable, "kafka.consume", CONSUMER_DECORATE);
93+
}
94+
}
95+
}
96+
8097
public static class IteratorAdvice {
8198

8299
@Advice.OnMethodExit(suppress = Throwable.class)
83100
public static void wrap(@Advice.Return(readOnly = false) Iterator<ConsumerRecord> iterator) {
84101
if (iterator != null) {
85-
iterator =
86-
new TracingIterable.TracingIterator(iterator, "kafka.consume", CONSUMER_DECORATE);
102+
iterator = new TracingIterator(iterator, "kafka.consume", CONSUMER_DECORATE);
87103
}
88104
}
89105
}
Original file line numberDiff line numberDiff line change
@@ -1,91 +1,24 @@
11
package datadog.trace.instrumentation.kafka_clients;
22

3-
import io.opentracing.Scope;
4-
import io.opentracing.SpanContext;
5-
import io.opentracing.propagation.Format;
6-
import io.opentracing.util.GlobalTracer;
73
import java.util.Iterator;
8-
import lombok.extern.slf4j.Slf4j;
94
import org.apache.kafka.clients.consumer.ConsumerRecord;
105

116
public class TracingIterable implements Iterable<ConsumerRecord> {
12-
private final Iterable<ConsumerRecord> delegateIterable;
7+
private final Iterable<ConsumerRecord> delegate;
138
private final String operationName;
149
private final KafkaDecorator decorator;
1510

1611
public TracingIterable(
17-
final Iterable<ConsumerRecord> delegateIterable,
12+
final Iterable<ConsumerRecord> delegate,
1813
final String operationName,
1914
final KafkaDecorator decorator) {
20-
this.delegateIterable = delegateIterable;
15+
this.delegate = delegate;
2116
this.operationName = operationName;
2217
this.decorator = decorator;
2318
}
2419

2520
@Override
2621
public Iterator<ConsumerRecord> iterator() {
27-
return new TracingIterator(delegateIterable.iterator(), operationName, decorator);
28-
}
29-
30-
@Slf4j
31-
public static class TracingIterator implements Iterator<ConsumerRecord> {
32-
private final Iterator<ConsumerRecord> delegateIterator;
33-
private final String operationName;
34-
private final KafkaDecorator decorator;
35-
36-
/**
37-
* Note: this may potentially create problems if this iterator is used from different threads.
38-
* But at the moment we cannot do much about this.
39-
*/
40-
private Scope currentScope;
41-
42-
public TracingIterator(
43-
final Iterator<ConsumerRecord> delegateIterator,
44-
final String operationName,
45-
final KafkaDecorator decorator) {
46-
this.delegateIterator = delegateIterator;
47-
this.operationName = operationName;
48-
this.decorator = decorator;
49-
}
50-
51-
@Override
52-
public boolean hasNext() {
53-
if (currentScope != null) {
54-
currentScope.close();
55-
currentScope = null;
56-
}
57-
return delegateIterator.hasNext();
58-
}
59-
60-
@Override
61-
public ConsumerRecord next() {
62-
if (currentScope != null) {
63-
// in case they didn't call hasNext()...
64-
currentScope.close();
65-
currentScope = null;
66-
}
67-
68-
final ConsumerRecord next = delegateIterator.next();
69-
70-
try {
71-
if (next != null) {
72-
final SpanContext spanContext =
73-
GlobalTracer.get()
74-
.extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(next.headers()));
75-
currentScope =
76-
GlobalTracer.get().buildSpan(operationName).asChildOf(spanContext).startActive(true);
77-
decorator.afterStart(currentScope);
78-
decorator.onConsume(currentScope, next);
79-
}
80-
} catch (final Exception e) {
81-
log.debug("Error during decoration", e);
82-
}
83-
return next;
84-
}
85-
86-
@Override
87-
public void remove() {
88-
delegateIterator.remove();
89-
}
22+
return new TracingIterator(delegate.iterator(), operationName, decorator);
9023
}
9124
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package datadog.trace.instrumentation.kafka_clients;
2+
3+
import io.opentracing.Scope;
4+
import io.opentracing.SpanContext;
5+
import io.opentracing.propagation.Format;
6+
import io.opentracing.util.GlobalTracer;
7+
import java.util.Iterator;
8+
import lombok.extern.slf4j.Slf4j;
9+
import org.apache.kafka.clients.consumer.ConsumerRecord;
10+
11+
@Slf4j
12+
public class TracingIterator implements Iterator<ConsumerRecord> {
13+
private final Iterator<ConsumerRecord> delegateIterator;
14+
private final String operationName;
15+
private final KafkaDecorator decorator;
16+
17+
/**
18+
* Note: this may potentially create problems if this iterator is used from different threads. But
19+
* at the moment we cannot do much about this.
20+
*/
21+
private Scope currentScope;
22+
23+
public TracingIterator(
24+
final Iterator<ConsumerRecord> delegateIterator,
25+
final String operationName,
26+
final KafkaDecorator decorator) {
27+
this.delegateIterator = delegateIterator;
28+
this.operationName = operationName;
29+
this.decorator = decorator;
30+
}
31+
32+
@Override
33+
public boolean hasNext() {
34+
if (currentScope != null) {
35+
currentScope.close();
36+
currentScope = null;
37+
}
38+
return delegateIterator.hasNext();
39+
}
40+
41+
@Override
42+
public ConsumerRecord next() {
43+
if (currentScope != null) {
44+
// in case they didn't call hasNext()...
45+
currentScope.close();
46+
currentScope = null;
47+
}
48+
49+
final ConsumerRecord next = delegateIterator.next();
50+
51+
try {
52+
if (next != null) {
53+
final SpanContext spanContext =
54+
GlobalTracer.get()
55+
.extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(next.headers()));
56+
currentScope =
57+
GlobalTracer.get().buildSpan(operationName).asChildOf(spanContext).startActive(true);
58+
decorator.afterStart(currentScope);
59+
decorator.onConsume(currentScope, next);
60+
}
61+
} catch (final Exception e) {
62+
log.debug("Error during decoration", e);
63+
}
64+
return next;
65+
}
66+
67+
@Override
68+
public void remove() {
69+
delegateIterator.remove();
70+
}
71+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
package datadog.trace.instrumentation.kafka_clients;
2+
3+
import java.util.Collection;
4+
import java.util.Iterator;
5+
import java.util.List;
6+
import java.util.ListIterator;
7+
import org.apache.kafka.clients.consumer.ConsumerRecord;
8+
9+
public class TracingList implements List<ConsumerRecord> {
10+
private final List<ConsumerRecord> delegate;
11+
private final String operationName;
12+
private final KafkaDecorator decorator;
13+
14+
public TracingList(
15+
final List<ConsumerRecord> delegate,
16+
final String operationName,
17+
final KafkaDecorator decorator) {
18+
this.delegate = delegate;
19+
this.operationName = operationName;
20+
this.decorator = decorator;
21+
}
22+
23+
@Override
24+
public int size() {
25+
return delegate.size();
26+
}
27+
28+
@Override
29+
public boolean isEmpty() {
30+
return delegate.isEmpty();
31+
}
32+
33+
@Override
34+
public boolean contains(final Object o) {
35+
return delegate.contains(o);
36+
}
37+
38+
@Override
39+
public Iterator<ConsumerRecord> iterator() {
40+
return new TracingIterator(delegate.iterator(), operationName, decorator);
41+
}
42+
43+
@Override
44+
public Object[] toArray() {
45+
return delegate.toArray();
46+
}
47+
48+
@Override
49+
public <T> T[] toArray(final T[] a) {
50+
return delegate.toArray(a);
51+
}
52+
53+
@Override
54+
public boolean add(final ConsumerRecord consumerRecord) {
55+
return delegate.add(consumerRecord);
56+
}
57+
58+
@Override
59+
public boolean remove(final Object o) {
60+
return delegate.remove(o);
61+
}
62+
63+
@Override
64+
public boolean containsAll(final Collection<?> c) {
65+
return delegate.containsAll(c);
66+
}
67+
68+
@Override
69+
public boolean addAll(final Collection<? extends ConsumerRecord> c) {
70+
return delegate.addAll(c);
71+
}
72+
73+
@Override
74+
public boolean addAll(final int index, final Collection<? extends ConsumerRecord> c) {
75+
return delegate.addAll(index, c);
76+
}
77+
78+
@Override
79+
public boolean removeAll(final Collection<?> c) {
80+
return delegate.removeAll(c);
81+
}
82+
83+
@Override
84+
public boolean retainAll(final Collection<?> c) {
85+
return delegate.retainAll(c);
86+
}
87+
88+
@Override
89+
public void clear() {
90+
delegate.clear();
91+
}
92+
93+
@Override
94+
public ConsumerRecord get(final int index) {
95+
// TODO: should this be instrumented as well?
96+
return delegate.get(index);
97+
}
98+
99+
@Override
100+
public ConsumerRecord set(final int index, final ConsumerRecord element) {
101+
return delegate.set(index, element);
102+
}
103+
104+
@Override
105+
public void add(final int index, final ConsumerRecord element) {
106+
delegate.add(index, element);
107+
}
108+
109+
@Override
110+
public ConsumerRecord remove(final int index) {
111+
return delegate.remove(index);
112+
}
113+
114+
@Override
115+
public int indexOf(final Object o) {
116+
return delegate.indexOf(o);
117+
}
118+
119+
@Override
120+
public int lastIndexOf(final Object o) {
121+
return delegate.lastIndexOf(o);
122+
}
123+
124+
@Override
125+
public ListIterator<ConsumerRecord> listIterator() {
126+
// TODO: the API for ListIterator is not really good to instrument it in context of Kafka
127+
// Consumer so we will not do that for now
128+
return delegate.listIterator();
129+
}
130+
131+
@Override
132+
public ListIterator<ConsumerRecord> listIterator(final int index) {
133+
// TODO: the API for ListIterator is not really good to instrument it in context of Kafka
134+
// Consumer so we will not do that for now
135+
return delegate.listIterator(index);
136+
}
137+
138+
@Override
139+
public List<ConsumerRecord> subList(final int fromIndex, final int toIndex) {
140+
return new TracingList(delegate.subList(fromIndex, toIndex), operationName, decorator);
141+
}
142+
}

dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamsProcessorInstrumentation.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,11 @@
2424

2525
public class KafkaStreamsProcessorInstrumentation {
2626
// These two instrumentations work together to apply StreamTask.process.
27-
// The combination of these are needed because there's not a good instrumentation point.
27+
// The combination of these is needed because there's no good instrumentation point.
28+
// FIXME: this instrumentation takes somewhat strange approach. It looks like Kafka Streams
29+
// defines notions of 'processor', 'source' and 'sink'. There is no 'consumer' as such.
30+
// Also this instrumentation doesn't define 'producer' making it 'asymmetric' - resulting
31+
// in awkward tests and traces. We may want to revisit this in the future.
2832

2933
@AutoService(Instrumenter.class)
3034
public static class StartInstrumentation extends Instrumenter.Default {

0 commit comments

Comments
 (0)