Skip to content

Cloudevents events pubsub #4532

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion run/events-pubsub/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Events for Cloud Run – Pub/Sub tutorial

This sample shows how to create a service that processes Pub/Sub events.
This sample shows how to create a service that processes Pub/Sub events using
[the CloudEvents SDK](https://github.com/cloudevents/sdk-python).

## Setup

Expand Down
30 changes: 22 additions & 8 deletions run/events-pubsub/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@

# [START run_events_pubsub_server_setup]
import base64

import os

import cloudevents.exceptions as cloud_exceptions
from cloudevents.http import from_http

from flask import Flask, request


Expand All @@ -28,13 +32,24 @@
# [START run_events_pubsub_handler]
@app.route('/', methods=['POST'])
def index():
for field in required_fields:
if field not in request.headers:
errmsg = f'Bad Request: missing required header {field}'
print(errmsg)
return errmsg, 400
# Create CloudEvent from HTTP headers and body
try:
event = from_http(request.headers, request.get_data())

except cloud_exceptions.MissingRequiredFields as e:
print(f"cloudevents.exceptions.MissingRequiredFields: {e}")
return "Failed to find all required cloudevent fields. ", 400

except cloud_exceptions.InvalidStructuredJSON as e:
print(f"cloudevents.exceptions.InvalidStructuredJSON: {e}")
return "Could not deserialize the payload as JSON. ", 400

except cloud_exceptions.InvalidRequiredFields as e:
print(f"cloudevents.exceptions.InvalidRequiredFields: {e}")
return "Request contained invalid required cloudevent fields. ", 400

envelope = event.data

envelope = request.get_json()
if not envelope:
msg = 'no Pub/Sub message received'
print(f'error: {msg}')
Expand All @@ -51,8 +66,7 @@ def index():
if isinstance(pubsub_message, dict) and 'data' in pubsub_message:
name = base64.b64decode(pubsub_message['data']).decode('utf-8').strip()

ce_id = request.headers.get('Ce-Id')
resp = f'Hello, {name}! ID: {ce_id}'
resp = f"Hello, {name}! ID: {event['id']}"
print(resp)
return (resp, 200)
# [END run_events_pubsub_handler]
Expand Down
30 changes: 17 additions & 13 deletions run/events-pubsub/main_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
import main


required_fields = ['Ce-Id', 'Ce-Source', 'Ce-Type', 'Ce-Specversion']

header_data = {field: str(uuid4()) for field in required_fields}
binary_headers = {
"ce-id": str(uuid4),
"ce-type": "com.pytest.sample.event",
"ce-source": "<my-test-source>",
"ce-specversion": "1.0"
}


@pytest.fixture
Expand All @@ -34,48 +37,49 @@ def client():


def test_empty_payload(client):
r = client.post('/', json='', headers=header_data)
r = client.post('/', json='', headers=binary_headers)
assert r.status_code == 400


def test_invalid_payload(client):
r = client.post('/', json={'nomessage': 'invalid'}, headers=header_data)
r = client.post('/', json={'nomessage': 'invalid'}, headers=binary_headers)
assert r.status_code == 400


def test_invalid_mimetype(client):
r = client.post('/', json="{ message: true }", headers=header_data)
r = client.post('/', json="{ message: true }", headers=binary_headers)
assert r.status_code == 400


def test_minimally_valid_message(client, capsys):
r = client.post('/', json={'message': True}, headers=header_data)
r = client.post('/', json={'message': True}, headers=binary_headers)
assert r.status_code == 200

out, _ = capsys.readouterr()
ce_id = header_data['Ce-Id']
ce_id = binary_headers['ce-id']

assert f'Hello, World! ID: {ce_id}' in out


def test_populated_message(client, capsys):
name = str(uuid4())
data = base64.b64encode(name.encode()).decode()

r = client.post('/', json={'message': {'data': data}}, headers=header_data)
r = client.post('/', json={'message': {'data': data}}, headers=binary_headers)
assert r.status_code == 200

out, _ = capsys.readouterr()
ce_id = header_data['Ce-Id']
ce_id = binary_headers['ce-id']
assert f'Hello, {name}! ID: {ce_id}' in out


def test_missing_required_fields(client, capsys):
for field in required_fields:
test_headers = copy.copy(header_data)
for field in binary_headers:
test_headers = copy.copy(binary_headers)
test_headers.pop(field)

r = client.post('/', headers=test_headers)
assert r.status_code == 400

out, _ = capsys.readouterr()
assert f'Bad Request: missing required header {field}' in out
assert 'MissingRequiredFields' in out
2 changes: 2 additions & 0 deletions run/events-pubsub/requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
pytest==6.0.1
cloudevents==1.2.0
Flask==1.1.2
1 change: 1 addition & 0 deletions run/events-pubsub/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
Flask==1.1.2
gunicorn==20.0.4
cloudevents==1.2.0