diff --git a/setup.py b/setup.py index 6ee9902f..d4a9fea5 100644 --- a/setup.py +++ b/setup.py @@ -55,6 +55,7 @@ "watchdog>=1.0.0", "gunicorn>=19.2.0; platform_system!='Windows'", "cloudevents>=1.2.0,<2.0.0", + "Werkzeug>=0.14,<4.0.0", ], entry_points={ "console_scripts": [ diff --git a/src/functions_framework/__init__.py b/src/functions_framework/__init__.py index 8c23e5c0..7474c01e 100644 --- a/src/functions_framework/__init__.py +++ b/src/functions_framework/__init__.py @@ -17,6 +17,8 @@ import io import json import logging +import logging.config +import os import os.path import pathlib import sys @@ -32,7 +34,12 @@ from cloudevents.http import from_http, is_binary from cloudevents.http.event import CloudEvent -from functions_framework import _function_registry, _typed_event, event_conversion +from functions_framework import ( + _function_registry, + _typed_event, + event_conversion, + execution_id, +) from functions_framework.background_event import BackgroundEvent from functions_framework.exceptions import ( EventConversionException, @@ -129,6 +136,7 @@ def setup_logging(): def _http_view_func_wrapper(function, request): + @execution_id.set_execution_context(request, _enable_execution_id_logging()) @functools.wraps(function) def view_func(path): return function(request._get_current_object()) @@ -143,6 +151,7 @@ def _run_cloud_event(function, request): def _typed_event_func_wrapper(function, request, inputType: Type): + @execution_id.set_execution_context(request, _enable_execution_id_logging()) def view_func(path): try: data = request.get_json() @@ -163,6 +172,7 @@ def view_func(path): def _cloud_event_view_func_wrapper(function, request): + @execution_id.set_execution_context(request, _enable_execution_id_logging()) def view_func(path): ce_exception = None event = None @@ -198,6 +208,7 @@ def view_func(path): def _event_view_func_wrapper(function, request): + @execution_id.set_execution_context(request, _enable_execution_id_logging()) def view_func(path): if event_conversion.is_convertable_cloud_event(request): # Convert this CloudEvent to the equivalent background event data and context. @@ -332,6 +343,9 @@ def create_app(target=None, source=None, signature_type=None): source_module, spec = _function_registry.load_function_module(source) + if _enable_execution_id_logging(): + _configure_app_execution_id_logging() + # Create the application _app = flask.Flask(target, template_folder=template_folder) _app.register_error_handler(500, crash_handler) @@ -355,6 +369,7 @@ def handle_none(rv): sys.stderr = _LoggingHandler("ERROR", sys.stderr) setup_logging() + _app.wsgi_app = execution_id.WsgiMiddleware(_app.wsgi_app) # Execute the module, within the application context with _app.app_context(): try: @@ -411,6 +426,26 @@ def __call__(self, *args, **kwargs): return self.app(*args, **kwargs) +def _configure_app_execution_id_logging(): + # Logging needs to be configured before app logger is accessed + logging.config.dictConfig( + { + "version": 1, + "handlers": { + "wsgi": { + "class": "logging.StreamHandler", + "stream": "ext://functions_framework.execution_id.logging_stream", + }, + }, + "root": {"level": "INFO", "handlers": ["wsgi"]}, + } + ) + + +def _enable_execution_id_logging(): + return os.environ.get("LOG_EXECUTION_ID") + + app = LazyWSGIApp() diff --git a/src/functions_framework/execution_id.py b/src/functions_framework/execution_id.py new file mode 100644 index 00000000..2b106531 --- /dev/null +++ b/src/functions_framework/execution_id.py @@ -0,0 +1,156 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +import functools +import io +import json +import logging +import random +import re +import string +import sys + +import flask + +from werkzeug.local import LocalProxy + +_EXECUTION_ID_LENGTH = 12 +_EXECUTION_ID_CHARSET = string.digits + string.ascii_letters +_LOGGING_API_LABELS_FIELD = "logging.googleapis.com/labels" +_LOGGING_API_SPAN_ID_FIELD = "logging.googleapis.com/spanId" +_TRACE_CONTEXT_REGEX_PATTERN = re.compile( + r"^(?P[\w\d]+)/(?P\d+);o=(?P[01])$" +) +EXECUTION_ID_REQUEST_HEADER = "Function-Execution-Id" +TRACE_CONTEXT_REQUEST_HEADER = "X-Cloud-Trace-Context" + +logger = logging.getLogger(__name__) + + +class ExecutionContext: + def __init__(self, execution_id=None, span_id=None): + self.execution_id = execution_id + self.span_id = span_id + + +def _get_current_context(): + return ( + flask.g.execution_id_context + if flask.has_request_context() and "execution_id_context" in flask.g + else None + ) + + +def _set_current_context(context): + if flask.has_request_context(): + flask.g.execution_id_context = context + + +def _generate_execution_id(): + return "".join( + _EXECUTION_ID_CHARSET[random.randrange(len(_EXECUTION_ID_CHARSET))] + for _ in range(_EXECUTION_ID_LENGTH) + ) + + +# Middleware to add execution id to request header if one does not already exist +class WsgiMiddleware: + def __init__(self, wsgi_app): + self.wsgi_app = wsgi_app + + def __call__(self, environ, start_response): + execution_id = ( + environ.get("HTTP_FUNCTION_EXECUTION_ID") or _generate_execution_id() + ) + environ["HTTP_FUNCTION_EXECUTION_ID"] = execution_id + return self.wsgi_app(environ, start_response) + + +# Sets execution id and span id for the request +def set_execution_context(request, enable_id_logging=False): + if enable_id_logging: + stdout_redirect = contextlib.redirect_stdout( + LoggingHandlerAddExecutionId(sys.stdout) + ) + stderr_redirect = contextlib.redirect_stderr( + LoggingHandlerAddExecutionId(sys.stderr) + ) + else: + stdout_redirect = contextlib.nullcontext() + stderr_redirect = contextlib.nullcontext() + + def decorator(view_function): + @functools.wraps(view_function) + def wrapper(*args, **kwargs): + trace_context = re.match( + _TRACE_CONTEXT_REGEX_PATTERN, + request.headers.get(TRACE_CONTEXT_REQUEST_HEADER, ""), + ) + execution_id = request.headers.get(EXECUTION_ID_REQUEST_HEADER) + span_id = trace_context.group("span_id") if trace_context else None + _set_current_context(ExecutionContext(execution_id, span_id)) + + with stderr_redirect, stdout_redirect: + return view_function(*args, **kwargs) + + return wrapper + + return decorator + + +@LocalProxy +def logging_stream(): + return LoggingHandlerAddExecutionId(stream=flask.logging.wsgi_errors_stream) + + +class LoggingHandlerAddExecutionId(io.TextIOWrapper): + def __new__(cls, stream=sys.stdout): + if isinstance(stream, LoggingHandlerAddExecutionId): + return stream + else: + return super(LoggingHandlerAddExecutionId, cls).__new__(cls) + + def __init__(self, stream=sys.stdout): + io.TextIOWrapper.__init__(self, io.StringIO()) + self.stream = stream + + def write(self, contents): + if contents == "\n": + return + current_context = _get_current_context() + if current_context is None: + self.stream.write(contents + "\n") + self.stream.flush() + return + try: + execution_id = current_context.execution_id + span_id = current_context.span_id + payload = json.loads(contents) + if not isinstance(payload, dict): + payload = {"message": contents} + except json.JSONDecodeError: + if len(contents) > 0 and contents[-1] == "\n": + contents = contents[:-1] + payload = {"message": contents} + if execution_id: + payload[_LOGGING_API_LABELS_FIELD] = payload.get( + _LOGGING_API_LABELS_FIELD, {} + ) + payload[_LOGGING_API_LABELS_FIELD]["execution_id"] = execution_id + if span_id: + payload[_LOGGING_API_SPAN_ID_FIELD] = span_id + self.stream.write(json.dumps(payload)) + self.stream.write("\n") + self.stream.flush() diff --git a/tests/test_execution_id.py b/tests/test_execution_id.py new file mode 100644 index 00000000..bfddfc3c --- /dev/null +++ b/tests/test_execution_id.py @@ -0,0 +1,343 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import asyncio +import json +import pathlib +import re +import sys + +from functools import partial +from unittest.mock import Mock + +import pretend +import pytest + +from functions_framework import create_app, execution_id + +TEST_FUNCTIONS_DIR = pathlib.Path(__file__).resolve().parent / "test_functions" +TEST_EXECUTION_ID = "test_execution_id" +TEST_SPAN_ID = "123456" + + +def test_user_function_can_retrieve_execution_id_from_header(): + source = TEST_FUNCTIONS_DIR / "execution_id" / "main.py" + target = "function" + client = create_app(target, source).test_client() + resp = client.post( + "/", + headers={ + "Function-Execution-Id": TEST_EXECUTION_ID, + "Content-Type": "application/json", + }, + ) + + assert resp.get_json()["execution_id"] == TEST_EXECUTION_ID + + +def test_uncaught_exception_in_user_function_sets_execution_id(capsys, monkeypatch): + monkeypatch.setenv("LOG_EXECUTION_ID", "True") + source = TEST_FUNCTIONS_DIR / "execution_id" / "main.py" + target = "error" + app = create_app(target, source) + client = app.test_client() + resp = client.post( + "/", + headers={ + "Function-Execution-Id": TEST_EXECUTION_ID, + "Content-Type": "application/json", + }, + ) + assert resp.status_code == 500 + record = capsys.readouterr() + assert f'"execution_id": "{TEST_EXECUTION_ID}"' in record.err + + +def test_print_from_user_function_sets_execution_id(capsys, monkeypatch): + monkeypatch.setenv("LOG_EXECUTION_ID", "True") + source = TEST_FUNCTIONS_DIR / "execution_id" / "main.py" + target = "print_message" + app = create_app(target, source) + client = app.test_client() + client.post( + "/", + headers={ + "Function-Execution-Id": TEST_EXECUTION_ID, + "Content-Type": "application/json", + }, + json={"message": "some-message"}, + ) + record = capsys.readouterr() + assert f'"execution_id": "{TEST_EXECUTION_ID}"' in record.out + assert '"message": "some-message"' in record.out + + +def test_log_from_user_function_sets_execution_id(capsys, monkeypatch): + monkeypatch.setenv("LOG_EXECUTION_ID", "True") + source = TEST_FUNCTIONS_DIR / "execution_id" / "main.py" + target = "log_message" + app = create_app(target, source) + client = app.test_client() + client.post( + "/", + headers={ + "Function-Execution-Id": TEST_EXECUTION_ID, + "Content-Type": "application/json", + }, + json={"message": json.dumps({"custom-field": "some-message"})}, + ) + record = capsys.readouterr() + assert f'"execution_id": "{TEST_EXECUTION_ID}"' in record.err + assert '"custom-field": "some-message"' in record.err + + +def test_user_function_can_retrieve_generated_execution_id(monkeypatch): + monkeypatch.setattr( + execution_id, "_generate_execution_id", lambda: TEST_EXECUTION_ID + ) + source = TEST_FUNCTIONS_DIR / "execution_id" / "main.py" + target = "function" + client = create_app(target, source).test_client() + resp = client.post( + "/", + headers={ + "Content-Type": "application/json", + }, + ) + + assert resp.get_json()["execution_id"] == TEST_EXECUTION_ID + + +def test_does_not_set_execution_id_when_not_enabled(capsys): + source = TEST_FUNCTIONS_DIR / "execution_id" / "main.py" + target = "print_message" + app = create_app(target, source) + client = app.test_client() + client.post( + "/", + headers={ + "Function-Execution-Id": TEST_EXECUTION_ID, + "Content-Type": "application/json", + }, + json={"message": "some-message"}, + ) + record = capsys.readouterr() + assert f'"execution_id": "{TEST_EXECUTION_ID}"' not in record.out + assert "some-message" in record.out + + +def test_generate_execution_id(): + expected_matching_regex = "^[0-9a-zA-Z]{12}$" + actual_execution_id = execution_id._generate_execution_id() + + match = re.match(expected_matching_regex, actual_execution_id).group(0) + assert match == actual_execution_id + + +@pytest.mark.parametrize( + "headers,expected_execution_id,expected_span_id", + [ + ( + { + "X-Cloud-Trace-Context": f"TRACE_ID/{TEST_SPAN_ID};o=1", + "Function-Execution-Id": TEST_EXECUTION_ID, + }, + TEST_EXECUTION_ID, + TEST_SPAN_ID, + ), + ( + { + "X-Cloud-Trace-Context": f"TRACE_ID/{TEST_SPAN_ID};o=1", + "Function-Execution-Id": TEST_EXECUTION_ID, + }, + TEST_EXECUTION_ID, + TEST_SPAN_ID, + ), + ({}, None, None), + ( + { + "X-Cloud-Trace-Context": "malformed trace context string", + "Function-Execution-Id": TEST_EXECUTION_ID, + }, + TEST_EXECUTION_ID, + None, + ), + ], +) +def test_set_execution_context( + headers, expected_execution_id, expected_span_id, monkeypatch +): + request = pretend.stub(headers=headers) + + def view_func(): + pass + + monkeypatch.setattr( + execution_id, "_generate_execution_id", lambda: TEST_EXECUTION_ID + ) + mock_g = Mock() + monkeypatch.setattr(execution_id.flask, "g", mock_g) + monkeypatch.setattr(execution_id.flask, "has_request_context", lambda: True) + execution_id.set_execution_context(request)(view_func)() + + assert mock_g.execution_id_context.span_id == expected_span_id + assert mock_g.execution_id_context.execution_id == expected_execution_id + + +@pytest.mark.parametrize( + "log_message,expected_log_json", + [ + ("text message", {"message": "text message"}), + ( + json.dumps({"custom-field1": "value1", "custom-field2": "value2"}), + {"custom-field1": "value1", "custom-field2": "value2"}, + ), + ("[]", {"message": "[]"}), + ], +) +def test_log_handler(monkeypatch, log_message, expected_log_json, capsys): + log_handler = execution_id.LoggingHandlerAddExecutionId(stream=sys.stdout) + monkeypatch.setattr( + execution_id, + "_get_current_context", + lambda: execution_id.ExecutionContext( + span_id=TEST_SPAN_ID, execution_id=TEST_EXECUTION_ID + ), + ) + expected_log_json.update( + { + "logging.googleapis.com/labels": { + "execution_id": TEST_EXECUTION_ID, + }, + "logging.googleapis.com/spanId": TEST_SPAN_ID, + } + ) + + log_handler.write(log_message) + record = capsys.readouterr() + assert json.loads(record.out) == expected_log_json + assert json.loads(record.out) == expected_log_json + + +def test_log_handler_without_context_logs_unmodified(monkeypatch, capsys): + log_handler = execution_id.LoggingHandlerAddExecutionId(stream=sys.stdout) + monkeypatch.setattr( + execution_id, + "_get_current_context", + lambda: None, + ) + expected_message = "log message\n" + + log_handler.write("log message") + record = capsys.readouterr() + assert record.out == expected_message + + +def test_log_handler_ignores_newlines(monkeypatch, capsys): + log_handler = execution_id.LoggingHandlerAddExecutionId(stream=sys.stdout) + monkeypatch.setattr( + execution_id, + "_get_current_context", + lambda: execution_id.ExecutionContext( + span_id=TEST_SPAN_ID, execution_id=TEST_EXECUTION_ID + ), + ) + + log_handler.write("\n") + record = capsys.readouterr() + assert record.out == "" + + +def test_log_handler_does_not_nest(): + log_handler_1 = execution_id.LoggingHandlerAddExecutionId(stream=sys.stdout) + log_handler_2 = execution_id.LoggingHandlerAddExecutionId(log_handler_1) + + assert log_handler_1 == log_handler_2 + + +def test_log_handler_omits_empty_execution_context(monkeypatch, capsys): + log_handler = execution_id.LoggingHandlerAddExecutionId(stream=sys.stdout) + monkeypatch.setattr( + execution_id, + "_get_current_context", + lambda: execution_id.ExecutionContext(span_id=None, execution_id=None), + ) + expected_json = { + "message": "some message", + } + + log_handler.write("some message") + record = capsys.readouterr() + assert json.loads(record.out) == expected_json + + +@pytest.mark.asyncio +async def test_maintains_execution_id_for_concurrent_requests(monkeypatch, capsys): + monkeypatch.setenv("LOG_EXECUTION_ID", "True") + monkeypatch.setattr( + execution_id, + "_generate_execution_id", + Mock(side_effect=("test-execution-id-1", "test-execution-id-2")), + ) + + expected_logs = ( + { + "message": "message1", + "logging.googleapis.com/labels": {"execution_id": "test-execution-id-1"}, + }, + { + "message": "message2", + "logging.googleapis.com/labels": {"execution_id": "test-execution-id-2"}, + }, + { + "message": "message1", + "logging.googleapis.com/labels": {"execution_id": "test-execution-id-1"}, + }, + { + "message": "message2", + "logging.googleapis.com/labels": {"execution_id": "test-execution-id-2"}, + }, + ) + + source = TEST_FUNCTIONS_DIR / "execution_id" / "main.py" + target = "sleep" + client = create_app(target, source).test_client() + loop = asyncio.get_event_loop() + response1 = loop.run_in_executor( + None, + partial( + client.post, + "/", + headers={ + "Content-Type": "application/json", + }, + json={"message": "message1"}, + ), + ) + response2 = loop.run_in_executor( + None, + partial( + client.post, + "/", + headers={ + "Content-Type": "application/json", + }, + json={"message": "message2"}, + ), + ) + await asyncio.wait((response1, response2)) + record = capsys.readouterr() + logs = record.err.strip().split("\n") + logs_as_json = tuple(json.loads(log) for log in logs) + + assert logs_as_json == expected_logs diff --git a/tests/test_functions/execution_id/main.py b/tests/test_functions/execution_id/main.py new file mode 100644 index 00000000..72d1eaff --- /dev/null +++ b/tests/test_functions/execution_id/main.py @@ -0,0 +1,33 @@ +import logging +import time + +logger = logging.getLogger(__name__) + + +def print_message(request): + json = request.get_json(silent=True) + print(json.get("message")) + return "success", 200 + + +def log_message(request): + json = request.get_json(silent=True) + logger.info(json.get("message")) + return "success", 200 + + +def function(request): + return {"execution_id": request.headers.get("Function-Execution-Id")} + + +def error(request): + return 1 / 0 + + +def sleep(request): + json = request.get_json(silent=True) + message = json.get("message") + logger.info(message) + time.sleep(1) + logger.info(message) + return "success", 200 diff --git a/tests/test_view_functions.py b/tests/test_view_functions.py index f69b2155..a32fe9e4 100644 --- a/tests/test_view_functions.py +++ b/tests/test_view_functions.py @@ -25,7 +25,7 @@ def test_http_view_func_wrapper(): function = pretend.call_recorder(lambda request: "Hello") request_object = pretend.stub() - local_proxy = pretend.stub(_get_current_object=lambda: request_object) + local_proxy = pretend.stub(_get_current_object=lambda: request_object, headers={}) view_func = functions_framework._http_view_func_wrapper(function, local_proxy) view_func("/some/path")