-
Notifications
You must be signed in to change notification settings - Fork 537
/
Copy pathinit_serverless_sdk.py
93 lines (74 loc) · 3.11 KB
/
init_serverless_sdk.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
"""
For manual instrumentation,
the Handler function string of an AWS Lambda function should be added as an
environment variable with a key of 'SENTRY_INITIAL_HANDLER', along with the DSN
under the key 'SENTRY_DSN'.
Then the Handler function string should be replaced with
'sentry_sdk.integrations.init_serverless_sdk.sentry_lambda_handler'
"""
import os
import sys
import re
import sentry_sdk
from sentry_sdk._types import TYPE_CHECKING
from sentry_sdk.utils import Dsn
from sentry_sdk.integrations.aws_lambda import AwsLambdaIntegration
if TYPE_CHECKING:
from typing import Any
def extension_relay_dsn(original_dsn):
    """
    Return `original_dsn` re-targeted at `http` on `localhost:5333`.

    The DSN is parsed with `Dsn`, its host, port and scheme are overwritten,
    and the result is converted back to a string. Any other fields parsed by
    `Dsn` are not modified by this function.

    NOTE(review): presumably localhost:5333 is where the Sentry Lambda
    extension's relay listens -- confirm against the extension's docs.
    """
    relay_dsn = Dsn(original_dsn)

    # Applied in the same order as before: host, then port, then scheme.
    overrides = (("host", "localhost"), ("port", 5333), ("scheme", "http"))
    for attribute, value in overrides:
        setattr(relay_dsn, attribute, value)

    return str(relay_dsn)
# Configure Sentry SDK
# This call runs at import time. SENTRY_DSN and SENTRY_TRACES_SAMPLE_RATE are
# both read with os.environ[...], so a missing variable raises KeyError here,
# and a sample rate that float() cannot parse raises ValueError.
sentry_sdk.init(
    # The DSN is rewritten by extension_relay_dsn before it reaches the SDK.
    dsn=extension_relay_dsn(os.environ["SENTRY_DSN"]),
    integrations=[AwsLambdaIntegration(timeout_warning=True)],
    traces_sample_rate=float(os.environ["SENTRY_TRACES_SAMPLE_RATE"]),
)
class AWSLambdaModuleLoader:
    """
    Loads the module that contains the original AWS Lambda handler.

    The handler string has the form `<module path>.<function name>`. The
    module path is either a file path containing `/` (for example
    `scheduler/scheduler/event`) or an importable dotted module name
    (for example `package.module`).
    """

    # Matches a module path that contains at least one `/`. Group 2 is the
    # text after the last `/`, i.e. the module file name without extension.
    DIR_PATH_REGEX = r"^(.+)\/([^\/]+)$"

    def __init__(self, sentry_initial_handler):
        # type: (str) -> None
        """
        Split `sentry_initial_handler` at its last `.` into a module path and
        a handler name, then load the module.

        Raises ValueError if the string contains no `.`.
        """
        try:
            module_path, self.handler_name = sentry_initial_handler.rsplit(".", 1)
        except ValueError:
            raise ValueError("Incorrect AWS Handler path (Not a path)")

        self.extract_and_load_lambda_function_module(module_path)

    def extract_and_load_lambda_function_module(self, module_path):
        # type: (str) -> None
        """
        Method that extracts and loads lambda function module from module_path
        and stores it on `self.lambda_function_module`.

        Paths matching DIR_PATH_REGEX are loaded from `<module_path>.py`;
        anything else is imported by name with `importlib.import_module`.

        Raises ValueError on Python 3 versions older than 3.5 when a file
        path is given.
        """
        py_version = sys.version_info
        dir_path_match = re.match(self.DIR_PATH_REGEX, module_path)

        if dir_path_match:
            # With a path like -> `scheduler/scheduler/event`
            # `module_name` is `event`, and `module_file_path` is `scheduler/scheduler/event.py`
            # The name is taken from the regex's own capture group so that the
            # separator used to detect a path (`/`) is also the one used to
            # split it, regardless of the platform's os.path.sep.
            module_name = dir_path_match.group(2)
            module_file_path = module_path + ".py"

            # Supported python versions are 2.7, 3.6, 3.7, 3.8
            if py_version >= (3, 5):
                import importlib.util

                spec = importlib.util.spec_from_file_location(
                    module_name, module_file_path
                )
                self.lambda_function_module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(self.lambda_function_module)
            elif py_version[0] < 3:
                import imp

                self.lambda_function_module = imp.load_source(
                    module_name, module_file_path
                )
            else:
                # sys.version_info is a tuple subclass; passing it directly to
                # `%` would be unpacked as several arguments and raise
                # TypeError, so it is wrapped in a one-element tuple.
                raise ValueError(
                    "Python version %s is not supported." % (py_version,)
                )
        else:
            import importlib

            self.lambda_function_module = importlib.import_module(module_path)

    def get_lambda_handler(self):
        # type: () -> Any
        """
        Return the attribute named `self.handler_name` from the loaded module.

        Raises AttributeError if the module has no such attribute.
        """
        return getattr(self.lambda_function_module, self.handler_name)
def sentry_lambda_handler(event, context):
    # type: (Any, Any) -> Any
    """
    Handler function that invokes a lambda handler which path is defined in
    environment variables as "SENTRY_INITIAL_HANDLER", and returns whatever
    that handler returns.

    `event` and `context` are passed through to the wrapped handler unchanged.

    Raises KeyError if "SENTRY_INITIAL_HANDLER" is not set, and ValueError if
    its value does not contain a `.` separating module path and function name.
    """
    # A new loader is built on every call, so the handler path is read from
    # the environment and the module is resolved for each invocation.
    module_loader = AWSLambdaModuleLoader(os.environ["SENTRY_INITIAL_HANDLER"])
    initial_handler = module_loader.get_lambda_handler()
    return initial_handler(event, context)