Skip to content

added cloudevents to events-storage #4537

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions run/events-storage/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Events for Cloud Run – GCS tutorial

This sample shows how to create a service that processes GCS events.
This sample shows how to create a service that processes GCS using [the CloudEvents SDK](https://github.com/cloudevents/sdk-python).

## Setup

Expand Down Expand Up @@ -57,13 +57,13 @@ gcloud alpha events triggers create $MY_GCS_TRIGGER \

## Test

Test your Cloud Run service by publishing a message to the topic:
Test your Cloud Run service by publishing a message to the topic:

```sh
gsutil defstorageclass set STANDARD gs://$MY_GCS_BUCKET
```

Observe the Cloud Run service printing upon receiving an event in
Observe the Cloud Run service printing upon receiving an event in
Cloud Logging:

```sh
Expand Down
34 changes: 23 additions & 11 deletions run/events-storage/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
# [START run_events_gcs_handler]
import os

import cloudevents.exceptions as cloud_exceptions
from cloudevents.http import from_http

from flask import Flask, request


Expand All @@ -23,20 +26,29 @@

@app.route('/', methods=['POST'])
def index():
for field in required_fields:
if field not in request.headers:
errmsg = f'Bad Request: missing required header {field}'
print(errmsg)
return errmsg, 400

if 'Ce-Subject' not in request.headers:
errmsg = 'Bad Request: expected header Ce-Subject'
# Create CloudEvent from HTTP headers and body
try:
event = from_http(request.headers, request.get_data())

except cloud_exceptions.MissingRequiredFields as e:
print(f"cloudevents.exceptions.MissingRequiredFields: {e}")
return "Failed to find all required cloudevent fields. ", 400

except cloud_exceptions.InvalidStructuredJSON as e:
print(f"cloudevents.exceptions.InvalidStructuredJSON: {e}")
return "Could not deserialize the payload as JSON. ", 400

except cloud_exceptions.InvalidRequiredFields as e:
print(f"cloudevents.exceptions.InvalidRequiredFields: {e}")
return "Request contained invalid required cloudevent fields. ", 400

if 'subject' not in event:
errmsg = 'Bad Request: expected header ce-subject'
print(errmsg)
return errmsg, 400

ce_subject = request.headers.get('Ce-Subject')
print(f'GCS CloudEvent type: {ce_subject}')
return (f'GCS CloudEvent type: {ce_subject}', 200)
print(f"GCS CloudEvent type: {event['subject']}")
return (f"GCS CloudEvent type: {event['subject']}", 200)
# [END run_events_gcs_handler]


Expand Down
21 changes: 12 additions & 9 deletions run/events-storage/main_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
import main


required_fields = ['Ce-Id', 'Ce-Source', 'Ce-Type', 'Ce-Specversion']

header_data = {field: str(uuid4()) for field in required_fields}
binary_headers = {
"ce-id": str(uuid4),
"ce-type": "com.pytest.sample.event",
"ce-source": "<my-test-source>",
"ce-specversion": "1.0"
}


@pytest.fixture
Expand All @@ -32,7 +35,7 @@ def client():


def test_endpoint(client, capsys):
test_headers = copy.copy(header_data)
test_headers = copy.copy(binary_headers)
test_headers['Ce-Subject'] = 'test-subject'

r = client.post('/', headers=test_headers)
Expand All @@ -43,20 +46,20 @@ def test_endpoint(client, capsys):


def test_missing_subject(client, capsys):
r = client.post('/', headers=header_data)
r = client.post('/', headers=binary_headers)
assert r.status_code == 400

out, _ = capsys.readouterr()
assert 'Bad Request: expected header Ce-Subject' in out
assert 'Bad Request: expected header ce-subject' in out


def test_missing_required_fields(client, capsys):
for field in required_fields:
test_headers = copy.copy(header_data)
for field in binary_headers:
test_headers = copy.copy(binary_headers)
test_headers.pop(field)

r = client.post('/', headers=test_headers)
assert r.status_code == 400

out, _ = capsys.readouterr()
assert f'Bad Request: missing required header {field}' in out
assert 'MissingRequiredFields' in out
1 change: 0 additions & 1 deletion run/events-storage/requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
pytest==6.0.1

1 change: 1 addition & 0 deletions run/events-storage/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
Flask==1.1.2
gunicorn==20.0.4
cloudevents==1.2.0