Skip to content

Commit dc1cc5e

Browse files
authored
feat: add eventbridge integration (#4)
1 parent 6888864 commit dc1cc5e

File tree

2 files changed

+63
-2
lines changed
  • instrumentation
    • opentelemetry-instrumentation-aws-lambda/src/opentelemetry/instrumentation/aws_lambda
    • opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore

2 files changed

+63
-2
lines changed

instrumentation/opentelemetry-instrumentation-aws-lambda/src/opentelemetry/instrumentation/aws_lambda/__init__.py

+43
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,31 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
495495
except Exception as ex:
496496
pass
497497

498+
eventBridgeTriggerSpan = None
499+
try:
500+
if type(lambda_event) is dict and lambda_event.get("source") is not None and type(lambda_event.get("source")) is str:
501+
span_name = 'EventBridge event'
502+
if lambda_event.get("detail-type") is not None:
503+
span_name = lambda_event.get("detail-type")
504+
505+
links = []
506+
if lambda_event.get("detail") is not None and lambda_event["detail"].get("_context") is not None:
507+
ctx = get_global_textmap().extract(carrier=lambda_event["detail"].get("_context"))
508+
links.append(Link(get_current_span(ctx).get_span_context()))
509+
510+
eventBridgeTriggerSpan = tracer.start_span(span_name, context=parent_context, kind=SpanKind.CONSUMER, links=links)
511+
eventBridgeTriggerSpan.set_attribute(SpanAttributes.FAAS_TRIGGER, "pubsub")
512+
eventBridgeTriggerSpan.set_attribute("faas.trigger.type", "EventBridge")
513+
eventBridgeTriggerSpan.set_attribute("aws.event.bridge.trigger.source", lambda_event.get("source"))
514+
parent_context = set_span_in_context(eventBridgeTriggerSpan)
515+
516+
eventBridgeTriggerSpan.set_attribute(
517+
"rpc.request.body",
518+
json.dumps(lambda_event),
519+
)
520+
except Exception as ex:
521+
pass
522+
498523
try:
499524
with tracer.start_as_current_span(
500525
name=orig_handler_name,
@@ -651,6 +676,22 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
651676
except Exception:
652677
pass
653678
cognitoTriggerSpan.end()
679+
680+
if lambda_event and eventBridgeTriggerSpan is not None:
681+
try:
682+
if isinstance(result, dict) and result.get("statusCode"):
683+
eventBridgeTriggerSpan.set_attribute(
684+
SpanAttributes.HTTP_STATUS_CODE,
685+
result.get("statusCode"),
686+
)
687+
if isinstance(result, dict) and result.get("body"):
688+
eventBridgeTriggerSpan.set_attribute(
689+
"rpc.response.body",
690+
result.get("body"),
691+
)
692+
except Exception:
693+
pass
694+
eventBridgeTriggerSpan.end()
654695

655696
now = time.time()
656697
_tracer_provider = tracer_provider or get_tracer_provider()
@@ -678,6 +719,8 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
678719
dynamoTriggerSpan.end()
679720
if cognitoTriggerSpan is not None:
680721
cognitoTriggerSpan.end()
722+
if eventBridgeTriggerSpan is not None:
723+
eventBridgeTriggerSpan.end()
681724

682725
now = time.time()
683726
_tracer_provider = tracer_provider or get_tracer_provider()

instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py

+20-2
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ def response_hook(span, service_name, operation_name, result):
108108
from opentelemetry.propagate import inject
109109
from opentelemetry.propagators import textmap
110110
from opentelemetry.semconv.trace import SpanAttributes
111-
from opentelemetry.trace import get_tracer
111+
from opentelemetry.trace import get_tracer, SpanKind
112112
from opentelemetry.trace.span import Span
113113
import base64
114114
import typing
@@ -218,6 +218,8 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
218218
body = call_context.params.get("Message")
219219
if body is not None:
220220
attributes["rpc.request.payload"] = limit_string_size(self.payload_size_limit,json.dumps(body, default=str))
221+
elif call_context.service == "events" and call_context.operation == "PutEvents":
222+
call_context.span_kind = SpanKind.PRODUCER
221223
else:
222224
attributes["rpc.request.payload"] = limit_string_size(self.payload_size_limit, json.dumps(call_context.params, default=str))
223225
except Exception as ex:
@@ -274,7 +276,23 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
274276
inject(carrier = entry.get("MessageAttributes"), setter=SQSSetter())
275277

276278
except Exception as ex:
277-
pass
279+
pass
280+
281+
try:
282+
if call_context.service == "events" and call_context.operation == "PutEvents":
283+
if args[1].get("Entries") is not None:
284+
for entry in args[1].get("Entries"):
285+
if entry.get("Detail") is not None:
286+
detailJson = json.loads(entry.get("Detail"))
287+
detailJson['_context'] = {}
288+
inject(carrier = detailJson['_context'])
289+
entry['Detail'] = json.dumps(detailJson)
290+
else:
291+
detailJson = {'_context': {}}
292+
inject(carrier = detailJson['_context'])
293+
entry['Detail'] = json.dumps(detailJson)
294+
except Exception as ex:
295+
pass
278296

279297
result = None
280298
try:

0 commit comments

Comments
 (0)