Skip to content

Commit 662bf4c

Browse files
authored
feat: Add execution id (#320)
* feat: Add execution id Adds an execution id for each request. When the LOG_EXECUTION_ID env var is set, the execution id will be included in logs.
1 parent dfc5059 commit 662bf4c

File tree

6 files changed

+570
-2
lines changed

6 files changed

+570
-2
lines changed

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
"watchdog>=1.0.0",
5656
"gunicorn>=19.2.0; platform_system!='Windows'",
5757
"cloudevents>=1.2.0,<2.0.0",
58+
"Werkzeug>=0.14,<4.0.0",
5859
],
5960
entry_points={
6061
"console_scripts": [

src/functions_framework/__init__.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import io
1818
import json
1919
import logging
20+
import logging.config
21+
import os
2022
import os.path
2123
import pathlib
2224
import sys
@@ -32,7 +34,12 @@
3234
from cloudevents.http import from_http, is_binary
3335
from cloudevents.http.event import CloudEvent
3436

35-
from functions_framework import _function_registry, _typed_event, event_conversion
37+
from functions_framework import (
38+
_function_registry,
39+
_typed_event,
40+
event_conversion,
41+
execution_id,
42+
)
3643
from functions_framework.background_event import BackgroundEvent
3744
from functions_framework.exceptions import (
3845
EventConversionException,
@@ -129,6 +136,7 @@ def setup_logging():
129136

130137

131138
def _http_view_func_wrapper(function, request):
139+
@execution_id.set_execution_context(request, _enable_execution_id_logging())
132140
@functools.wraps(function)
133141
def view_func(path):
134142
return function(request._get_current_object())
@@ -143,6 +151,7 @@ def _run_cloud_event(function, request):
143151

144152

145153
def _typed_event_func_wrapper(function, request, inputType: Type):
154+
@execution_id.set_execution_context(request, _enable_execution_id_logging())
146155
def view_func(path):
147156
try:
148157
data = request.get_json()
@@ -163,6 +172,7 @@ def view_func(path):
163172

164173

165174
def _cloud_event_view_func_wrapper(function, request):
175+
@execution_id.set_execution_context(request, _enable_execution_id_logging())
166176
def view_func(path):
167177
ce_exception = None
168178
event = None
@@ -198,6 +208,7 @@ def view_func(path):
198208

199209

200210
def _event_view_func_wrapper(function, request):
211+
@execution_id.set_execution_context(request, _enable_execution_id_logging())
201212
def view_func(path):
202213
if event_conversion.is_convertable_cloud_event(request):
203214
# Convert this CloudEvent to the equivalent background event data and context.
@@ -332,6 +343,9 @@ def create_app(target=None, source=None, signature_type=None):
332343

333344
source_module, spec = _function_registry.load_function_module(source)
334345

346+
if _enable_execution_id_logging():
347+
_configure_app_execution_id_logging()
348+
335349
# Create the application
336350
_app = flask.Flask(target, template_folder=template_folder)
337351
_app.register_error_handler(500, crash_handler)
@@ -355,6 +369,7 @@ def handle_none(rv):
355369
sys.stderr = _LoggingHandler("ERROR", sys.stderr)
356370
setup_logging()
357371

372+
_app.wsgi_app = execution_id.WsgiMiddleware(_app.wsgi_app)
358373
# Execute the module, within the application context
359374
with _app.app_context():
360375
try:
@@ -411,6 +426,26 @@ def __call__(self, *args, **kwargs):
411426
return self.app(*args, **kwargs)
412427

413428

429+
def _configure_app_execution_id_logging():
430+
# Logging needs to be configured before app logger is accessed
431+
logging.config.dictConfig(
432+
{
433+
"version": 1,
434+
"handlers": {
435+
"wsgi": {
436+
"class": "logging.StreamHandler",
437+
"stream": "ext://functions_framework.execution_id.logging_stream",
438+
},
439+
},
440+
"root": {"level": "INFO", "handlers": ["wsgi"]},
441+
}
442+
)
443+
444+
445+
def _enable_execution_id_logging():
446+
return os.environ.get("LOG_EXECUTION_ID")
447+
448+
414449
app = LazyWSGIApp()
415450

416451

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
# Copyright 2020 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import contextlib
16+
import functools
17+
import io
18+
import json
19+
import logging
20+
import random
21+
import re
22+
import string
23+
import sys
24+
25+
import flask
26+
27+
from werkzeug.local import LocalProxy
28+
29+
_EXECUTION_ID_LENGTH = 12
30+
_EXECUTION_ID_CHARSET = string.digits + string.ascii_letters
31+
_LOGGING_API_LABELS_FIELD = "logging.googleapis.com/labels"
32+
_LOGGING_API_SPAN_ID_FIELD = "logging.googleapis.com/spanId"
33+
_TRACE_CONTEXT_REGEX_PATTERN = re.compile(
34+
r"^(?P<trace_id>[\w\d]+)/(?P<span_id>\d+);o=(?P<options>[01])$"
35+
)
36+
EXECUTION_ID_REQUEST_HEADER = "Function-Execution-Id"
37+
TRACE_CONTEXT_REQUEST_HEADER = "X-Cloud-Trace-Context"
38+
39+
logger = logging.getLogger(__name__)
40+
41+
42+
class ExecutionContext:
43+
def __init__(self, execution_id=None, span_id=None):
44+
self.execution_id = execution_id
45+
self.span_id = span_id
46+
47+
48+
def _get_current_context():
49+
return (
50+
flask.g.execution_id_context
51+
if flask.has_request_context() and "execution_id_context" in flask.g
52+
else None
53+
)
54+
55+
56+
def _set_current_context(context):
57+
if flask.has_request_context():
58+
flask.g.execution_id_context = context
59+
60+
61+
def _generate_execution_id():
62+
return "".join(
63+
_EXECUTION_ID_CHARSET[random.randrange(len(_EXECUTION_ID_CHARSET))]
64+
for _ in range(_EXECUTION_ID_LENGTH)
65+
)
66+
67+
68+
# Middleware to add execution id to request header if one does not already exist
69+
class WsgiMiddleware:
70+
def __init__(self, wsgi_app):
71+
self.wsgi_app = wsgi_app
72+
73+
def __call__(self, environ, start_response):
74+
execution_id = (
75+
environ.get("HTTP_FUNCTION_EXECUTION_ID") or _generate_execution_id()
76+
)
77+
environ["HTTP_FUNCTION_EXECUTION_ID"] = execution_id
78+
return self.wsgi_app(environ, start_response)
79+
80+
81+
# Sets execution id and span id for the request
82+
def set_execution_context(request, enable_id_logging=False):
83+
if enable_id_logging:
84+
stdout_redirect = contextlib.redirect_stdout(
85+
LoggingHandlerAddExecutionId(sys.stdout)
86+
)
87+
stderr_redirect = contextlib.redirect_stderr(
88+
LoggingHandlerAddExecutionId(sys.stderr)
89+
)
90+
else:
91+
stdout_redirect = contextlib.nullcontext()
92+
stderr_redirect = contextlib.nullcontext()
93+
94+
def decorator(view_function):
95+
@functools.wraps(view_function)
96+
def wrapper(*args, **kwargs):
97+
trace_context = re.match(
98+
_TRACE_CONTEXT_REGEX_PATTERN,
99+
request.headers.get(TRACE_CONTEXT_REQUEST_HEADER, ""),
100+
)
101+
execution_id = request.headers.get(EXECUTION_ID_REQUEST_HEADER)
102+
span_id = trace_context.group("span_id") if trace_context else None
103+
_set_current_context(ExecutionContext(execution_id, span_id))
104+
105+
with stderr_redirect, stdout_redirect:
106+
return view_function(*args, **kwargs)
107+
108+
return wrapper
109+
110+
return decorator
111+
112+
113+
@LocalProxy
114+
def logging_stream():
115+
return LoggingHandlerAddExecutionId(stream=flask.logging.wsgi_errors_stream)
116+
117+
118+
class LoggingHandlerAddExecutionId(io.TextIOWrapper):
119+
def __new__(cls, stream=sys.stdout):
120+
if isinstance(stream, LoggingHandlerAddExecutionId):
121+
return stream
122+
else:
123+
return super(LoggingHandlerAddExecutionId, cls).__new__(cls)
124+
125+
def __init__(self, stream=sys.stdout):
126+
io.TextIOWrapper.__init__(self, io.StringIO())
127+
self.stream = stream
128+
129+
def write(self, contents):
130+
if contents == "\n":
131+
return
132+
current_context = _get_current_context()
133+
if current_context is None:
134+
self.stream.write(contents + "\n")
135+
self.stream.flush()
136+
return
137+
try:
138+
execution_id = current_context.execution_id
139+
span_id = current_context.span_id
140+
payload = json.loads(contents)
141+
if not isinstance(payload, dict):
142+
payload = {"message": contents}
143+
except json.JSONDecodeError:
144+
if len(contents) > 0 and contents[-1] == "\n":
145+
contents = contents[:-1]
146+
payload = {"message": contents}
147+
if execution_id:
148+
payload[_LOGGING_API_LABELS_FIELD] = payload.get(
149+
_LOGGING_API_LABELS_FIELD, {}
150+
)
151+
payload[_LOGGING_API_LABELS_FIELD]["execution_id"] = execution_id
152+
if span_id:
153+
payload[_LOGGING_API_SPAN_ID_FIELD] = span_id
154+
self.stream.write(json.dumps(payload))
155+
self.stream.write("\n")
156+
self.stream.flush()

0 commit comments

Comments
 (0)