Skip to content

Commit af9f09d

Browse files
committed
feat: Support Pub/Sub push notifications format (adapt to CloudEvent)
Fixes #234
1 parent 4f3476f commit af9f09d

File tree

9 files changed

+216
-11
lines changed

9 files changed

+216
-11
lines changed

docs/deployment.md

+23
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,29 @@ Firestore event | DocumentEventData | --trigger-event providers/clo
9595
> available in the `FirestoreEvent` class via the `Wildcards` property. This is subject to change,
9696
> as it's inconsitent with other Functions Frameworks.
9797
98+
### Triggering an HTTP function with a Cloud Pub/Sub push subscription
99+
100+
HTTP functions can work as the endpoints for [Cloud Pub/Sub push
101+
subscriptions](https://cloud.google.com/pubsub/docs/push). If your
102+
function implements `ICloudEventFunction` or
103+
`ICloudEventFunction<MessagePublishedData>`, the Functions Framework
104+
will adapt the incoming HTTP request to present it to your function
105+
as a CloudEvent, as if you had deployed via `--trigger-topic`. The
106+
requests for push subscriptions do not contain topic data, but if
107+
you create the push subcription with a URL of
108+
`https://<your-function>/projects/<project-id>/topics/<topic-id>`
109+
then the Functions Framework will infer the topic name from the path
110+
of the HTTP request. If the topic name cannot be inferred
111+
automatically, a topic name of
112+
`projects/unknown-project!/topics/unknown-topic!` will be used in
113+
the CloudEvent.
114+
115+
> Note: the Functions Framework code to adapt the incoming HTTP
116+
> request works with the [Cloud Pub/Sub
117+
> Emulator](https://cloud.google.com/pubsub/docs/emulator), but
118+
> some information is not included in emulator push subscription
119+
> requests.
120+
98121
### Deploying a function with a local project dependency
99122

100123
Real world functions are often part of a larger application which

src/Google.Cloud.Functions.Framework.Tests/GcfEvents/EventDeserializationTest.cs

+29-3
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
using CloudNative.CloudEvents;
1616
using Google.Cloud.Functions.Framework.GcfEvents;
17-
using Google.Events;
1817
using Google.Events.Protobuf;
1918
using Google.Events.Protobuf.Cloud.PubSub.V1;
2019
using Google.Events.Protobuf.Cloud.Storage.V1;
@@ -131,6 +130,33 @@ public async Task PubSub_Attributes()
131130
Assert.Empty(message.Attributes);
132131
}
133132

133+
[Fact]
134+
public async Task RawPubSub()
135+
{
136+
var data = await ConvertAndDeserialize<MessagePublishedData>("raw_pubsub.json");
137+
Assert.Equal("projects/sample-project/subscriptions/sample-subscription", data.Subscription);
138+
var message = data.Message!;
139+
Assert.NotNull(message);
140+
Assert.Equal(new Dictionary<string, string> { { "attr1", "value1" } }, message.Attributes);
141+
Assert.Equal("Text message", message.TextData);
142+
Assert.Equal("4102184774039362", message.MessageId);
143+
Assert.Equal(new DateTimeOffset(2022, 2, 15, 11, 28, 32, 942, TimeSpan.Zero), message.PublishTime.ToDateTimeOffset());
144+
Assert.Equal("orderxyz", message.OrderingKey);
145+
}
146+
147+
[Fact]
148+
public async Task EmulatorPubSub()
149+
{
150+
var data = await ConvertAndDeserialize<MessagePublishedData>("emulator_pubsub.json");
151+
Assert.Equal("projects/emulator-project/subscriptions/test-subscription", data.Subscription);
152+
var message = data.Message!;
153+
Assert.NotNull(message);
154+
Assert.Equal(new Dictionary<string, string> { { "attr1", "attr-value1" } }, message.Attributes);
155+
Assert.Equal("Test message from emulator", message.TextData);
156+
Assert.Equal("1", message.MessageId);
157+
Assert.Null(message.PublishTime);
158+
}
159+
134160
[Fact]
135161
public async Task LegacyEvents()
136162
{
@@ -215,9 +241,9 @@ public async Task FirebaseAuth_MetadataNamesAdjusted()
215241
Assert.Equal(new System.DateTime(2020, 5, 29, 11, 00, 00), authData.Metadata.LastSignInTime.ToDateTime());
216242
}
217243

218-
private static async Task<T> ConvertAndDeserialize<T>(string resourceName) where T : class
244+
private static async Task<T> ConvertAndDeserialize<T>(string resourceName, string? path = null) where T : class
219245
{
220-
var context = GcfEventResources.CreateHttpContext(resourceName);
246+
var context = GcfEventResources.CreateHttpContext(resourceName, path);
221247
var formatter = CloudEventFormatterAttribute.CreateFormatter(typeof(T))
222248
?? throw new InvalidOperationException("No formatter available");
223249
var cloudEvent = await GcfConverters.ConvertGcfEventToCloudEvent(context.Request, formatter);

src/Google.Cloud.Functions.Framework.Tests/GcfEvents/GcfConvertersTest.cs

+67-1
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
using Google.Cloud.Functions.Framework.GcfEvents;
1818
using Microsoft.AspNetCore.Http;
1919
using System;
20-
using System.Collections.Generic;
2120
using System.IO;
21+
using System.Linq;
2222
using System.Text;
2323
using System.Text.Json;
2424
using System.Threading.Tasks;
@@ -37,6 +37,8 @@ public class GcfConvertersTest
3737
[InlineData("firestore_simple.json", "google.cloud.firestore.document.v1.written", "//firestore.googleapis.com/projects/project-id/databases/(default)", "documents/gcf-test/2Vm2mI1d0wIaK2Waj5to")]
3838
[InlineData("pubsub_text.json", "google.cloud.pubsub.topic.v1.messagePublished", "//pubsub.googleapis.com/projects/sample-project/topics/gcf-test", null)]
3939
[InlineData("legacy_pubsub.json", "google.cloud.pubsub.topic.v1.messagePublished", "//pubsub.googleapis.com/projects/sample-project/topics/gcf-test", null)]
40+
[InlineData("raw_pubsub.json", "google.cloud.pubsub.topic.v1.messagePublished", "//pubsub.googleapis.com/projects/unknown-project!/topics/unknown-topic!", null)]
41+
[InlineData("emulator_pubsub.json", "google.cloud.pubsub.topic.v1.messagePublished", "//pubsub.googleapis.com/projects/unknown-project!/topics/unknown-topic!", null)]
4042
[InlineData("firebase-db1.json", "google.firebase.database.ref.v1.written", "//firebasedatabase.googleapis.com/projects/_/locations/us-central1/instances/my-project-id", "refs/gcf-test/xyz")]
4143
[InlineData("firebase-db2.json", "google.firebase.database.ref.v1.written", "//firebasedatabase.googleapis.com/projects/_/locations/europe-west1/instances/my-project-id", "refs/gcf-test/xyz")]
4244
[InlineData("firebase-auth1.json", "google.firebase.auth.user.v1.created", "//firebaseauth.googleapis.com/projects/my-project-id", "users/UUpby3s4spZre6kHsgVSPetzQ8l2")]
@@ -109,6 +111,10 @@ public async Task MinimalValidEvent()
109111
public Task InvalidRequest_UnableToDeserialize() =>
110112
AssertInvalidRequest("{INVALIDJSON 'data':{}, 'context':{'eventId':'xyz', 'eventType': 'google.pubsub.topic.publish', 'resource':{'service': 'svc', 'name': 'resname'}}}");
111113

114+
[Fact]
115+
public Task InvalidRequest_UnknownType() =>
116+
AssertInvalidRequest("{'data':{}, 'context':{'eventId':'xyz', 'eventType': 'google.surprise', 'resource':{'service': 'svc', 'name': 'resname'}}}");
117+
112118
[Fact]
113119
public Task InvalidRequest_NoData() =>
114120
AssertInvalidRequest("{'context':{'eventId':'xyz', 'eventType': 'google.pubsub.topic.publish', 'resource':{'service': 'svc', 'name': 'resname'}}}");
@@ -125,6 +131,35 @@ public Task InvalidRequest_NoType() =>
125131
public Task InvalidRequest_NoResourceName() =>
126132
AssertInvalidRequest("{'data':{}, 'context':{'eventId':'xyz', 'eventType': 'google.pubsub.topic.publish', 'resource':{'service': 'svc'}}}");
127133

134+
// Minimal valid JSON for a raw Pub/Sub event, so all the subsequent invalid tests can be "this JSON with something removed" (or added)
135+
[Fact]
136+
public async Task MinimalValidEvent_RawPubSub()
137+
{
138+
string json = "{'message':{'messageId':'xyz'}, 'subscription':'projects/x/subscriptions/y'}";
139+
var cloudEvent = await ConvertJson(json);
140+
Assert.Equal("xyz", cloudEvent.Id);
141+
}
142+
143+
[Fact]
144+
public Task InvalidRequest_MissingMessageIdFromRawPubSub() =>
145+
AssertInvalidRequest("{'message':{}, 'subscription':'projects/x/subscriptions/y'}");
146+
147+
[Fact]
148+
public Task InvalidRequest_OnlySubscriptionFromRawPubSub() =>
149+
AssertInvalidRequest("{'subscription':'projects/x/subscriptions/y'");
150+
151+
[Fact]
152+
public Task InvalidRequest_OnlyMessageFromRawPubSub() =>
153+
AssertInvalidRequest("{'message':{'messageId':'1'}}");
154+
155+
[Fact]
156+
public Task InvalidRequest_RawPubSubAndContext() =>
157+
AssertInvalidRequest("{'message':{'messageId':'xyz'}, 'subscription':'projects/x/subscriptions/y', 'context':{}}");
158+
159+
[Fact]
160+
public Task InvalidRequest_RawPubSubAndData() =>
161+
AssertInvalidRequest("{'message':{'messageId':'xyz'}, 'subscription':'projects/x/subscriptions/y', 'data':{}}");
162+
128163
[Theory]
129164
[InlineData("firebase-analytics-no-app-id.json")]
130165
[InlineData("firebase-analytics-no-event-name.json")]
@@ -134,6 +169,37 @@ public async Task InvalidRequest_FirebaseAnalytics(string resourceName)
134169
await Assert.ThrowsAsync<GcfConverters.ConversionException>(() => GcfConverters.ConvertGcfEventToCloudEvent(context.Request, s_jsonFormatter));
135170
}
136171

172+
[Theory]
173+
[InlineData(null, GcfConverters.DefaultRawPubSubTopic)]
174+
[InlineData("/not/a/matching/path", GcfConverters.DefaultRawPubSubTopic)]
175+
[InlineData("/projects/abc/topics/bcd", "projects/abc/topics/bcd")]
176+
public async Task RawPubSubTopic_NoPath(string? path, string expectedTopicResourceName)
177+
{
178+
var context = GcfEventResources.CreateHttpContext("raw_pubsub.json", path);
179+
var cloudEvent = await GcfConverters.ConvertGcfEventToCloudEvent(context.Request, s_jsonFormatter);
180+
Assert.Equal($"//pubsub.googleapis.com/{expectedTopicResourceName}", cloudEvent.Source?.ToString());
181+
string expectedTopic = expectedTopicResourceName.Split('/').Last();
182+
Assert.Equal(expectedTopic, (string) cloudEvent["topic"]!);
183+
}
184+
185+
[Fact]
186+
public async Task RawPubSub_SimpleProperties()
187+
{
188+
var context = GcfEventResources.CreateHttpContext("raw_pubsub.json");
189+
var cloudEvent = await GcfConverters.ConvertGcfEventToCloudEvent(context.Request, s_jsonFormatter);
190+
Assert.Equal(new DateTimeOffset(2022, 2, 15, 11, 28, 32, 942, TimeSpan.Zero), cloudEvent.Time);
191+
Assert.Equal("4102184774039362", cloudEvent.Id);
192+
}
193+
194+
[Fact]
195+
public async Task EmulatorPubSub_SimpleProperties()
196+
{
197+
var context = GcfEventResources.CreateHttpContext("emulator_pubsub.json");
198+
var cloudEvent = await GcfConverters.ConvertGcfEventToCloudEvent(context.Request, s_jsonFormatter);
199+
Assert.Null(cloudEvent.Time);
200+
Assert.Equal("1", cloudEvent.Id);
201+
}
202+
137203
private static async Task AssertInvalidRequest(string json, string? contentType = null) =>
138204
await Assert.ThrowsAsync<GcfConverters.ConversionException>(() => ConvertJson(json, contentType));
139205

src/Google.Cloud.Functions.Framework.Tests/GcfEvents/GcfEventResources.cs

+3-2
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,14 @@ namespace Google.Cloud.Functions.Framework.Tests.GcfEvents
2121
/// </summary>
2222
internal static class GcfEventResources
2323
{
24-
internal static HttpContext CreateHttpContext(string resourceName) =>
24+
internal static HttpContext CreateHttpContext(string resourceName, string? path = null) =>
2525
new DefaultHttpContext
2626
{
2727
Request =
2828
{
2929
Body = TestResourceHelper.LoadResource(typeof(GcfEventResources), resourceName),
30-
ContentType = "application/json"
30+
ContentType = "application/json",
31+
Path = path is null ? PathString.Empty : (PathString) path
3132
}
3233
};
3334
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"subscription": "projects\/emulator-project\/subscriptions\/test-subscription",
3+
"message": {
4+
"data": "VGVzdCBtZXNzYWdlIGZyb20gZW11bGF0b3I=",
5+
"messageId": "1",
6+
"attributes": {
7+
"attr1": "attr-value1"
8+
}
9+
}
10+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"message": {
3+
"attributes": { "attr1":"value1" },
4+
"data": "VGV4dCBtZXNzYWdl",
5+
"messageId":"4102184774039362",
6+
"message_id":"4102184774039362",
7+
"orderingKey":"orderxyz",
8+
"publishTime":"2022-02-15T11:28:32.942Z",
9+
"publish_time":"2022-02-15T11:28:32.942Z"
10+
},
11+
"subscription":"projects/sample-project/subscriptions/sample-subscription"
12+
}

src/Google.Cloud.Functions.Framework.Tests/Google.Cloud.Functions.Framework.Tests.csproj

-4
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,4 @@
1717
<EmbeddedResource Include="**/*.json" Exclude="bin/**;obj/**" />
1818
</ItemGroup>
1919

20-
<ItemGroup>
21-
<None Remove="GcfEvents\pubsub_text_microsecond_precision.json" />
22-
</ItemGroup>
23-
2420
</Project>

src/Google.Cloud.Functions.Framework/GcfEvents/GcfConverters.cs

+60-1
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ private static class EventTypes
7575
internal const string StorageObjectChanged = StorageObjectV1 + ".changed";
7676
}
7777

78+
internal const string DefaultRawPubSubTopic = "projects/unknown-project!/topics/unknown-topic!";
7879
private const string JsonContentType = "application/json";
7980

8081
private static readonly Dictionary<string, EventAdapter> s_eventTypeMapping = new Dictionary<string, EventAdapter>
@@ -142,12 +143,13 @@ private static async Task<Request> ParseRequest(HttpRequest request)
142143
throw new ConversionException($"Error parsing GCF event: {e.Message}", e);
143144
}
144145

146+
PubSubEventAdapter.NormalizeRawRequest(parsedRequest, request.Path.Value);
145147
parsedRequest.NormalizeContext();
146148
if (parsedRequest.Data is null ||
147149
string.IsNullOrEmpty(parsedRequest.Context.Id) ||
148150
string.IsNullOrEmpty(parsedRequest.Context.Type))
149151
{
150-
throw new ConversionException("Event is malformed; does not contain a payload, or the event ID is missing.");
152+
throw new ConversionException("Event is malformed; does not contain a payload, or the event ID or type is missing.");
151153
}
152154
return parsedRequest;
153155
}
@@ -292,6 +294,63 @@ protected override void MaybeReshapeData(Request request)
292294
request.Data["publishTime"] = timestamp.UtcDateTime.ToString(formatString, CultureInfo.InvariantCulture);
293295
}
294296
request.Data = new Dictionary<string, object> { { "message", request.Data } };
297+
298+
// The subscription is not provided in the legacy format, but *is* provided in
299+
// the raw Pub/Sub push notification (including in the emulator) so we should use it if we've got it.
300+
if (request.RawPubSubSubscription is string subscription)
301+
{
302+
request.Data["subscription"] = subscription;
303+
}
304+
}
305+
306+
/// <summary>
307+
/// Normalizes a raw Pub/Sub push notification (from either the emulator
308+
/// or the real Pub/Sub service) into the existing legacy format.
309+
/// </summary>
310+
/// <param name="request">The incoming request body, parsed as a <see cref="Request"/> object</param>
311+
/// <param name="path">The HTTP request path, from which the topic name will be extracted.</param>
312+
internal static void NormalizeRawRequest(Request request, string path)
313+
{
314+
// Non-raw-Pub/Sub path: just a no-op
315+
if (request.RawPubSubMessage is null && request.RawPubSubSubscription is null)
316+
{
317+
return;
318+
}
319+
if (request.RawPubSubMessage is null || request.RawPubSubSubscription is null)
320+
{
321+
throw new ConversionException("Request is malformed; it must contain both 'message' and 'subscription' properties or neither.");
322+
}
323+
if (request.Data is object ||
324+
request.Domain is object ||
325+
request.Context is object ||
326+
request.EventId is object ||
327+
request.EventType is object ||
328+
request.Params is object ||
329+
request.Timestamp is object)
330+
{
331+
throw new ConversionException("Request is malformed; raw Pub/Sub request must contain only 'message' and 'subscription' properties.");
332+
}
333+
if (!request.RawPubSubMessage.TryGetValue("messageId", out var messageIdObj) || !(messageIdObj is JsonElement messageIdElement) ||
334+
messageIdElement.ValueKind != JsonValueKind.String)
335+
{
336+
throw new ConversionException("Request is malformed; raw Pub/Sub message must contain a 'message.messageId' string property.");
337+
}
338+
request.EventId = messageIdElement.GetString();
339+
request.EventType = "providers/cloud.pubsub/eventTypes/topic.publish";
340+
// Skip the leading / in the path
341+
path = path.Length == 0 ? "" : path.Substring(1);
342+
var topicPathMatch = PubSubResourcePattern.Match(path);
343+
request.Resource = topicPathMatch.Success ? path : DefaultRawPubSubTopic;
344+
request.Data = request.RawPubSubMessage;
345+
if (request.RawPubSubMessage.TryGetValue("publishTime", out var publishTime) &&
346+
publishTime is JsonElement publishTimeElement &&
347+
publishTimeElement.ValueKind == JsonValueKind.String &&
348+
DateTimeOffset.TryParseExact(publishTimeElement.GetString(), "yyyy-MM-dd'T'HH:mm:ss.FFFFFF'Z'", CultureInfo.InvariantCulture,
349+
DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal,
350+
out var timestamp))
351+
{
352+
request.Timestamp = timestamp;
353+
}
295354
}
296355
}
297356

src/Google.Cloud.Functions.Framework/GcfEvents/Request.cs

+12
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,18 @@ internal sealed class Request
4444
[JsonPropertyName("domain")]
4545
public string? Domain { get; set; }
4646

47+
/// <summary>
48+
/// Raw PubSub only: the subscription triggering the request.
49+
/// </summary>
50+
[JsonPropertyName("subscription")]
51+
public string? RawPubSubSubscription { get; set; }
52+
53+
/// <summary>
54+
/// Raw PubSub only: the PubSub message.
55+
/// </summary>
56+
[JsonPropertyName("message")]
57+
public Dictionary<string, object>? RawPubSubMessage { get; set; }
58+
4759
public Request()
4860
{
4961
Context = null!;

0 commit comments

Comments
 (0)