Skip to content

Commit 73d4742

Browse files
Cloudevents events pubsub (#4532)
* Added additional cloudevent error handling Signed-off-by: Curtis Mason <[email protected]> * Added cloudevents from github.com/cloudevents/sdk-python Signed-off-by: Curtis Mason <[email protected]> * cloudevents version bump 1.2.0 Signed-off-by: Curtis Mason <[email protected]> * lint fixes Signed-off-by: Curtis Mason <[email protected]> * README nit Signed-off-by: Curtis Mason <[email protected]> * cloudevents==1.2.0 Signed-off-by: Curtis Mason <[email protected]> * removed flask and cloudevents from req-test Signed-off-by: Curtis Mason <[email protected]> Co-authored-by: Leah E. Cole <[email protected]>
1 parent 8feb175 commit 73d4742

File tree

4 files changed

+42
-22
lines changed

4 files changed

+42
-22
lines changed

run/events-pubsub/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Events for Cloud Run – Pub/Sub tutorial
22

3-
This sample shows how to create a service that processes Pub/Sub events.
3+
This sample shows how to create a service that processes Pub/Sub events using
4+
[the CloudEvents SDK](https://github.com/cloudevents/sdk-python).
45

56
## Setup
67

run/events-pubsub/main.py

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,12 @@
1414

1515
# [START run_events_pubsub_server_setup]
1616
import base64
17+
1718
import os
1819

20+
import cloudevents.exceptions as cloud_exceptions
21+
from cloudevents.http import from_http
22+
1923
from flask import Flask, request
2024

2125

@@ -28,13 +32,24 @@
2832
# [START run_events_pubsub_handler]
2933
@app.route('/', methods=['POST'])
3034
def index():
31-
for field in required_fields:
32-
if field not in request.headers:
33-
errmsg = f'Bad Request: missing required header {field}'
34-
print(errmsg)
35-
return errmsg, 400
35+
# Create CloudEvent from HTTP headers and body
36+
try:
37+
event = from_http(request.headers, request.get_data())
38+
39+
except cloud_exceptions.MissingRequiredFields as e:
40+
print(f"cloudevents.exceptions.MissingRequiredFields: {e}")
41+
return "Failed to find all required cloudevent fields. ", 400
42+
43+
except cloud_exceptions.InvalidStructuredJSON as e:
44+
print(f"cloudevents.exceptions.InvalidStructuredJSON: {e}")
45+
return "Could not deserialize the payload as JSON. ", 400
46+
47+
except cloud_exceptions.InvalidRequiredFields as e:
48+
print(f"cloudevents.exceptions.InvalidRequiredFields: {e}")
49+
return "Request contained invalid required cloudevent fields. ", 400
50+
51+
envelope = event.data
3652

37-
envelope = request.get_json()
3853
if not envelope:
3954
msg = 'no Pub/Sub message received'
4055
print(f'error: {msg}')
@@ -51,8 +66,7 @@ def index():
5166
if isinstance(pubsub_message, dict) and 'data' in pubsub_message:
5267
name = base64.b64decode(pubsub_message['data']).decode('utf-8').strip()
5368

54-
ce_id = request.headers.get('Ce-Id')
55-
resp = f'Hello, {name}! ID: {ce_id}'
69+
resp = f"Hello, {name}! ID: {event['id']}"
5670
print(resp)
5771
return (resp, 200)
5872
# [END run_events_pubsub_handler]

run/events-pubsub/main_test.py

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,12 @@
2222
import main
2323

2424

25-
required_fields = ['Ce-Id', 'Ce-Source', 'Ce-Type', 'Ce-Specversion']
26-
27-
header_data = {field: str(uuid4()) for field in required_fields}
25+
binary_headers = {
26+
"ce-id": str(uuid4),
27+
"ce-type": "com.pytest.sample.event",
28+
"ce-source": "<my-test-source>",
29+
"ce-specversion": "1.0"
30+
}
2831

2932

3033
@pytest.fixture
@@ -34,48 +37,49 @@ def client():
3437

3538

3639
def test_empty_payload(client):
37-
r = client.post('/', json='', headers=header_data)
40+
r = client.post('/', json='', headers=binary_headers)
3841
assert r.status_code == 400
3942

4043

4144
def test_invalid_payload(client):
42-
r = client.post('/', json={'nomessage': 'invalid'}, headers=header_data)
45+
r = client.post('/', json={'nomessage': 'invalid'}, headers=binary_headers)
4346
assert r.status_code == 400
4447

4548

4649
def test_invalid_mimetype(client):
47-
r = client.post('/', json="{ message: true }", headers=header_data)
50+
r = client.post('/', json="{ message: true }", headers=binary_headers)
4851
assert r.status_code == 400
4952

5053

5154
def test_minimally_valid_message(client, capsys):
52-
r = client.post('/', json={'message': True}, headers=header_data)
55+
r = client.post('/', json={'message': True}, headers=binary_headers)
5356
assert r.status_code == 200
5457

5558
out, _ = capsys.readouterr()
56-
ce_id = header_data['Ce-Id']
59+
ce_id = binary_headers['ce-id']
60+
5761
assert f'Hello, World! ID: {ce_id}' in out
5862

5963

6064
def test_populated_message(client, capsys):
6165
name = str(uuid4())
6266
data = base64.b64encode(name.encode()).decode()
6367

64-
r = client.post('/', json={'message': {'data': data}}, headers=header_data)
68+
r = client.post('/', json={'message': {'data': data}}, headers=binary_headers)
6569
assert r.status_code == 200
6670

6771
out, _ = capsys.readouterr()
68-
ce_id = header_data['Ce-Id']
72+
ce_id = binary_headers['ce-id']
6973
assert f'Hello, {name}! ID: {ce_id}' in out
7074

7175

7276
def test_missing_required_fields(client, capsys):
73-
for field in required_fields:
74-
test_headers = copy.copy(header_data)
77+
for field in binary_headers:
78+
test_headers = copy.copy(binary_headers)
7579
test_headers.pop(field)
7680

7781
r = client.post('/', headers=test_headers)
7882
assert r.status_code == 400
7983

8084
out, _ = capsys.readouterr()
81-
assert f'Bad Request: missing required header {field}' in out
85+
assert 'MissingRequiredFields' in out

run/events-pubsub/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
Flask==1.1.2
22
gunicorn==20.0.4
3+
cloudevents==1.2.0

0 commit comments

Comments
 (0)