10
10
11
11
namespace OpenFeature
12
12
{
13
-
14
- internal delegate Task ShutdownDelegate ( ) ;
15
-
16
- internal class EventExecutor
13
+ internal class EventExecutor : IAsyncDisposable
17
14
{
18
15
private readonly object _lockObj = new object ( ) ;
19
16
public readonly Channel < object > EventChannel = Channel . CreateBounded < object > ( 1 ) ;
20
- private FeatureProviderReference _defaultProvider ;
21
- private readonly Dictionary < string , FeatureProviderReference > _namedProviderReferences = new Dictionary < string , FeatureProviderReference > ( ) ;
22
- private readonly List < FeatureProviderReference > _activeSubscriptions = new List < FeatureProviderReference > ( ) ;
23
- private readonly SemaphoreSlim _shutdownSemaphore = new SemaphoreSlim ( 0 ) ;
24
-
25
- private ShutdownDelegate _shutdownDelegate ;
17
+ private FeatureProvider _defaultProvider ;
18
+ private readonly Dictionary < string , FeatureProvider > _namedProviderReferences = new Dictionary < string , FeatureProvider > ( ) ;
19
+ private readonly List < FeatureProvider > _activeSubscriptions = new List < FeatureProvider > ( ) ;
26
20
27
21
private readonly Dictionary < ProviderEventTypes , List < EventHandlerDelegate > > _apiHandlers = new Dictionary < ProviderEventTypes , List < EventHandlerDelegate > > ( ) ;
28
22
private readonly Dictionary < string , Dictionary < ProviderEventTypes , List < EventHandlerDelegate > > > _clientHandlers = new Dictionary < string , Dictionary < ProviderEventTypes , List < EventHandlerDelegate > > > ( ) ;
@@ -32,11 +26,12 @@ internal class EventExecutor
32
26
public EventExecutor ( )
33
27
{
34
28
this . Logger = new Logger < EventExecutor > ( new NullLoggerFactory ( ) ) ;
35
- this . _shutdownDelegate = this . SignalShutdownAsync ;
36
29
var eventProcessing = new Thread ( this . ProcessEventAsync ) ;
37
30
eventProcessing . Start ( ) ;
38
31
}
39
32
33
+ public ValueTask DisposeAsync ( ) => new ( this . Shutdown ( ) ) ;
34
+
40
35
internal void AddApiLevelHandler ( ProviderEventTypes eventType , EventHandlerDelegate handler )
41
36
{
42
37
lock ( this . _lockObj )
@@ -114,7 +109,7 @@ internal void RegisterDefaultFeatureProvider(FeatureProvider provider)
114
109
{
115
110
var oldProvider = this . _defaultProvider ;
116
111
117
- this . _defaultProvider = new FeatureProviderReference ( provider ) ;
112
+ this . _defaultProvider = provider ;
118
113
119
114
this . StartListeningAndShutdownOld ( this . _defaultProvider , oldProvider ) ;
120
115
}
@@ -128,8 +123,8 @@ internal void RegisterClientFeatureProvider(string client, FeatureProvider provi
128
123
}
129
124
lock ( this . _lockObj )
130
125
{
131
- var newProvider = new FeatureProviderReference ( provider ) ;
132
- FeatureProviderReference oldProvider = null ;
126
+ var newProvider = provider ;
127
+ FeatureProvider oldProvider = null ;
133
128
if ( this . _namedProviderReferences . TryGetValue ( client , out var foundOldProvider ) )
134
129
{
135
130
oldProvider = foundOldProvider ;
@@ -141,7 +136,7 @@ internal void RegisterClientFeatureProvider(string client, FeatureProvider provi
141
136
}
142
137
}
143
138
144
- private void StartListeningAndShutdownOld ( FeatureProviderReference newProvider , FeatureProviderReference oldProvider )
139
+ private void StartListeningAndShutdownOld ( FeatureProvider newProvider , FeatureProvider oldProvider )
145
140
{
146
141
// check if the provider is already active - if not, we need to start listening for its emitted events
147
142
if ( ! this . IsProviderActive ( newProvider ) )
@@ -154,15 +149,11 @@ private void StartListeningAndShutdownOld(FeatureProviderReference newProvider,
154
149
if ( oldProvider != null && ! this . IsProviderBound ( oldProvider ) )
155
150
{
156
151
this . _activeSubscriptions . Remove ( oldProvider ) ;
157
- var channel = oldProvider . Provider . GetEventChannel ( ) ;
158
- if ( channel != null )
159
- {
160
- channel . Writer . WriteAsync ( new ShutdownSignal ( ) ) ;
161
- }
152
+ oldProvider . GetEventChannel ( ) ? . Writer . Complete ( ) ;
162
153
}
163
154
}
164
155
165
- private bool IsProviderBound ( FeatureProviderReference provider )
156
+ private bool IsProviderBound ( FeatureProvider provider )
166
157
{
167
158
if ( this . _defaultProvider == provider )
168
159
{
@@ -178,18 +169,18 @@ private bool IsProviderBound(FeatureProviderReference provider)
178
169
return false ;
179
170
}
180
171
181
- private bool IsProviderActive ( FeatureProviderReference providerRef )
172
+ private bool IsProviderActive ( FeatureProvider providerRef )
182
173
{
183
174
return this . _activeSubscriptions . Contains ( providerRef ) ;
184
175
}
185
176
186
- private void EmitOnRegistration ( FeatureProviderReference provider , ProviderEventTypes eventType , EventHandlerDelegate handler )
177
+ private void EmitOnRegistration ( FeatureProvider provider , ProviderEventTypes eventType , EventHandlerDelegate handler )
187
178
{
188
179
if ( provider == null )
189
180
{
190
181
return ;
191
182
}
192
- var status = provider . Provider . GetStatus ( ) ;
183
+ var status = provider . GetStatus ( ) ;
193
184
194
185
var message = "" ;
195
186
if ( status == ProviderStatus . Ready && eventType == ProviderEventTypes . ProviderReady )
@@ -211,7 +202,7 @@ private void EmitOnRegistration(FeatureProviderReference provider, ProviderEvent
211
202
{
212
203
handler . Invoke ( new ProviderEventPayload
213
204
{
214
- ProviderName = provider . Provider ? . GetMetadata ( ) ? . Name ,
205
+ ProviderName = provider . GetMetadata ( ) ? . Name ,
215
206
Type = eventType ,
216
207
Message = message
217
208
} ) ;
@@ -225,33 +216,33 @@ private void EmitOnRegistration(FeatureProviderReference provider, ProviderEvent
225
216
226
217
private async void ProcessFeatureProviderEventsAsync ( object providerRef )
227
218
{
228
- while ( true )
219
+ var typedProviderRef = ( FeatureProvider ) providerRef ;
220
+ if ( typedProviderRef . GetEventChannel ( ) is not { Reader : { } reader } )
229
221
{
230
- var typedProviderRef = ( FeatureProviderReference ) providerRef ;
231
- if ( typedProviderRef . Provider . GetEventChannel ( ) == null )
232
- {
233
- return ;
234
- }
235
- var item = await typedProviderRef . Provider . GetEventChannel ( ) . Reader . ReadAsync ( ) . ConfigureAwait ( false ) ;
222
+ return ;
223
+ }
224
+
225
+ while ( await reader . WaitToReadAsync ( ) . ConfigureAwait ( false ) )
226
+ {
227
+ if ( ! reader . TryRead ( out var item ) )
228
+ continue ;
236
229
237
230
switch ( item )
238
231
{
239
232
case ProviderEventPayload eventPayload :
240
233
await this . EventChannel . Writer . WriteAsync ( new Event { Provider = typedProviderRef , EventPayload = eventPayload } ) . ConfigureAwait ( false ) ;
241
234
break ;
242
- case ShutdownSignal _:
243
- typedProviderRef . ShutdownSemaphore . Release ( ) ;
244
- return ;
245
235
}
246
236
}
247
237
}
248
238
249
239
// Method to process events
250
240
private async void ProcessEventAsync ( )
251
241
{
252
- while ( true )
242
+ while ( await this . EventChannel . Reader . WaitToReadAsync ( ) . ConfigureAwait ( false ) )
253
243
{
254
- var item = await this . EventChannel . Reader . ReadAsync ( ) . ConfigureAwait ( false ) ;
244
+ if ( ! this . EventChannel . Reader . TryRead ( out var item ) )
245
+ continue ;
255
246
256
247
switch ( item )
257
248
{
@@ -307,9 +298,6 @@ private async void ProcessEventAsync()
307
298
}
308
299
}
309
300
break ;
310
- case ShutdownSignal _:
311
- this . _shutdownSemaphore . Release ( ) ;
312
- return ;
313
301
}
314
302
315
303
}
@@ -329,43 +317,15 @@ private void InvokeEventHandler(EventHandlerDelegate eventHandler, Event e)
329
317
330
318
public async Task Shutdown ( )
331
319
{
332
- await this . _shutdownDelegate ( ) . ConfigureAwait ( false ) ;
333
- }
320
+ this . EventChannel . Writer . Complete ( ) ;
334
321
335
- internal void SetShutdownDelegate ( ShutdownDelegate del )
336
- {
337
- this . _shutdownDelegate = del ;
338
- }
339
-
340
- // Method to signal shutdown
341
- private async Task SignalShutdownAsync ( )
342
- {
343
- // Enqueue a shutdown signal
344
- await this . EventChannel . Writer . WriteAsync ( new ShutdownSignal ( ) ) . ConfigureAwait ( false ) ;
345
-
346
- // Wait for the processing loop to acknowledge the shutdown
347
- await this . _shutdownSemaphore . WaitAsync ( ) . ConfigureAwait ( false ) ;
348
- }
349
- }
350
-
351
- internal class ShutdownSignal
352
- {
353
- }
354
-
355
- internal class FeatureProviderReference
356
- {
357
- internal readonly SemaphoreSlim ShutdownSemaphore = new SemaphoreSlim ( 0 ) ;
358
- internal FeatureProvider Provider { get ; }
359
-
360
- public FeatureProviderReference ( FeatureProvider provider )
361
- {
362
- this . Provider = provider ;
322
+ await this . EventChannel . Reader . Completion . ConfigureAwait ( false ) ;
363
323
}
364
324
}
365
325
366
326
internal class Event
367
327
{
368
- internal FeatureProviderReference Provider { get ; set ; }
328
+ internal FeatureProvider Provider { get ; set ; }
369
329
internal ProviderEventPayload EventPayload { get ; set ; }
370
330
}
371
331
}
0 commit comments