Skip to content

Commit b926043

Browse files
fix local devlopment with the Pub/Sub emulator
1 parent 6277270 commit b926043

File tree

5 files changed

+207
-15
lines changed

5 files changed

+207
-15
lines changed

src/functions_framework/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ def view_func(path):
126126
function(data, context)
127127
else:
128128
# This is a regular CloudEvent
129-
event_data = request.get_json()
129+
event_data = event_conversion.marshal_background_event_data(request)
130130
if not event_data:
131131
flask.abort(400)
132132
event_object = BackgroundEvent(**event_data)

src/functions_framework/event_conversion.py

+60-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
# limitations under the License.
1414
import re
1515

16-
from typing import Tuple
16+
from datetime import datetime
17+
from typing import Optional, Tuple
1718

1819
from cloudevents.http import CloudEvent
1920

@@ -55,6 +56,12 @@
5556
_PUBSUB_CE_SERVICE = "pubsub.googleapis.com"
5657
_STORAGE_CE_SERVICE = "storage.googleapis.com"
5758

59+
# Raw pubsub types
60+
_PUBSUB_EVENT_TYPE = 'google.pubsub.topic.publish'
61+
_PUBSUB_MESSAGE_TYPE = 'type.googleapis.com/google.pubsub.v1.PubsubMessage'
62+
63+
_PUBSUB_TOPIC_REQUEST_PATH = re.compile(r"projects\/[^/?]+\/topics\/[^/?]+")
64+
5865
# Maps background event services to their equivalent CloudEvent services.
5966
_SERVICE_BACKGROUND_TO_CE = {
6067
"providers/cloud.firestore/": _FIRESTORE_CE_SERVICE,
@@ -90,7 +97,7 @@
9097

9198
def background_event_to_cloudevent(request) -> CloudEvent:
9299
"""Converts a background event represented by the given HTTP request into a CloudEvent. """
93-
event_data = request.get_json()
100+
event_data = marshal_background_event_data(request)
94101
if not event_data:
95102
raise EventConversionException("Failed to parse JSON")
96103

@@ -109,6 +116,10 @@ def background_event_to_cloudevent(request) -> CloudEvent:
109116
# Handle Pub/Sub events.
110117
if service == _PUBSUB_CE_SERVICE:
111118
data = {"message": data}
119+
# It is possible to configure a Pub/Sub subscription to push directly to this function
120+
# without passing the topic name in the URL path.
121+
if resource is None:
122+
resource = ""
112123

113124
# Handle Firebase Auth events.
114125
if service == _FIREBASE_AUTH_CE_SERVICE:
@@ -168,3 +179,50 @@ def _split_resource(context: Context) -> Tuple[str, str, str]:
168179
raise EventConversionException("Resource regex did not match")
169180

170181
return service, match.group(1), match.group(2)
182+
183+
184+
def marshal_background_event_data(request):
185+
"""Marshal the request body of a raw Pub/Sub HTTP request into the schema that is expected of
186+
a background event"""
187+
request_data = request.get_json()
188+
if not _is_raw_pubsub_payload(request_data):
189+
# If this in not a raw Pub/Sub request, return the unaltered request data.
190+
return request_data
191+
192+
return {
193+
"context": {
194+
"eventId": request_data["message"]["messageId"],
195+
"timestamp": datetime.utcnow().isoformat() + "Z",
196+
"eventType": _PUBSUB_EVENT_TYPE,
197+
"resource": {
198+
"service": _PUBSUB_CE_SERVICE,
199+
"type": _PUBSUB_MESSAGE_TYPE,
200+
"name": _parse_pubsub_topic(request.path),
201+
},
202+
},
203+
"data": {
204+
'@type': _PUBSUB_MESSAGE_TYPE,
205+
"data": request_data["message"]["data"],
206+
"attributes": request_data["message"]["attributes"],
207+
}
208+
}
209+
210+
211+
def _is_raw_pubsub_payload(request_data) -> bool:
212+
"""Does the given request body match the schema of a unmarshalled Pub/Sub request"""
213+
return (
214+
request_data is not None and
215+
"context" not in request_data and
216+
"subscription" in request_data and
217+
"message" in request_data and
218+
"data" in request_data["message"] and
219+
"messageId" in request_data["message"]
220+
)
221+
222+
223+
def _parse_pubsub_topic(request_path) -> Optional[str]:
224+
match = _PUBSUB_TOPIC_REQUEST_PATH.search(request_path)
225+
if match:
226+
return match.group(0)
227+
else:
228+
return None

tests/test_convert.py

+115-10
Original file line numberDiff line numberDiff line change
@@ -65,23 +65,67 @@
6565

6666
BACKGROUND_RESOURCE_STRING = "projects/_/buckets/some-bucket/objects/folder/Test.cs"
6767

68+
PUBSUB_CLOUD_EVENT = {
69+
"specversion": "1.0",
70+
"id": "1215011316659232",
71+
"source": "//pubsub.googleapis.com/projects/sample-project/topics/gcf-test",
72+
"time": "2020-05-18T12:13:19Z",
73+
"type": "google.cloud.pubsub.topic.v1.messagePublished",
74+
"datacontenttype": "application/json",
75+
"data": {
76+
"message": {
77+
"data": "10",
78+
},
79+
},
80+
}
81+
6882

6983
@pytest.fixture
7084
def pubsub_cloudevent_output():
71-
event = {
72-
"specversion": "1.0",
73-
"id": "1215011316659232",
74-
"source": "//pubsub.googleapis.com/projects/sample-project/topics/gcf-test",
75-
"time": "2020-05-18T12:13:19Z",
76-
"type": "google.cloud.pubsub.topic.v1.messagePublished",
77-
"datacontenttype": "application/json",
85+
return from_json(json.dumps(PUBSUB_CLOUD_EVENT))
86+
87+
@pytest.fixture
88+
def raw_pubsub_request():
89+
return {
90+
"subscription": "projects/sample-project/subscriptions/gcf-test-sub",
91+
"message": {
92+
"data": "eyJmb28iOiJiYXIifQ==",
93+
"messageId": "1215011316659232",
94+
"attributes": {
95+
"test": "123"
96+
}
97+
}
98+
}
99+
100+
@pytest.fixture
101+
def marshalled_pubsub_request():
102+
return {
78103
"data": {
79-
"message": {
80-
"data": "10",
81-
},
104+
"@type": "type.googleapis.com/google.pubsub.v1.PubsubMessage",
105+
"data": "eyJmb28iOiJiYXIifQ==",
106+
"attributes": {
107+
"test": "123"
108+
}
82109
},
110+
"context": {
111+
"eventId": "1215011316659232",
112+
"eventType": "google.pubsub.topic.publish",
113+
"resource": {
114+
"name": "projects/sample-project/topics/gcf-test",
115+
"service": "pubsub.googleapis.com",
116+
"type": "type.googleapis.com/google.pubsub.v1.PubsubMessage"
117+
},
118+
"timestamp": "2021-04-17T07:21:18.249Z",
119+
}
83120
}
84121

122+
@pytest.fixture
123+
def raw_pubsub_cloudevent_output(marshalled_pubsub_request):
124+
event = PUBSUB_CLOUD_EVENT.copy()
125+
# the data payload is more complex for the raw pubsub request
126+
event["data"] = {
127+
"message": marshalled_pubsub_request["data"]
128+
}
85129
return from_json(json.dumps(event))
86130

87131

@@ -212,3 +256,64 @@ def test_split_resource_no_resource_regex_match():
212256
with pytest.raises(EventConversionException) as exc_info:
213257
event_conversion._split_resource(context)
214258
assert "Resource regex did not match" in exc_info.value.args[0]
259+
260+
261+
def test_marshal_background_event_data_without_topic_in_path(
262+
raw_pubsub_request, marshalled_pubsub_request
263+
):
264+
req = flask.Request.from_values(json=raw_pubsub_request, path="/myfunc/")
265+
payload = event_conversion.marshal_background_event_data(req)
266+
267+
# Remove timestamps as they get generates on the fly
268+
del marshalled_pubsub_request["context"]["timestamp"]
269+
del payload["context"]["timestamp"]
270+
271+
# Resource name is set to None when it cannot be parsed from the request path
272+
marshalled_pubsub_request["context"]["resource"]["name"] = None
273+
274+
assert payload == marshalled_pubsub_request
275+
276+
def test_marshal_background_event_data_with_topic_path(
277+
raw_pubsub_request, marshalled_pubsub_request
278+
):
279+
req = flask.Request.from_values(
280+
json=raw_pubsub_request, path="x/projects/sample-project/topics/gcf-test?pubsub_trigger=true"
281+
)
282+
payload = event_conversion.marshal_background_event_data(req)
283+
284+
# Remove timestamps as they are generated on the fly.
285+
del marshalled_pubsub_request["context"]["timestamp"]
286+
del payload["context"]["timestamp"]
287+
288+
assert payload == marshalled_pubsub_request
289+
290+
def test_pubsub_emulator_request_to_cloudevent(raw_pubsub_request, raw_pubsub_cloudevent_output):
291+
req = flask.Request.from_values(
292+
json=raw_pubsub_request,
293+
path="x/projects/sample-project/topics/gcf-test?pubsub_trigger=true"
294+
)
295+
cloudevent = event_conversion.background_event_to_cloudevent(req)
296+
297+
# Remove timestamps as they are generated on the fly.
298+
del raw_pubsub_cloudevent_output['time']
299+
del cloudevent['time']
300+
301+
assert cloudevent == raw_pubsub_cloudevent_output
302+
303+
304+
def test_pubsub_emulator_request_to_cloudevent_without_topic_path(
305+
raw_pubsub_request, raw_pubsub_cloudevent_output
306+
):
307+
req = flask.Request.from_values(
308+
json=raw_pubsub_request, path="/"
309+
)
310+
cloudevent = event_conversion.background_event_to_cloudevent(req)
311+
312+
# Remove timestamps as they are generated on the fly.
313+
del raw_pubsub_cloudevent_output['time']
314+
del cloudevent['time']
315+
316+
# Default to the service name, when the topic is not configured subscription's pushEndpoint.
317+
raw_pubsub_cloudevent_output['source'] = "//pubsub.googleapis.com/"
318+
319+
assert cloudevent == raw_pubsub_cloudevent_output

tests/test_functions.py

+28
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,19 @@ def background_json(tmpdir):
4848
"data": {"filename": str(tmpdir / "filename.txt"), "value": "some-value"},
4949
}
5050

51+
@pytest.fixture
52+
def pubsub_emulator_request_payload(tmpdir):
53+
return {
54+
"subscription": "projects/FOO/subscriptions/BAR_SUB",
55+
"message": {
56+
"data": 'eyJmb28iOiJiYXIifQ==',
57+
"messageId": "1",
58+
"attributes": {
59+
"filename": str(tmpdir / "filename.txt"),
60+
"value": "some-value"
61+
}
62+
}
63+
}
5164

5265
def test_http_function_executes_success():
5366
source = TEST_FUNCTIONS_DIR / "http_trigger" / "main.py"
@@ -242,6 +255,21 @@ def test_pubsub_payload(background_json):
242255
background_json["data"]["value"]
243256
)
244257

258+
def test_pubsub_emulator_payload(pubsub_emulator_request_payload):
259+
source = TEST_FUNCTIONS_DIR / "background_trigger" / "main.py"
260+
target = "function"
261+
262+
client = create_app(target, source, "event").test_client()
263+
264+
resp = client.post("/", json=pubsub_emulator_request_payload)
265+
266+
assert resp.status_code == 200
267+
assert resp.data == b"OK"
268+
269+
with open(pubsub_emulator_request_payload["message"]["attributes"]["filename"]) as f:
270+
assert f.read() == '{{"entryPoint": "function", "value": "{}"}}'.format(
271+
pubsub_emulator_request_payload["message"]["attributes"]["value"]
272+
)
245273

246274
def test_background_function_no_data(background_json):
247275
source = TEST_FUNCTIONS_DIR / "background_trigger" / "main.py"

tests/test_functions/background_trigger/main.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@ def function(
2929
data dictionary.
3030
context (google.cloud.functions.Context): The Cloud Functions event context.
3131
"""
32-
filename = event["filename"]
33-
value = event["value"]
32+
attributes = event.get("attributes", {})
33+
filename = event.get("filename", attributes.get("filename"))
34+
value = event.get("value", attributes.get("value"))
3435
f = open(filename, "w")
3536
f.write('{{"entryPoint": "function", "value": "{}"}}'.format(value))
3637
f.close()

0 commit comments

Comments
 (0)