diff --git a/src/functions_framework/__init__.py b/src/functions_framework/__init__.py index 3bff83f0..9f5cf34e 100644 --- a/src/functions_framework/__init__.py +++ b/src/functions_framework/__init__.py @@ -126,7 +126,7 @@ def view_func(path): function(data, context) else: # This is a regular CloudEvent - event_data = request.get_json() + event_data = event_conversion.marshal_background_event_data(request) if not event_data: flask.abort(400) event_object = BackgroundEvent(**event_data) diff --git a/src/functions_framework/event_conversion.py b/src/functions_framework/event_conversion.py index 4d914802..596bee2c 100644 --- a/src/functions_framework/event_conversion.py +++ b/src/functions_framework/event_conversion.py @@ -13,7 +13,8 @@ # limitations under the License. import re -from typing import Tuple +from datetime import datetime +from typing import Optional, Tuple from cloudevents.http import CloudEvent @@ -55,6 +56,12 @@ _PUBSUB_CE_SERVICE = "pubsub.googleapis.com" _STORAGE_CE_SERVICE = "storage.googleapis.com" +# Raw pubsub types +_PUBSUB_EVENT_TYPE = "google.pubsub.topic.publish" +_PUBSUB_MESSAGE_TYPE = "type.googleapis.com/google.pubsub.v1.PubsubMessage" + +_PUBSUB_TOPIC_REQUEST_PATH = re.compile(r"projects\/[^/?]+\/topics\/[^/?]+") + # Maps background event services to their equivalent CloudEvent services. _SERVICE_BACKGROUND_TO_CE = { "providers/cloud.firestore/": _FIRESTORE_CE_SERVICE, @@ -89,8 +96,8 @@ def background_event_to_cloudevent(request) -> CloudEvent: - """Converts a background event represented by the given HTTP request into a CloudEvent. """ - event_data = request.get_json() + """Converts a background event represented by the given HTTP request into a CloudEvent.""" + event_data = marshal_background_event_data(request) if not event_data: raise EventConversionException("Failed to parse JSON") @@ -168,3 +175,56 @@ def _split_resource(context: Context) -> Tuple[str, str, str]: raise EventConversionException("Resource regex did not match") return service, match.group(1), match.group(2) + + +def marshal_background_event_data(request): + """Marshal the request body of a raw Pub/Sub HTTP request into the schema that is expected of + a background event""" + try: + request_data = request.get_json() + if not _is_raw_pubsub_payload(request_data): + # If this in not a raw Pub/Sub request, return the unaltered request data. + return request_data + return { + "context": { + "eventId": request_data["message"]["messageId"], + "timestamp": request_data["message"].get( + "publishTime", datetime.utcnow().isoformat() + "Z" + ), + "eventType": _PUBSUB_EVENT_TYPE, + "resource": { + "service": _PUBSUB_CE_SERVICE, + "type": _PUBSUB_MESSAGE_TYPE, + "name": _parse_pubsub_topic(request.path), + }, + }, + "data": { + "@type": _PUBSUB_MESSAGE_TYPE, + "data": request_data["message"]["data"], + "attributes": request_data["message"]["attributes"], + }, + } + except (AttributeError, KeyError, TypeError): + raise EventConversionException("Failed to convert Pub/Sub payload to event") + + +def _is_raw_pubsub_payload(request_data) -> bool: + """Does the given request body match the schema of a unmarshalled Pub/Sub request""" + return ( + request_data is not None + and "context" not in request_data + and "subscription" in request_data + and "message" in request_data + and "data" in request_data["message"] + and "messageId" in request_data["message"] + ) + + +def _parse_pubsub_topic(request_path) -> Optional[str]: + match = _PUBSUB_TOPIC_REQUEST_PATH.search(request_path) + if match: + return match.group(0) + else: + # It is possible to configure a Pub/Sub subscription to push directly to this function + # without passing the topic name in the URL path. + return "" diff --git a/tests/test_convert.py b/tests/test_convert.py index d3fe3e22..07580991 100644 --- a/tests/test_convert.py +++ b/tests/test_convert.py @@ -65,23 +65,64 @@ BACKGROUND_RESOURCE_STRING = "projects/_/buckets/some-bucket/objects/folder/Test.cs" +PUBSUB_CLOUD_EVENT = { + "specversion": "1.0", + "id": "1215011316659232", + "source": "//pubsub.googleapis.com/projects/sample-project/topics/gcf-test", + "time": "2020-05-18T12:13:19Z", + "type": "google.cloud.pubsub.topic.v1.messagePublished", + "datacontenttype": "application/json", + "data": { + "message": { + "data": "10", + }, + }, +} + @pytest.fixture def pubsub_cloudevent_output(): - event = { - "specversion": "1.0", - "id": "1215011316659232", - "source": "//pubsub.googleapis.com/projects/sample-project/topics/gcf-test", - "time": "2020-05-18T12:13:19Z", - "type": "google.cloud.pubsub.topic.v1.messagePublished", - "datacontenttype": "application/json", + return from_json(json.dumps(PUBSUB_CLOUD_EVENT)) + + +@pytest.fixture +def raw_pubsub_request(): + return { + "subscription": "projects/sample-project/subscriptions/gcf-test-sub", + "message": { + "data": "eyJmb28iOiJiYXIifQ==", + "messageId": "1215011316659232", + "attributes": {"test": "123"}, + }, + } + + +@pytest.fixture +def marshalled_pubsub_request(): + return { "data": { - "message": { - "data": "10", + "@type": "type.googleapis.com/google.pubsub.v1.PubsubMessage", + "data": "eyJmb28iOiJiYXIifQ==", + "attributes": {"test": "123"}, + }, + "context": { + "eventId": "1215011316659232", + "eventType": "google.pubsub.topic.publish", + "resource": { + "name": "projects/sample-project/topics/gcf-test", + "service": "pubsub.googleapis.com", + "type": "type.googleapis.com/google.pubsub.v1.PubsubMessage", }, + "timestamp": "2021-04-17T07:21:18.249Z", }, } + +@pytest.fixture +def raw_pubsub_cloudevent_output(marshalled_pubsub_request): + event = PUBSUB_CLOUD_EVENT.copy() + # the data payload is more complex for the raw pubsub request + event["data"] = {"message": marshalled_pubsub_request["data"]} return from_json(json.dumps(event)) @@ -212,3 +253,79 @@ def test_split_resource_no_resource_regex_match(): with pytest.raises(EventConversionException) as exc_info: event_conversion._split_resource(context) assert "Resource regex did not match" in exc_info.value.args[0] + + +def test_marshal_background_event_data_without_topic_in_path( + raw_pubsub_request, marshalled_pubsub_request +): + req = flask.Request.from_values(json=raw_pubsub_request, path="/myfunc/") + payload = event_conversion.marshal_background_event_data(req) + + # Remove timestamps as they get generates on the fly + del marshalled_pubsub_request["context"]["timestamp"] + del payload["context"]["timestamp"] + + # Resource name is set to empty string when it cannot be parsed from the request path + marshalled_pubsub_request["context"]["resource"]["name"] = "" + + assert payload == marshalled_pubsub_request + + +def test_marshal_background_event_data_with_topic_path( + raw_pubsub_request, marshalled_pubsub_request +): + req = flask.Request.from_values( + json=raw_pubsub_request, + path="x/projects/sample-project/topics/gcf-test?pubsub_trigger=true", + ) + payload = event_conversion.marshal_background_event_data(req) + + # Remove timestamps as they are generated on the fly. + del marshalled_pubsub_request["context"]["timestamp"] + del payload["context"]["timestamp"] + + assert payload == marshalled_pubsub_request + + +def test_pubsub_emulator_request_to_cloudevent( + raw_pubsub_request, raw_pubsub_cloudevent_output +): + req = flask.Request.from_values( + json=raw_pubsub_request, + path="x/projects/sample-project/topics/gcf-test?pubsub_trigger=true", + ) + cloudevent = event_conversion.background_event_to_cloudevent(req) + + # Remove timestamps as they are generated on the fly. + del raw_pubsub_cloudevent_output["time"] + del cloudevent["time"] + + assert cloudevent == raw_pubsub_cloudevent_output + + +def test_pubsub_emulator_request_to_cloudevent_without_topic_path( + raw_pubsub_request, raw_pubsub_cloudevent_output +): + req = flask.Request.from_values(json=raw_pubsub_request, path="/") + cloudevent = event_conversion.background_event_to_cloudevent(req) + + # Remove timestamps as they are generated on the fly. + del raw_pubsub_cloudevent_output["time"] + del cloudevent["time"] + + # Default to the service name, when the topic is not configured subscription's pushEndpoint. + raw_pubsub_cloudevent_output["source"] = "//pubsub.googleapis.com/" + + assert cloudevent == raw_pubsub_cloudevent_output + + +def test_pubsub_emulator_request_with_invalid_message( + raw_pubsub_request, raw_pubsub_cloudevent_output +): + # Create an invalid message payload + raw_pubsub_request["message"] = None + req = flask.Request.from_values(json=raw_pubsub_request, path="/") + + with pytest.raises(EventConversionException) as exc_info: + cloudevent = event_conversion.background_event_to_cloudevent(req) + assert "Failed to convert Pub/Sub payload to event" in exc_info.value.args[0]