Skip to content

feat: Support Pub/Sub push notifications format (adapt to CloudEvent) #238

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions docs/deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,29 @@ Firestore event | DocumentEventData | --trigger-event providers/clo
> available in the `FirestoreEvent` class via the `Wildcards` property. This is subject to change,
> as it's inconsitent with other Functions Frameworks.

### Triggering an HTTP function with a Cloud Pub/Sub push subscription

HTTP functions can work as the endpoints for [Cloud Pub/Sub push
subscriptions](https://cloud.google.com/pubsub/docs/push). If your
function implements `ICloudEventFunction` or
`ICloudEventFunction<MessagePublishedData>`, the Functions Framework
will adapt the incoming HTTP request to present it to your function
as a CloudEvent, as if you had deployed via `--trigger-topic`. The
requests for push subscriptions do not contain topic data, but if
you create the push subcription with a URL of
`https://<your-function>/projects/<project-id>/topics/<topic-id>`
then the Functions Framework will infer the topic name from the path
of the HTTP request. If the topic name cannot be inferred
automatically, a topic name of
`projects/unknown-project!/topics/unknown-topic!` will be used in
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we aren't consistent about what we use here across languages.

  • In Node.js we just drop it and make the CE subject //pubsub.googleapis.com/
  • In Ruby we set it to //pubsub.googleapis.com/UNKNOWN_PUBSUB_TOPIC

Also, I think it is might be possible to always extract the project name from the subscription in the payload.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we aren't consistent about what we use here across languages.

Mmm. The reason I chose this is to keep structure that will work in the absence of "real" data, but make it obvious that it's not real data. That may or may not be a good choice. Let's discuss this more before actually releasing.

Also, I think it is might be possible to always extract the project name from the subscription in the payload.

That assumes that the subscription is in the same project as the topic, which may not be the case. (I think cross-project PubSub subscriptions are a thing, with appropriate IAM. I could be wrong.) For the emulator that doesn't matter, but for the genuine PubSub push notifications, I think it's better to give either "obviously wrong" data or "user-specified data" (the URL path) rather than "looks plausible but may be wrong" data.

We should definitely talk about that part more. I'm going to merge this as-is, but file a new issue so that we don't forget about it before the next release.

the CloudEvent.

> Note: the Functions Framework code to adapt the incoming HTTP
> request works with the [Cloud Pub/Sub
> Emulator](https://cloud.google.com/pubsub/docs/emulator), but
> some information is not included in emulator push subscription
> requests.

### Deploying a function with a local project dependency

Real world functions are often part of a larger application which
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

using CloudNative.CloudEvents;
using Google.Cloud.Functions.Framework.GcfEvents;
using Google.Events;
using Google.Events.Protobuf;
using Google.Events.Protobuf.Cloud.PubSub.V1;
using Google.Events.Protobuf.Cloud.Storage.V1;
Expand Down Expand Up @@ -131,6 +130,33 @@ public async Task PubSub_Attributes()
Assert.Empty(message.Attributes);
}

[Fact]
public async Task RawPubSub()
{
var data = await ConvertAndDeserialize<MessagePublishedData>("raw_pubsub.json");
Assert.Equal("projects/sample-project/subscriptions/sample-subscription", data.Subscription);
var message = data.Message!;
Assert.NotNull(message);
Assert.Equal(new Dictionary<string, string> { { "attr1", "value1" } }, message.Attributes);
Assert.Equal("Text message", message.TextData);
Assert.Equal("4102184774039362", message.MessageId);
Assert.Equal(new DateTimeOffset(2022, 2, 15, 11, 28, 32, 942, TimeSpan.Zero), message.PublishTime.ToDateTimeOffset());
Assert.Equal("orderxyz", message.OrderingKey);
}

[Fact]
public async Task EmulatorPubSub()
{
var data = await ConvertAndDeserialize<MessagePublishedData>("emulator_pubsub.json");
Assert.Equal("projects/emulator-project/subscriptions/test-subscription", data.Subscription);
var message = data.Message!;
Assert.NotNull(message);
Assert.Equal(new Dictionary<string, string> { { "attr1", "attr-value1" } }, message.Attributes);
Assert.Equal("Test message from emulator", message.TextData);
Assert.Equal("1", message.MessageId);
Assert.Null(message.PublishTime);
}

[Fact]
public async Task LegacyEvents()
{
Expand Down Expand Up @@ -215,9 +241,9 @@ public async Task FirebaseAuth_MetadataNamesAdjusted()
Assert.Equal(new System.DateTime(2020, 5, 29, 11, 00, 00), authData.Metadata.LastSignInTime.ToDateTime());
}

private static async Task<T> ConvertAndDeserialize<T>(string resourceName) where T : class
private static async Task<T> ConvertAndDeserialize<T>(string resourceName, string? path = null) where T : class
{
var context = GcfEventResources.CreateHttpContext(resourceName);
var context = GcfEventResources.CreateHttpContext(resourceName, path);
var formatter = CloudEventFormatterAttribute.CreateFormatter(typeof(T))
?? throw new InvalidOperationException("No formatter available");
var cloudEvent = await GcfConverters.ConvertGcfEventToCloudEvent(context.Request, formatter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
using Google.Cloud.Functions.Framework.GcfEvents;
using Microsoft.AspNetCore.Http;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
Expand All @@ -37,6 +37,8 @@ public class GcfConvertersTest
[InlineData("firestore_simple.json", "google.cloud.firestore.document.v1.written", "//firestore.googleapis.com/projects/project-id/databases/(default)", "documents/gcf-test/2Vm2mI1d0wIaK2Waj5to")]
[InlineData("pubsub_text.json", "google.cloud.pubsub.topic.v1.messagePublished", "//pubsub.googleapis.com/projects/sample-project/topics/gcf-test", null)]
[InlineData("legacy_pubsub.json", "google.cloud.pubsub.topic.v1.messagePublished", "//pubsub.googleapis.com/projects/sample-project/topics/gcf-test", null)]
[InlineData("raw_pubsub.json", "google.cloud.pubsub.topic.v1.messagePublished", "//pubsub.googleapis.com/projects/unknown-project!/topics/unknown-topic!", null)]
[InlineData("emulator_pubsub.json", "google.cloud.pubsub.topic.v1.messagePublished", "//pubsub.googleapis.com/projects/unknown-project!/topics/unknown-topic!", null)]
[InlineData("firebase-db1.json", "google.firebase.database.ref.v1.written", "//firebasedatabase.googleapis.com/projects/_/locations/us-central1/instances/my-project-id", "refs/gcf-test/xyz")]
[InlineData("firebase-db2.json", "google.firebase.database.ref.v1.written", "//firebasedatabase.googleapis.com/projects/_/locations/europe-west1/instances/my-project-id", "refs/gcf-test/xyz")]
[InlineData("firebase-auth1.json", "google.firebase.auth.user.v1.created", "//firebaseauth.googleapis.com/projects/my-project-id", "users/UUpby3s4spZre6kHsgVSPetzQ8l2")]
Expand Down Expand Up @@ -109,6 +111,10 @@ public async Task MinimalValidEvent()
public Task InvalidRequest_UnableToDeserialize() =>
AssertInvalidRequest("{INVALIDJSON 'data':{}, 'context':{'eventId':'xyz', 'eventType': 'google.pubsub.topic.publish', 'resource':{'service': 'svc', 'name': 'resname'}}}");

[Fact]
public Task InvalidRequest_UnknownType() =>
AssertInvalidRequest("{'data':{}, 'context':{'eventId':'xyz', 'eventType': 'google.surprise', 'resource':{'service': 'svc', 'name': 'resname'}}}");

[Fact]
public Task InvalidRequest_NoData() =>
AssertInvalidRequest("{'context':{'eventId':'xyz', 'eventType': 'google.pubsub.topic.publish', 'resource':{'service': 'svc', 'name': 'resname'}}}");
Expand All @@ -125,6 +131,35 @@ public Task InvalidRequest_NoType() =>
public Task InvalidRequest_NoResourceName() =>
AssertInvalidRequest("{'data':{}, 'context':{'eventId':'xyz', 'eventType': 'google.pubsub.topic.publish', 'resource':{'service': 'svc'}}}");

// Minimal valid JSON for a raw Pub/Sub event, so all the subsequent invalid tests can be "this JSON with something removed" (or added)
[Fact]
public async Task MinimalValidEvent_RawPubSub()
{
string json = "{'message':{'messageId':'xyz'}, 'subscription':'projects/x/subscriptions/y'}";
var cloudEvent = await ConvertJson(json);
Assert.Equal("xyz", cloudEvent.Id);
}

[Fact]
public Task InvalidRequest_MissingMessageIdFromRawPubSub() =>
AssertInvalidRequest("{'message':{}, 'subscription':'projects/x/subscriptions/y'}");

[Fact]
public Task InvalidRequest_OnlySubscriptionFromRawPubSub() =>
AssertInvalidRequest("{'subscription':'projects/x/subscriptions/y'");

[Fact]
public Task InvalidRequest_OnlyMessageFromRawPubSub() =>
AssertInvalidRequest("{'message':{'messageId':'1'}}");

[Fact]
public Task InvalidRequest_RawPubSubAndContext() =>
AssertInvalidRequest("{'message':{'messageId':'xyz'}, 'subscription':'projects/x/subscriptions/y', 'context':{}}");

[Fact]
public Task InvalidRequest_RawPubSubAndData() =>
AssertInvalidRequest("{'message':{'messageId':'xyz'}, 'subscription':'projects/x/subscriptions/y', 'data':{}}");

[Theory]
[InlineData("firebase-analytics-no-app-id.json")]
[InlineData("firebase-analytics-no-event-name.json")]
Expand All @@ -134,6 +169,37 @@ public async Task InvalidRequest_FirebaseAnalytics(string resourceName)
await Assert.ThrowsAsync<GcfConverters.ConversionException>(() => GcfConverters.ConvertGcfEventToCloudEvent(context.Request, s_jsonFormatter));
}

[Theory]
[InlineData(null, GcfConverters.DefaultRawPubSubTopic)]
[InlineData("/not/a/matching/path", GcfConverters.DefaultRawPubSubTopic)]
[InlineData("/projects/abc/topics/bcd", "projects/abc/topics/bcd")]
public async Task RawPubSubTopic_NoPath(string? path, string expectedTopicResourceName)
{
var context = GcfEventResources.CreateHttpContext("raw_pubsub.json", path);
var cloudEvent = await GcfConverters.ConvertGcfEventToCloudEvent(context.Request, s_jsonFormatter);
Assert.Equal($"//pubsub.googleapis.com/{expectedTopicResourceName}", cloudEvent.Source?.ToString());
string expectedTopic = expectedTopicResourceName.Split('/').Last();
Assert.Equal(expectedTopic, (string) cloudEvent["topic"]!);
}

[Fact]
public async Task RawPubSub_SimpleProperties()
{
var context = GcfEventResources.CreateHttpContext("raw_pubsub.json");
var cloudEvent = await GcfConverters.ConvertGcfEventToCloudEvent(context.Request, s_jsonFormatter);
Assert.Equal(new DateTimeOffset(2022, 2, 15, 11, 28, 32, 942, TimeSpan.Zero), cloudEvent.Time);
Assert.Equal("4102184774039362", cloudEvent.Id);
}

[Fact]
public async Task EmulatorPubSub_SimpleProperties()
{
var context = GcfEventResources.CreateHttpContext("emulator_pubsub.json");
var cloudEvent = await GcfConverters.ConvertGcfEventToCloudEvent(context.Request, s_jsonFormatter);
Assert.Null(cloudEvent.Time);
Assert.Equal("1", cloudEvent.Id);
}

private static async Task AssertInvalidRequest(string json, string? contentType = null) =>
await Assert.ThrowsAsync<GcfConverters.ConversionException>(() => ConvertJson(json, contentType));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ namespace Google.Cloud.Functions.Framework.Tests.GcfEvents
/// </summary>
internal static class GcfEventResources
{
internal static HttpContext CreateHttpContext(string resourceName) =>
internal static HttpContext CreateHttpContext(string resourceName, string? path = null) =>
new DefaultHttpContext
{
Request =
{
Body = TestResourceHelper.LoadResource(typeof(GcfEventResources), resourceName),
ContentType = "application/json"
ContentType = "application/json",
Path = path is null ? PathString.Empty : (PathString) path
}
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"subscription": "projects\/emulator-project\/subscriptions\/test-subscription",
"message": {
"data": "VGVzdCBtZXNzYWdlIGZyb20gZW11bGF0b3I=",
"messageId": "1",
"attributes": {
"attr1": "attr-value1"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"message": {
"attributes": { "attr1":"value1" },
"data": "VGV4dCBtZXNzYWdl",
"messageId":"4102184774039362",
"message_id":"4102184774039362",
"orderingKey":"orderxyz",
"publishTime":"2022-02-15T11:28:32.942Z",
"publish_time":"2022-02-15T11:28:32.942Z"
},
"subscription":"projects/sample-project/subscriptions/sample-subscription"
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,4 @@
<EmbeddedResource Include="**/*.json" Exclude="bin/**;obj/**" />
</ItemGroup>

<ItemGroup>
<None Remove="GcfEvents\pubsub_text_microsecond_precision.json" />
</ItemGroup>

</Project>
61 changes: 60 additions & 1 deletion src/Google.Cloud.Functions.Framework/GcfEvents/GcfConverters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ private static class EventTypes
internal const string StorageObjectChanged = StorageObjectV1 + ".changed";
}

internal const string DefaultRawPubSubTopic = "projects/unknown-project!/topics/unknown-topic!";
private const string JsonContentType = "application/json";

private static readonly Dictionary<string, EventAdapter> s_eventTypeMapping = new Dictionary<string, EventAdapter>
Expand Down Expand Up @@ -142,12 +143,13 @@ private static async Task<Request> ParseRequest(HttpRequest request)
throw new ConversionException($"Error parsing GCF event: {e.Message}", e);
}

PubSubEventAdapter.NormalizeRawRequest(parsedRequest, request.Path.Value);
parsedRequest.NormalizeContext();
if (parsedRequest.Data is null ||
string.IsNullOrEmpty(parsedRequest.Context.Id) ||
string.IsNullOrEmpty(parsedRequest.Context.Type))
{
throw new ConversionException("Event is malformed; does not contain a payload, or the event ID is missing.");
throw new ConversionException("Event is malformed; does not contain a payload, or the event ID or type is missing.");
}
return parsedRequest;
}
Expand Down Expand Up @@ -292,6 +294,63 @@ protected override void MaybeReshapeData(Request request)
request.Data["publishTime"] = timestamp.UtcDateTime.ToString(formatString, CultureInfo.InvariantCulture);
}
request.Data = new Dictionary<string, object> { { "message", request.Data } };

// The subscription is not provided in the legacy format, but *is* provided in
// the raw Pub/Sub push notification (including in the emulator) so we should use it if we've got it.
if (request.RawPubSubSubscription is string subscription)
{
request.Data["subscription"] = subscription;
}
}

/// <summary>
/// Normalizes a raw Pub/Sub push notification (from either the emulator
/// or the real Pub/Sub service) into the existing legacy format.
/// </summary>
/// <param name="request">The incoming request body, parsed as a <see cref="Request"/> object</param>
/// <param name="path">The HTTP request path, from which the topic name will be extracted.</param>
internal static void NormalizeRawRequest(Request request, string path)
{
// Non-raw-Pub/Sub path: just a no-op
if (request.RawPubSubMessage is null && request.RawPubSubSubscription is null)
{
return;
}
if (request.RawPubSubMessage is null || request.RawPubSubSubscription is null)
{
throw new ConversionException("Request is malformed; it must contain both 'message' and 'subscription' properties or neither.");
}
if (request.Data is object ||
request.Domain is object ||
request.Context is object ||
request.EventId is object ||
request.EventType is object ||
request.Params is object ||
request.Timestamp is object)
{
throw new ConversionException("Request is malformed; raw Pub/Sub request must contain only 'message' and 'subscription' properties.");
}
if (!request.RawPubSubMessage.TryGetValue("messageId", out var messageIdObj) || !(messageIdObj is JsonElement messageIdElement) ||
messageIdElement.ValueKind != JsonValueKind.String)
{
throw new ConversionException("Request is malformed; raw Pub/Sub message must contain a 'message.messageId' string property.");
}
request.EventId = messageIdElement.GetString();
request.EventType = "providers/cloud.pubsub/eventTypes/topic.publish";
// Skip the leading / in the path
path = path.Length == 0 ? "" : path.Substring(1);
var topicPathMatch = PubSubResourcePattern.Match(path);
request.Resource = topicPathMatch.Success ? path : DefaultRawPubSubTopic;
request.Data = request.RawPubSubMessage;
if (request.RawPubSubMessage.TryGetValue("publishTime", out var publishTime) &&
publishTime is JsonElement publishTimeElement &&
publishTimeElement.ValueKind == JsonValueKind.String &&
DateTimeOffset.TryParseExact(publishTimeElement.GetString(), "yyyy-MM-dd'T'HH:mm:ss.FFFFFF'Z'", CultureInfo.InvariantCulture,
DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal,
out var timestamp))
{
request.Timestamp = timestamp;
}
}
}

Expand Down
12 changes: 12 additions & 0 deletions src/Google.Cloud.Functions.Framework/GcfEvents/Request.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,18 @@ internal sealed class Request
[JsonPropertyName("domain")]
public string? Domain { get; set; }

/// <summary>
/// Raw PubSub only: the subscription triggering the request.
/// </summary>
[JsonPropertyName("subscription")]
public string? RawPubSubSubscription { get; set; }

/// <summary>
/// Raw PubSub only: the PubSub message.
/// </summary>
[JsonPropertyName("message")]
public Dictionary<string, object>? RawPubSubMessage { get; set; }

public Request()
{
Context = null!;
Expand Down