Skip to content

Commit 671aea3

Browse files
authored
Add instrumentation for AWS Lambda Service - Implementation (Part 2/2) (#777)
* Add instrumentation for AWS Lambda Service - Implementation * Lambda is CONSUMER SQS trace if 'Records' key in Lambda event * More robust check of SQS by indexing and catching * Explicitly catch errors we expect when determinig if SQS triggered Lambda
1 parent 9dc3bbb commit 671aea3

File tree

5 files changed

+536
-0
lines changed

5 files changed

+536
-0
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1212
([#742](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/742))
1313
- `opentelemetry-instrumentation` Add `setuptools` to `install_requires`
1414
([#781](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/781))
15+
- `opentelemetry-instrumentation-aws-lambda` Add instrumentation for AWS Lambda Service - Implementation (Part 2/2)
16+
([#777](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/777))
1517

1618
### Fixed
1719

instrumentation/opentelemetry-instrumentation-aws-lambda/src/opentelemetry/instrumentation/aws_lambda/__init__.py

+263
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,266 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14+
15+
"""
16+
The opentelemetry-instrumentation-aws-lambda package provides an `Instrumentor`
17+
to traces calls whithin a Python AWS Lambda function.
18+
19+
Usage
20+
-----
21+
22+
.. code:: python
23+
24+
# Copy this snippet into an AWS Lambda function
25+
26+
import boto3
27+
from opentelemetry.instrumentation.botocore import AwsBotocoreInstrumentor
28+
from opentelemetry.instrumentation.aws_lambda import AwsLambdaInstrumentor
29+
30+
31+
# Enable instrumentation
32+
AwsBotocoreInstrumentor().instrument()
33+
AwsLambdaInstrumentor().instrument()
34+
35+
# Lambda function
36+
def lambda_handler(event, context):
37+
s3 = boto3.resource('s3')
38+
for bucket in s3.buckets.all():
39+
print(bucket.name)
40+
41+
return "200 OK"
42+
43+
API
44+
---
45+
46+
The `instrument` method accepts the following keyword args:
47+
48+
tracer_provider (TracerProvider) - an optional tracer provider
49+
event_context_extractor (Callable) - a function that returns an OTel Trace
50+
Context given the Lambda Event the AWS Lambda was invoked with
51+
this function signature is: def event_context_extractor(lambda_event: Any) -> Context
52+
for example:
53+
54+
.. code:: python
55+
56+
from opentelemetry.instrumentation.aws_lambda import AwsLambdaInstrumentor
57+
58+
def custom_event_context_extractor(lambda_event):
59+
# If the `TraceContextTextMapPropagator` is the global propagator, we
60+
# can use it to parse out the context from the HTTP Headers.
61+
return get_global_textmap().extract(lambda_event["foo"]["headers"])
62+
63+
AwsLambdaInstrumentor().instrument(
64+
event_context_extractor=custom_event_context_extractor
65+
)
66+
"""
67+
68+
import logging
69+
import os
70+
from importlib import import_module
71+
from typing import Any, Callable, Collection
72+
73+
from wrapt import wrap_function_wrapper
74+
75+
from opentelemetry.context.context import Context
76+
from opentelemetry.instrumentation.aws_lambda.package import _instruments
77+
from opentelemetry.instrumentation.aws_lambda.version import __version__
78+
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
79+
from opentelemetry.instrumentation.utils import unwrap
80+
from opentelemetry.propagate import get_global_textmap
81+
from opentelemetry.propagators.aws.aws_xray_propagator import (
82+
TRACE_HEADER_KEY,
83+
AwsXRayPropagator,
84+
)
85+
from opentelemetry.semconv.resource import ResourceAttributes
86+
from opentelemetry.semconv.trace import SpanAttributes
87+
from opentelemetry.trace import (
88+
SpanKind,
89+
TracerProvider,
90+
get_tracer,
91+
get_tracer_provider,
92+
)
93+
from opentelemetry.trace.propagation import get_current_span
94+
95+
logger = logging.getLogger(__name__)
96+
97+
_HANDLER = "_HANDLER"
98+
_X_AMZN_TRACE_ID = "_X_AMZN_TRACE_ID"
99+
ORIG_HANDLER = "ORIG_HANDLER"
100+
101+
102+
def _default_event_context_extractor(lambda_event: Any) -> Context:
103+
"""Default way of extracting the context from the Lambda Event.
104+
105+
Assumes the Lambda Event is a map with the headers under the 'headers' key.
106+
This is the mapping to use when the Lambda is invoked by an API Gateway
107+
REST API where API Gateway is acting as a pure proxy for the request.
108+
109+
See more:
110+
https://docs.aws.amazon.com/apigateway/latest/developerguide/set-up-lambda-proxy-integrations.html#api-gateway-simple-proxy-for-lambda-input-format
111+
112+
Args:
113+
lambda_event: user-defined, so it could be anything, but this
114+
method counts on it being a map with a 'headers' key
115+
Returns:
116+
A Context with configuration found in the event.
117+
"""
118+
try:
119+
headers = lambda_event["headers"]
120+
except (TypeError, KeyError):
121+
logger.debug(
122+
"Extracting context from Lambda Event failed: either enable X-Ray active tracing or configure API Gateway to trigger this Lambda function as a pure proxy. Otherwise, generated spans will have an invalid (empty) parent context."
123+
)
124+
headers = {}
125+
return get_global_textmap().extract(headers)
126+
127+
128+
def _determine_parent_context(
129+
lambda_event: Any, event_context_extractor: Callable[[Any], Context]
130+
) -> Context:
131+
"""Determine the parent context for the current Lambda invocation.
132+
133+
See more:
134+
https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/instrumentation/aws-lambda.md#determining-the-parent-of-a-span
135+
136+
Args:
137+
lambda_event: user-defined, so it could be anything, but this
138+
method counts it being a map with a 'headers' key
139+
Returns:
140+
A Context with configuration found in the carrier.
141+
"""
142+
parent_context = None
143+
144+
xray_env_var = os.environ.get(_X_AMZN_TRACE_ID)
145+
146+
if xray_env_var:
147+
parent_context = AwsXRayPropagator().extract(
148+
{TRACE_HEADER_KEY: xray_env_var}
149+
)
150+
151+
if (
152+
parent_context
153+
and get_current_span(parent_context)
154+
.get_span_context()
155+
.trace_flags.sampled
156+
):
157+
return parent_context
158+
159+
if event_context_extractor:
160+
parent_context = event_context_extractor(lambda_event)
161+
else:
162+
parent_context = _default_event_context_extractor(lambda_event)
163+
164+
return parent_context
165+
166+
167+
def _instrument(
168+
wrapped_module_name,
169+
wrapped_function_name,
170+
event_context_extractor: Callable[[Any], Context],
171+
tracer_provider: TracerProvider = None,
172+
):
173+
def _instrumented_lambda_handler_call(
174+
call_wrapped, instance, args, kwargs
175+
):
176+
orig_handler_name = ".".join(
177+
[wrapped_module_name, wrapped_function_name]
178+
)
179+
180+
lambda_event = args[0]
181+
182+
parent_context = _determine_parent_context(
183+
lambda_event, event_context_extractor
184+
)
185+
186+
# See more:
187+
# https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
188+
try:
189+
if lambda_event["Records"][0]["eventSource"] == "aws:sqs":
190+
span_kind = SpanKind.CONSUMER
191+
except (IndexError, KeyError, TypeError):
192+
span_kind = SpanKind.SERVER
193+
194+
tracer = get_tracer(__name__, __version__, tracer_provider)
195+
196+
with tracer.start_as_current_span(
197+
name=orig_handler_name, context=parent_context, kind=span_kind,
198+
) as span:
199+
if span.is_recording():
200+
lambda_context = args[1]
201+
# NOTE: The specs mention an exception here, allowing the
202+
# `ResourceAttributes.FAAS_ID` attribute to be set as a span
203+
# attribute instead of a resource attribute.
204+
#
205+
# See more:
206+
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/faas.md#example
207+
span.set_attribute(
208+
ResourceAttributes.FAAS_ID,
209+
lambda_context.invoked_function_arn,
210+
)
211+
span.set_attribute(
212+
SpanAttributes.FAAS_EXECUTION,
213+
lambda_context.aws_request_id,
214+
)
215+
216+
result = call_wrapped(*args, **kwargs)
217+
218+
_tracer_provider = tracer_provider or get_tracer_provider()
219+
try:
220+
# NOTE: `force_flush` before function quit in case of Lambda freeze.
221+
# Assumes we are using the OpenTelemetry SDK implementation of the
222+
# `TracerProvider`.
223+
_tracer_provider.force_flush()
224+
except Exception: # pylint: disable=broad-except
225+
logger.error(
226+
"TracerProvider was missing `force_flush` method. This is necessary in case of a Lambda freeze and would exist in the OTel SDK implementation."
227+
)
228+
229+
return result
230+
231+
wrap_function_wrapper(
232+
wrapped_module_name,
233+
wrapped_function_name,
234+
_instrumented_lambda_handler_call,
235+
)
236+
237+
238+
class AwsLambdaInstrumentor(BaseInstrumentor):
239+
def instrumentation_dependencies(self) -> Collection[str]:
240+
return _instruments
241+
242+
def _instrument(self, **kwargs):
243+
"""Instruments Lambda Handlers on AWS Lambda.
244+
245+
See more:
246+
https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/instrumentation/aws-lambda.md#instrumenting-aws-lambda
247+
248+
Args:
249+
**kwargs: Optional arguments
250+
``tracer_provider``: a TracerProvider, defaults to global
251+
``event_context_extractor``: a method which takes the Lambda
252+
Event as input and extracts an OTel Context from it. By default,
253+
the context is extracted from the HTTP headers of an API Gateway
254+
request.
255+
"""
256+
lambda_handler = os.environ.get(ORIG_HANDLER, os.environ.get(_HANDLER))
257+
# pylint: disable=attribute-defined-outside-init
258+
(
259+
self._wrapped_module_name,
260+
self._wrapped_function_name,
261+
) = lambda_handler.rsplit(".", 1)
262+
263+
_instrument(
264+
self._wrapped_module_name,
265+
self._wrapped_function_name,
266+
event_context_extractor=kwargs.get(
267+
"event_context_extractor", _default_event_context_extractor
268+
),
269+
tracer_provider=kwargs.get("tracer_provider"),
270+
)
271+
272+
def _uninstrument(self, **kwargs):
273+
unwrap(
274+
import_module(self._wrapped_module_name),
275+
self._wrapped_function_name,
276+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
def handler(event, context):
17+
return "200 ok"

0 commit comments

Comments
 (0)