Skip to content

Commit a41e4f7

Browse files
authored
fix: implement filter on PostgresChangeHandler (#55)
* feat(): try implement filter * feat(): implement filter * feat(): implement filter handler and fix broken tests * feat(): rollback examples * test(): implement test for two change postgres handle * feat(): add docblock to some methods * feat(): implment method to remove handler from binding * feat(): put conditional to check connection from browder (blazor) * feat(): remove unused vars * feat(): add unit test to validate issue 49 * feat(): remove unnecessary handler * feat(): change rule to iterate all handler and call it properly * feat(): comment get host * feat(): make rollback dockercompose * feat(): change url form Helper to connect on github action * chorus(): insert config to supabase cli * refactor(): change the equals implementation * fix(): fix project name * feat(): verify if event received is compatible to disable timeout
1 parent 973cdc2 commit a41e4f7

15 files changed

+880
-32
lines changed

.github/workflows/build-and-test.yml

+10
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,16 @@ jobs:
2222

2323
- name: Build
2424
run: dotnet build --configuration Release --no-restore
25+
26+
- uses: supabase/setup-cli@v1
27+
with:
28+
version: latest
29+
30+
- name: Start Supabsae
31+
run: supabase start
32+
33+
- name: Test
34+
run: dotnet test --no-restore
2535

2636
#- name: Add hosts entries
2737
# run: |

.gitignore

+5
Original file line numberDiff line numberDiff line change
@@ -406,3 +406,8 @@ ASALocalRun/
406406
# Local History for Visual Studio
407407
.localhistory/
408408
/RealtimeTests/.runsettings
409+
410+
# supabase stuffs
411+
supabase/.branches
412+
supabase/.temp
413+
supabase/.env

Realtime/Binding.cs

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
using Supabase.Realtime.Interfaces;
2+
using Supabase.Realtime.PostgresChanges;
3+
4+
namespace Supabase.Realtime;
5+
6+
public class Binding
7+
{
8+
public PostgresChangesOptions? Options { get; set; }
9+
10+
public IRealtimeChannel.PostgresChangesHandler? Handler { get; set; }
11+
12+
public PostgresChangesOptions.ListenType? ListenType { get; set; }
13+
14+
public int? Id { get; set; }
15+
}

Realtime/Channel/Push.cs

+7-2
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,13 @@ internal void StartTimeout()
145145
/// <param name="message"></param>
146146
private void HandleSocketMessageReceived(IRealtimeSocket sender, SocketResponse message)
147147
{
148-
if (message.Ref != Ref) return;
149-
148+
// Needs to verify if realtime server won't send the message below anymore after receive a track presence event
149+
// {"ref":"bd07efe5-ca06-4257-b080-79779c6f76c4","event":"phx_reply","payload":{"status":"ok","response":{}},"topic":"realtime:online-users"}
150+
// the message was used to stop timeout handler
151+
// All tests still work on version before 2.34.21
152+
var isPresenceDiff = message is { Event: Constants.EventType.PresenceDiff };
153+
if (!isPresenceDiff && message.Ref != Ref) return;
154+
150155
CancelTimeout();
151156
Response = message;
152157
NotifyMessageReceived(message);

Realtime/PostgresChanges/PostgresChangesOptions.cs

+36-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using Newtonsoft.Json;
1+
using System;
2+
using Newtonsoft.Json;
23
using Supabase.Core.Attributes;
34
using System.Collections.Generic;
45

@@ -78,7 +79,7 @@ public enum ListenType
7879
/// </summary>
7980
[JsonProperty("event")]
8081
public string Event => Core.Helpers.GetMappedToAttr(_listenType).Mapping!;
81-
82+
8283
private readonly ListenType _listenType;
8384

8485
/// <summary>
@@ -97,4 +98,37 @@ public PostgresChangesOptions(string schema, string? table = null, ListenType ev
9798
Filter = filter;
9899
Parameters = parameters;
99100
}
101+
102+
private bool Equals(PostgresChangesOptions other)
103+
{
104+
return _listenType == other._listenType && Schema == other.Schema && Table == other.Table && Filter == other.Filter;
105+
}
106+
107+
/// <summary>
108+
/// Check if object are equals
109+
/// </summary>
110+
/// <param name="obj"></param>
111+
/// <returns></returns>
112+
public override bool Equals(object? obj)
113+
{
114+
if (obj is null) return false;
115+
if (obj.GetType() != GetType()) return false;
116+
return Equals((PostgresChangesOptions)obj);
117+
}
118+
119+
/// <summary>
120+
/// Generate hash code
121+
/// </summary>
122+
/// <returns></returns>
123+
public override int GetHashCode()
124+
{
125+
unchecked
126+
{
127+
var hashCode = (int)_listenType;
128+
hashCode = (hashCode * 397) ^ Schema.GetHashCode();
129+
hashCode = (hashCode * 397) ^ (Table != null ? Table.GetHashCode() : 0);
130+
hashCode = (hashCode * 397) ^ (Filter != null ? Filter.GetHashCode() : 0);
131+
return hashCode;
132+
}
133+
}
100134
}

Realtime/PostgresChanges/PostgresChangesResponse.cs

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using Newtonsoft.Json;
1+
using System.Collections.Generic;
2+
using Newtonsoft.Json;
23
using Supabase.Postgrest.Models;
34
using Supabase.Realtime.Socket;
45

@@ -73,4 +74,7 @@ public class PostgresChangesPayload<T> where T : class
7374
/// </summary>
7475
[JsonProperty("data")]
7576
public SocketResponsePayload<T>? Data { get; set; }
77+
78+
[JsonProperty("ids")]
79+
public List<int?> Ids { get; set; }
7680
}

Realtime/RealtimeChannel.cs

+123-22
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Linq;
34
using System.Runtime.CompilerServices;
45
using System.Threading.Tasks;
56
using System.Timers;
@@ -154,14 +155,13 @@ public class RealtimeChannel : IRealtimeChannel
154155
private readonly List<MessageReceivedHandler> _messageReceivedHandlers = new();
155156
private readonly List<ErrorEventHandler> _errorEventHandlers = new();
156157

157-
private readonly Dictionary<ListenType, List<PostgresChangesHandler>> _postgresChangesHandlers =
158-
new();
159-
160158
private bool CanPush => IsJoined && Socket.IsConnected;
161159
private bool _hasJoinedOnce;
162160
private readonly Timer _rejoinTimer;
163161
private bool _isRejoining;
164162

163+
private List<Binding> _bindings = [];
164+
165165
/// <summary>
166166
/// Initializes a Channel - must call `Subscribe()` to receive events.
167167
/// </summary>
@@ -330,11 +330,7 @@ private void NotifyMessageReceived(SocketResponse message)
330330
/// <param name="postgresChangeHandler"></param>
331331
public void AddPostgresChangeHandler(ListenType listenType, PostgresChangesHandler postgresChangeHandler)
332332
{
333-
if (!_postgresChangesHandlers.ContainsKey(listenType))
334-
_postgresChangesHandlers[listenType] = new List<PostgresChangesHandler>();
335-
336-
if (!_postgresChangesHandlers[listenType].Contains(postgresChangeHandler))
337-
_postgresChangesHandlers[listenType].Add(postgresChangeHandler);
333+
BindPostgresChangesHandler(listenType, postgresChangeHandler);
338334
}
339335

340336
/// <summary>
@@ -344,16 +340,16 @@ public void AddPostgresChangeHandler(ListenType listenType, PostgresChangesHandl
344340
/// <param name="postgresChangeHandler"></param>
345341
public void RemovePostgresChangeHandler(ListenType listenType, PostgresChangesHandler postgresChangeHandler)
346342
{
347-
if (_postgresChangesHandlers.ContainsKey(listenType) &&
348-
_postgresChangesHandlers[listenType].Contains(postgresChangeHandler))
349-
_postgresChangesHandlers[listenType].Remove(postgresChangeHandler);
343+
RemovePostgresChangesFromBinding(listenType, postgresChangeHandler);
350344
}
351345

352346
/// <summary>
353347
/// Clears all postgres changes listeners.
354348
/// </summary>
355-
public void ClearPostgresChangeHandlers() =>
356-
_postgresChangesHandlers.Clear();
349+
public void ClearPostgresChangeHandlers()
350+
{
351+
_bindings.Clear();
352+
}
357353

358354
/// <summary>
359355
/// Adds an error event handler.
@@ -407,15 +403,7 @@ private void NotifyPostgresChanges(EventType eventType, PostgresChangesResponse
407403
_ => ListenType.All
408404
};
409405

410-
// Invoke the wildcard listener (but only once)
411-
if (listenType != ListenType.All &&
412-
_postgresChangesHandlers.TryGetValue(ListenType.All, out var changesHandler))
413-
foreach (var handler in changesHandler.ToArray())
414-
handler.Invoke(this, response);
415-
416-
if (_postgresChangesHandlers.TryGetValue(listenType, out var postgresChangesHandler))
417-
foreach (var handler in postgresChangesHandler.ToArray())
418-
handler.Invoke(this, response);
406+
InvokeProperlyHandlerFromBind(listenType, response);
419407
}
420408

421409
/// <summary>
@@ -428,6 +416,8 @@ private void NotifyPostgresChanges(EventType eventType, PostgresChangesResponse
428416
public IRealtimeChannel Register(PostgresChangesOptions postgresChangesOptions)
429417
{
430418
PostgresChangesOptions.Add(postgresChangesOptions);
419+
420+
BindPostgresChangesOptions(postgresChangesOptions);
431421
return this;
432422
}
433423

@@ -673,6 +663,8 @@ private void HandleJoinResponse(IRealtimePush<RealtimeChannel, SocketResponse> s
673663
Options.SerializerSettings);
674664
if (obj?.Payload == null) return;
675665

666+
obj.Payload.Response?.change?.ForEach(BindIdPostgresChanges);
667+
676668
switch (obj.Payload.Status)
677669
{
678670
// A response was received from the channel
@@ -764,4 +756,113 @@ internal void HandleSocketMessage(SocketResponse message)
764756
break;
765757
}
766758
}
759+
760+
/// <summary>
761+
/// Create a Binding and add to a list
762+
/// </summary>
763+
/// <param name="options"></param>
764+
private void BindPostgresChangesOptions(PostgresChangesOptions options)
765+
{
766+
var founded = _bindings.FirstOrDefault(b => options.Equals(b.Options));
767+
if (founded != null) return;
768+
769+
_bindings.Add(
770+
new Binding
771+
{
772+
Options = options,
773+
}
774+
);
775+
}
776+
777+
/// <summary>
778+
/// Try to bind a PostgresChangesHandler to a PostgresChangesOptions
779+
/// </summary>
780+
/// <param name="listenType"></param>
781+
/// <param name="handler"></param>
782+
private void BindPostgresChangesHandler(ListenType listenType, PostgresChangesHandler handler)
783+
{
784+
var founded = _bindings.FirstOrDefault(b =>
785+
b.Options?.Event == Core.Helpers.GetMappedToAttr(listenType).Mapping &&
786+
b.Handler == null
787+
);
788+
if (founded != null)
789+
{
790+
founded.Handler = handler;
791+
founded.ListenType = listenType;
792+
return;
793+
}
794+
795+
BindPostgresChangesHandlerGeneric(listenType, handler);
796+
797+
}
798+
799+
private void BindPostgresChangesHandlerGeneric(ListenType listenType, PostgresChangesHandler handler)
800+
{
801+
var founded = _bindings.FirstOrDefault(b =>
802+
(b.Options?.Event == Core.Helpers.GetMappedToAttr(listenType).Mapping || b.Options?.Event == "*") &&
803+
b.Handler == null
804+
);
805+
if (founded == null) return;
806+
807+
founded.Handler = handler;
808+
founded.ListenType = listenType;
809+
}
810+
811+
/// <summary>
812+
/// Filter the binding list and try to add an id from socket to its binding
813+
/// </summary>
814+
/// <param name="joinResponse"></param>
815+
private void BindIdPostgresChanges(PhoenixPostgresChangeResponse joinResponse)
816+
{
817+
var founded = _bindings.FirstOrDefault(b => b.Options != null &&
818+
b.Options.Event == joinResponse.eventName &&
819+
b.Options.Table == joinResponse.table &&
820+
b.Options.Schema == joinResponse.schema &&
821+
b.Options.Filter == joinResponse.filter);
822+
if (founded == null) return;
823+
founded.Id = joinResponse?.id;
824+
}
825+
826+
/// <summary>
827+
/// Try to invoke the handler properly based on event type and socket response
828+
/// </summary>
829+
/// <param name="eventType"></param>
830+
/// <param name="response"></param>
831+
private void InvokeProperlyHandlerFromBind(ListenType eventType, PostgresChangesResponse response)
832+
{
833+
var all = _bindings.FirstOrDefault(b =>
834+
{
835+
if (b.Options == null && response.Payload == null && b.Handler == null) return false;
836+
837+
return response.Payload != null && response.Payload.Ids.Contains(b.Id) && eventType != ListenType.All &&
838+
b.ListenType == ListenType.All;
839+
});
840+
841+
if (all != null)
842+
{
843+
all.Handler?.Invoke(this, response);
844+
return;
845+
}
846+
847+
// Invoke all specific handler if possible
848+
_bindings.ForEach(binding =>
849+
{
850+
if (binding.ListenType != eventType) return;
851+
if (binding.Options == null || response.Payload == null || binding.Handler == null) return;
852+
853+
if (response.Payload.Ids.Contains(binding.Id)) binding.Handler.Invoke(this, response);
854+
});
855+
}
856+
857+
/// <summary>
858+
/// Remove handler from binding
859+
/// </summary>
860+
/// <param name="eventType"></param>
861+
/// <param name="handler"></param>
862+
private void RemovePostgresChangesFromBinding(ListenType eventType, PostgresChangesHandler handler)
863+
{
864+
var binding = _bindings.FirstOrDefault(b => b.Handler == handler && b.ListenType == eventType);
865+
if (binding == null) return;
866+
_bindings.Remove(binding);
867+
}
767868
}

Realtime/RealtimeSocket.cs

+2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Collections.Generic;
33
using System.Linq;
44
using System.Net.WebSockets;
5+
using System.Runtime.InteropServices;
56
using System.Threading;
67
using System.Threading.Tasks;
78
using Newtonsoft.Json;
@@ -88,6 +89,7 @@ public RealtimeSocket(string endpoint, ClientOptions options)
8889
_connection = new WebsocketClient(new Uri(EndpointUrl), () =>
8990
{
9091
var socket = new ClientWebSocket();
92+
if (RuntimeInformation.IsOSPlatform(OSPlatform.Create("BROWSER"))) return socket;
9193

9294
foreach (var header in Headers)
9395
socket.Options.SetRequestHeader(header.Key, header.Value);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
using Newtonsoft.Json;
2+
3+
namespace Supabase.Realtime.Socket.Responses;
4+
5+
public class PhoenixPostgresChangeResponse
6+
{
7+
[JsonProperty("id")]
8+
public int? id { get; set; }
9+
10+
[JsonProperty("event")]
11+
public string? eventName { get; set; }
12+
13+
[JsonProperty("filter")]
14+
public string? filter { get; set; }
15+
16+
[JsonProperty("schema")]
17+
public string? schema { get; set; }
18+
19+
[JsonProperty("table")]
20+
public string? table { get; set; }
21+
}

Realtime/Socket/Responses/PhoenixResponse.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ public class PhoenixResponse
1111
/// The response.
1212
/// </summary>
1313
[JsonProperty("response")]
14-
public object? Response;
14+
public PostgresChangeResponse? Response;
1515

1616
/// <summary>
1717
/// The status.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using System.Collections.Generic;
2+
using Newtonsoft.Json;
3+
4+
namespace Supabase.Realtime.Socket.Responses;
5+
6+
public class PostgresChangeResponse
7+
{
8+
[JsonProperty("postgres_changes")]
9+
public List<PhoenixPostgresChangeResponse> change { get; set; }
10+
}

0 commit comments

Comments
 (0)