Skip to content

Commit 9cea60a

Browse files
committed
fix(instrumentation-kafkajs): fix instr to work with [email protected] and earlier
The tests broke on [email protected] and earlier. The instrumentation crashed on [email protected] and earlier. Refs: open-telemetry#2784 (comment) Fixes: open-telemetry#2784
1 parent b6d6d94 commit 9cea60a

File tree

3 files changed

+13
-20
lines changed

3 files changed

+13
-20
lines changed

plugins/node/instrumentation-kafkajs/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ This package uses `@opentelemetry/semantic-conventions` version `1.30+`, which i
6969
| --------------------- | ------------------------------------- | ------------------------------------------------------------ |
7070
| Consumer | `messaging.process.duration` | Duration of processing operation. [1] |
7171
| Consumer | `messaging.client.consumed.messages` | Number of messages that were delivered to the application. |
72-
| Consumer and Producer | `messaging.client.operation.duration` | Number of messages that were delivered to the application. |
72+
| Consumer and Producer | `messaging.client.operation.duration` | Number of messages that were delivered to the application. (Only emitted for [email protected] and later.) |
7373
| Producer | `messaging.client.sent.messages` | Number of messages producer attempted to send to the broker. |
7474

7575
**[1] `messaging.process.duration`:** In the context of `eachBatch`, this metric will be emitted once for each message but the value reflects the duration of the entire batch.

plugins/node/instrumentation-kafkajs/src/instrumentation.ts

+7-4
Original file line numberDiff line numberDiff line change
@@ -251,10 +251,13 @@ export class KafkaJsInstrumentation extends InstrumentationBase<KafkaJsInstrumen
251251
private _setKafkaEventListeners(kafkaObj: KafkaEventEmitter) {
252252
if (kafkaObj[EVENT_LISTENERS_SET]) return;
253253

254-
kafkaObj.on(
255-
kafkaObj.events.REQUEST,
256-
this._recordClientDurationMetric.bind(this)
257-
);
254+
// The REQUEST Consumer event was added in [email protected].
255+
if (kafkaObj.events?.REQUEST) {
256+
kafkaObj.on(
257+
kafkaObj.events.REQUEST,
258+
this._recordClientDurationMetric.bind(this)
259+
);
260+
}
258261

259262
kafkaObj[EVENT_LISTENERS_SET] = true;
260263
}

plugins/node/instrumentation-kafkajs/test/kafkajs.test.ts

+5-15
Original file line numberDiff line numberDiff line change
@@ -220,9 +220,7 @@ describe('instrumentation-kafkajs', () => {
220220
);
221221
instrumentation.disable();
222222
instrumentation.enable();
223-
producer = kafka.producer({
224-
createPartitioner: kafkajs.Partitioners.LegacyPartitioner,
225-
});
223+
producer = kafka.producer();
226224
}
227225
beforeEach(() => {
228226
initializeProducer();
@@ -479,9 +477,7 @@ describe('instrumentation-kafkajs', () => {
479477
});
480478
instrumentation.disable();
481479
instrumentation.enable();
482-
producer = kafka.producer({
483-
createPartitioner: kafkajs.Partitioners.LegacyPartitioner,
484-
});
480+
producer = kafka.producer();
485481
});
486482

487483
it('error in send create failed span', async () => {
@@ -634,9 +630,7 @@ describe('instrumentation-kafkajs', () => {
634630
instrumentation.disable();
635631
instrumentation.setConfig(config);
636632
instrumentation.enable();
637-
producer = kafka.producer({
638-
createPartitioner: kafkajs.Partitioners.LegacyPartitioner,
639-
});
633+
producer = kafka.producer();
640634
});
641635

642636
it('producer hook add span attribute with value from message', async () => {
@@ -671,9 +665,7 @@ describe('instrumentation-kafkajs', () => {
671665
instrumentation.disable();
672666
instrumentation.setConfig(config);
673667
instrumentation.enable();
674-
producer = kafka.producer({
675-
createPartitioner: kafkajs.Partitioners.LegacyPartitioner,
676-
});
668+
producer = kafka.producer();
677669
});
678670

679671
it('producer hook add span attribute with value from message', async () => {
@@ -1227,9 +1219,7 @@ describe('instrumentation-kafkajs', () => {
12271219
storeRunConfig();
12281220
instrumentation.disable();
12291221
instrumentation.enable();
1230-
producer = kafka.producer({
1231-
createPartitioner: kafkajs.Partitioners.LegacyPartitioner,
1232-
});
1222+
producer = kafka.producer();
12331223
consumer = kafka.consumer({ groupId: 'testing-group-id' });
12341224
});
12351225

0 commit comments

Comments
 (0)