Skip to content

feat: Add execution id #320

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
"watchdog>=1.0.0",
"gunicorn>=19.2.0; platform_system!='Windows'",
"cloudevents>=1.2.0,<2.0.0",
"Werkzeug>=0.14,<4.0.0",
],
entry_points={
"console_scripts": [
Expand Down
37 changes: 36 additions & 1 deletion src/functions_framework/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import io
import json
import logging
import logging.config
import os
import os.path
import pathlib
import sys
Expand All @@ -32,7 +34,12 @@
from cloudevents.http import from_http, is_binary
from cloudevents.http.event import CloudEvent

from functions_framework import _function_registry, _typed_event, event_conversion
from functions_framework import (
_function_registry,
_typed_event,
event_conversion,
execution_id,
)
from functions_framework.background_event import BackgroundEvent
from functions_framework.exceptions import (
EventConversionException,
Expand Down Expand Up @@ -129,6 +136,7 @@ def setup_logging():


def _http_view_func_wrapper(function, request):
@execution_id.set_execution_context(request, _enable_execution_id_logging())
@functools.wraps(function)
def view_func(path):
return function(request._get_current_object())
Expand All @@ -143,6 +151,7 @@ def _run_cloud_event(function, request):


def _typed_event_func_wrapper(function, request, inputType: Type):
@execution_id.set_execution_context(request, _enable_execution_id_logging())
def view_func(path):
try:
data = request.get_json()
Expand All @@ -163,6 +172,7 @@ def view_func(path):


def _cloud_event_view_func_wrapper(function, request):
@execution_id.set_execution_context(request, _enable_execution_id_logging())
def view_func(path):
ce_exception = None
event = None
Expand Down Expand Up @@ -198,6 +208,7 @@ def view_func(path):


def _event_view_func_wrapper(function, request):
@execution_id.set_execution_context(request, _enable_execution_id_logging())
def view_func(path):
if event_conversion.is_convertable_cloud_event(request):
# Convert this CloudEvent to the equivalent background event data and context.
Expand Down Expand Up @@ -332,6 +343,9 @@ def create_app(target=None, source=None, signature_type=None):

source_module, spec = _function_registry.load_function_module(source)

if _enable_execution_id_logging():
_configure_app_execution_id_logging()

# Create the application
_app = flask.Flask(target, template_folder=template_folder)
_app.register_error_handler(500, crash_handler)
Expand All @@ -355,6 +369,7 @@ def handle_none(rv):
sys.stderr = _LoggingHandler("ERROR", sys.stderr)
setup_logging()

_app.wsgi_app = execution_id.WsgiMiddleware(_app.wsgi_app)
# Execute the module, within the application context
with _app.app_context():
try:
Expand Down Expand Up @@ -411,6 +426,26 @@ def __call__(self, *args, **kwargs):
return self.app(*args, **kwargs)


def _configure_app_execution_id_logging():
# Logging needs to be configured before app logger is accessed
logging.config.dictConfig(
{
"version": 1,
"handlers": {
"wsgi": {
"class": "logging.StreamHandler",
"stream": "ext://functions_framework.execution_id.logging_stream",
},
},
"root": {"level": "INFO", "handlers": ["wsgi"]},
}
)


def _enable_execution_id_logging():
return os.environ.get("LOG_EXECUTION_ID")


app = LazyWSGIApp()


Expand Down
156 changes: 156 additions & 0 deletions src/functions_framework/execution_id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import contextlib
import functools
import io
import json
import logging
import random
import re
import string
import sys

import flask

from werkzeug.local import LocalProxy

_EXECUTION_ID_LENGTH = 12
_EXECUTION_ID_CHARSET = string.digits + string.ascii_letters
_LOGGING_API_LABELS_FIELD = "logging.googleapis.com/labels"
_LOGGING_API_SPAN_ID_FIELD = "logging.googleapis.com/spanId"
_TRACE_CONTEXT_REGEX_PATTERN = re.compile(
r"^(?P<trace_id>[\w\d]+)/(?P<span_id>\d+);o=(?P<options>[01])$"
)
EXECUTION_ID_REQUEST_HEADER = "Function-Execution-Id"
TRACE_CONTEXT_REQUEST_HEADER = "X-Cloud-Trace-Context"

logger = logging.getLogger(__name__)


class ExecutionContext:
def __init__(self, execution_id=None, span_id=None):
self.execution_id = execution_id
self.span_id = span_id


def _get_current_context():
return (
flask.g.execution_id_context
if flask.has_request_context() and "execution_id_context" in flask.g
else None
)


def _set_current_context(context):
if flask.has_request_context():
flask.g.execution_id_context = context


def _generate_execution_id():
return "".join(
_EXECUTION_ID_CHARSET[random.randrange(len(_EXECUTION_ID_CHARSET))]
for _ in range(_EXECUTION_ID_LENGTH)
)


# Middleware to add execution id to request header if one does not already exist
class WsgiMiddleware:
def __init__(self, wsgi_app):
self.wsgi_app = wsgi_app

def __call__(self, environ, start_response):
execution_id = (
environ.get("HTTP_FUNCTION_EXECUTION_ID") or _generate_execution_id()
)
environ["HTTP_FUNCTION_EXECUTION_ID"] = execution_id
return self.wsgi_app(environ, start_response)


# Sets execution id and span id for the request
def set_execution_context(request, enable_id_logging=False):
if enable_id_logging:
stdout_redirect = contextlib.redirect_stdout(
LoggingHandlerAddExecutionId(sys.stdout)
)
stderr_redirect = contextlib.redirect_stderr(
LoggingHandlerAddExecutionId(sys.stderr)
)
else:
stdout_redirect = contextlib.nullcontext()
stderr_redirect = contextlib.nullcontext()

def decorator(view_function):
@functools.wraps(view_function)
def wrapper(*args, **kwargs):
trace_context = re.match(
_TRACE_CONTEXT_REGEX_PATTERN,
request.headers.get(TRACE_CONTEXT_REQUEST_HEADER, ""),
)
execution_id = request.headers.get(EXECUTION_ID_REQUEST_HEADER)
span_id = trace_context.group("span_id") if trace_context else None
_set_current_context(ExecutionContext(execution_id, span_id))

with stderr_redirect, stdout_redirect:
return view_function(*args, **kwargs)

return wrapper

return decorator


@LocalProxy
def logging_stream():
return LoggingHandlerAddExecutionId(stream=flask.logging.wsgi_errors_stream)


class LoggingHandlerAddExecutionId(io.TextIOWrapper):
def __new__(cls, stream=sys.stdout):
if isinstance(stream, LoggingHandlerAddExecutionId):
return stream
else:
return super(LoggingHandlerAddExecutionId, cls).__new__(cls)

def __init__(self, stream=sys.stdout):
io.TextIOWrapper.__init__(self, io.StringIO())
self.stream = stream

def write(self, contents):
if contents == "\n":
return
current_context = _get_current_context()
if current_context is None:
self.stream.write(contents + "\n")
self.stream.flush()
return
try:
execution_id = current_context.execution_id
span_id = current_context.span_id
payload = json.loads(contents)
if not isinstance(payload, dict):
payload = {"message": contents}
except json.JSONDecodeError:
if len(contents) > 0 and contents[-1] == "\n":
contents = contents[:-1]
payload = {"message": contents}
if execution_id:
payload[_LOGGING_API_LABELS_FIELD] = payload.get(
_LOGGING_API_LABELS_FIELD, {}
)
payload[_LOGGING_API_LABELS_FIELD]["execution_id"] = execution_id
if span_id:
payload[_LOGGING_API_SPAN_ID_FIELD] = span_id
self.stream.write(json.dumps(payload))
self.stream.write("\n")
self.stream.flush()
Loading
Loading