Skip to content

fix local development with the Pub/Sub emulator #121

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/functions_framework/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def view_func(path):
function(data, context)
else:
# This is a regular CloudEvent
event_data = request.get_json()
event_data = event_conversion.marshal_background_event_data(request)
if not event_data:
flask.abort(400)
event_object = BackgroundEvent(**event_data)
Expand Down
66 changes: 63 additions & 3 deletions src/functions_framework/event_conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
# limitations under the License.
import re

from typing import Tuple
from datetime import datetime
from typing import Optional, Tuple

from cloudevents.http import CloudEvent

Expand Down Expand Up @@ -55,6 +56,12 @@
_PUBSUB_CE_SERVICE = "pubsub.googleapis.com"
_STORAGE_CE_SERVICE = "storage.googleapis.com"

# Raw pubsub types
_PUBSUB_EVENT_TYPE = "google.pubsub.topic.publish"
_PUBSUB_MESSAGE_TYPE = "type.googleapis.com/google.pubsub.v1.PubsubMessage"

_PUBSUB_TOPIC_REQUEST_PATH = re.compile(r"projects\/[^/?]+\/topics\/[^/?]+")

# Maps background event services to their equivalent CloudEvent services.
_SERVICE_BACKGROUND_TO_CE = {
"providers/cloud.firestore/": _FIRESTORE_CE_SERVICE,
Expand Down Expand Up @@ -89,8 +96,8 @@


def background_event_to_cloudevent(request) -> CloudEvent:
"""Converts a background event represented by the given HTTP request into a CloudEvent. """
event_data = request.get_json()
"""Converts a background event represented by the given HTTP request into a CloudEvent."""
event_data = marshal_background_event_data(request)
if not event_data:
raise EventConversionException("Failed to parse JSON")

Expand Down Expand Up @@ -168,3 +175,56 @@ def _split_resource(context: Context) -> Tuple[str, str, str]:
raise EventConversionException("Resource regex did not match")

return service, match.group(1), match.group(2)


def marshal_background_event_data(request):
"""Marshal the request body of a raw Pub/Sub HTTP request into the schema that is expected of
a background event"""
try:
request_data = request.get_json()
if not _is_raw_pubsub_payload(request_data):
# If this in not a raw Pub/Sub request, return the unaltered request data.
return request_data
return {
"context": {
"eventId": request_data["message"]["messageId"],
"timestamp": request_data["message"].get(
"publishTime", datetime.utcnow().isoformat() + "Z"
),
"eventType": _PUBSUB_EVENT_TYPE,
"resource": {
"service": _PUBSUB_CE_SERVICE,
"type": _PUBSUB_MESSAGE_TYPE,
"name": _parse_pubsub_topic(request.path),
},
},
"data": {
"@type": _PUBSUB_MESSAGE_TYPE,
"data": request_data["message"]["data"],
"attributes": request_data["message"]["attributes"],
},
}
except (AttributeError, KeyError, TypeError):
raise EventConversionException("Failed to convert Pub/Sub payload to event")


def _is_raw_pubsub_payload(request_data) -> bool:
"""Does the given request body match the schema of a unmarshalled Pub/Sub request"""
return (
request_data is not None
and "context" not in request_data
and "subscription" in request_data
and "message" in request_data
and "data" in request_data["message"]
and "messageId" in request_data["message"]
)


def _parse_pubsub_topic(request_path) -> Optional[str]:
match = _PUBSUB_TOPIC_REQUEST_PATH.search(request_path)
if match:
return match.group(0)
else:
# It is possible to configure a Pub/Sub subscription to push directly to this function
# without passing the topic name in the URL path.
return ""
135 changes: 126 additions & 9 deletions tests/test_convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,23 +65,64 @@

BACKGROUND_RESOURCE_STRING = "projects/_/buckets/some-bucket/objects/folder/Test.cs"

PUBSUB_CLOUD_EVENT = {
"specversion": "1.0",
"id": "1215011316659232",
"source": "//pubsub.googleapis.com/projects/sample-project/topics/gcf-test",
"time": "2020-05-18T12:13:19Z",
"type": "google.cloud.pubsub.topic.v1.messagePublished",
"datacontenttype": "application/json",
"data": {
"message": {
"data": "10",
},
},
}


@pytest.fixture
def pubsub_cloudevent_output():
event = {
"specversion": "1.0",
"id": "1215011316659232",
"source": "//pubsub.googleapis.com/projects/sample-project/topics/gcf-test",
"time": "2020-05-18T12:13:19Z",
"type": "google.cloud.pubsub.topic.v1.messagePublished",
"datacontenttype": "application/json",
return from_json(json.dumps(PUBSUB_CLOUD_EVENT))


@pytest.fixture
def raw_pubsub_request():
return {
"subscription": "projects/sample-project/subscriptions/gcf-test-sub",
"message": {
"data": "eyJmb28iOiJiYXIifQ==",
"messageId": "1215011316659232",
"attributes": {"test": "123"},
},
}


@pytest.fixture
def marshalled_pubsub_request():
return {
"data": {
"message": {
"data": "10",
"@type": "type.googleapis.com/google.pubsub.v1.PubsubMessage",
"data": "eyJmb28iOiJiYXIifQ==",
"attributes": {"test": "123"},
},
"context": {
"eventId": "1215011316659232",
"eventType": "google.pubsub.topic.publish",
"resource": {
"name": "projects/sample-project/topics/gcf-test",
"service": "pubsub.googleapis.com",
"type": "type.googleapis.com/google.pubsub.v1.PubsubMessage",
},
"timestamp": "2021-04-17T07:21:18.249Z",
},
}


@pytest.fixture
def raw_pubsub_cloudevent_output(marshalled_pubsub_request):
event = PUBSUB_CLOUD_EVENT.copy()
# the data payload is more complex for the raw pubsub request
event["data"] = {"message": marshalled_pubsub_request["data"]}
return from_json(json.dumps(event))


Expand Down Expand Up @@ -212,3 +253,79 @@ def test_split_resource_no_resource_regex_match():
with pytest.raises(EventConversionException) as exc_info:
event_conversion._split_resource(context)
assert "Resource regex did not match" in exc_info.value.args[0]


def test_marshal_background_event_data_without_topic_in_path(
raw_pubsub_request, marshalled_pubsub_request
):
req = flask.Request.from_values(json=raw_pubsub_request, path="/myfunc/")
payload = event_conversion.marshal_background_event_data(req)

# Remove timestamps as they get generates on the fly
del marshalled_pubsub_request["context"]["timestamp"]
del payload["context"]["timestamp"]

# Resource name is set to empty string when it cannot be parsed from the request path
marshalled_pubsub_request["context"]["resource"]["name"] = ""

assert payload == marshalled_pubsub_request


def test_marshal_background_event_data_with_topic_path(
raw_pubsub_request, marshalled_pubsub_request
):
req = flask.Request.from_values(
json=raw_pubsub_request,
path="x/projects/sample-project/topics/gcf-test?pubsub_trigger=true",
)
payload = event_conversion.marshal_background_event_data(req)

# Remove timestamps as they are generated on the fly.
del marshalled_pubsub_request["context"]["timestamp"]
del payload["context"]["timestamp"]

assert payload == marshalled_pubsub_request


def test_pubsub_emulator_request_to_cloudevent(
raw_pubsub_request, raw_pubsub_cloudevent_output
):
req = flask.Request.from_values(
json=raw_pubsub_request,
path="x/projects/sample-project/topics/gcf-test?pubsub_trigger=true",
)
cloudevent = event_conversion.background_event_to_cloudevent(req)

# Remove timestamps as they are generated on the fly.
del raw_pubsub_cloudevent_output["time"]
del cloudevent["time"]

assert cloudevent == raw_pubsub_cloudevent_output


def test_pubsub_emulator_request_to_cloudevent_without_topic_path(
raw_pubsub_request, raw_pubsub_cloudevent_output
):
req = flask.Request.from_values(json=raw_pubsub_request, path="/")
cloudevent = event_conversion.background_event_to_cloudevent(req)

# Remove timestamps as they are generated on the fly.
del raw_pubsub_cloudevent_output["time"]
del cloudevent["time"]

# Default to the service name, when the topic is not configured subscription's pushEndpoint.
raw_pubsub_cloudevent_output["source"] = "//pubsub.googleapis.com/"

assert cloudevent == raw_pubsub_cloudevent_output


def test_pubsub_emulator_request_with_invalid_message(
raw_pubsub_request, raw_pubsub_cloudevent_output
):
# Create an invalid message payload
raw_pubsub_request["message"] = None
req = flask.Request.from_values(json=raw_pubsub_request, path="/")

with pytest.raises(EventConversionException) as exc_info:
cloudevent = event_conversion.background_event_to_cloudevent(req)
assert "Failed to convert Pub/Sub payload to event" in exc_info.value.args[0]