Skip to content

opentelemetry-instrumentation-kafka-python: wait for metadata #1260

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
c5a026f
fix kafka: wait for metadata
rayrapetyan Sep 1, 2022
2733cd8
upd: changelog
rayrapetyan Sep 1, 2022
e60c6a4
fix: changelog
rayrapetyan Sep 1, 2022
bfd272c
Merge branch 'main' into fix_kafka_wait_metadata
srikanthccv Sep 13, 2022
6bef7c4
fix: import KafkaErrors
rayrapetyan Sep 14, 2022
4cf76da
Merge branch 'main' into fix_kafka_wait_metadata
rayrapetyan Sep 14, 2022
38b9bbf
Merge branch 'main' into fix_kafka_wait_metadata
srikanthccv Sep 15, 2022
37b772d
fix: tox -e lint errors
rayrapetyan Sep 15, 2022
39b38bb
Merge branch 'fix_kafka_wait_metadata' of gb.xjqchip.workers.dev-rayrapetyan:rayr…
rayrapetyan Sep 15, 2022
79a7c63
Merge branch 'open-telemetry:main' into fix_kafka_wait_metadata
rayrapetyan Sep 15, 2022
df3fe62
Merge branch 'main' into fix_kafka_wait_metadata
lzchen Sep 20, 2022
23b117e
Merge branch 'main' into fix_kafka_wait_metadata
srikanthccv Sep 20, 2022
9f442ec
Merge branch 'main' into fix_kafka_wait_metadata
rayrapetyan Sep 25, 2022
1a149c4
fix: refact and added unit test
rayrapetyan Sep 25, 2022
02f8b7a
Merge branch 'main' into fix_kafka_wait_metadata
srikanthccv Oct 1, 2022
6f69e94
Merge branch 'main' into fix_kafka_wait_metadata
srikanthccv Oct 5, 2022
42bbdf6
Merge branch 'main' into fix_kafka_wait_metadata
rayrapetyan Oct 10, 2022
25b4fb6
Merge branch 'main' into fix_kafka_wait_metadata
srikanthccv Oct 10, 2022
2575b24
Merge branch 'main' into fix_kafka_wait_metadata
srikanthccv Oct 11, 2022
bd08811
Merge branch 'main' into fix_kafka_wait_metadata
ocelotl Nov 15, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- `opentelemetry-instrumentation-kafka-python`: wait for metadata
([#1260](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1260))
- `opentelemetry-instrumentation-boto3sqs` Make propagation compatible with other SQS instrumentations, add 'messaging.url' span attribute, and fix missing package dependencies.
([#1234](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1234))
- `opentelemetry-instrumentation-pymongo` Change span names to not contain queries but only database name and command name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from logging import getLogger
from typing import Callable, Dict, List, Optional

import kafka.errors as KafkaErrors
from kafka.record.abc import ABCRecord

from opentelemetry import context, propagate, trace
Expand Down Expand Up @@ -146,6 +147,12 @@ def _traced_send(func, instance, args, kwargs):
kwargs["headers"] = headers

topic = KafkaPropertiesExtractor.extract_send_topic(args, kwargs)
try:
instance._wait_on_metadata(
topic, instance.config["max_block_ms"] / 1000.0
)
except KafkaErrors.BrokerResponseError as kafka_exception:
_LOG.exception(kafka_exception)
bootstrap_servers = KafkaPropertiesExtractor.extract_bootstrap_servers(
instance
)
Expand Down