1
1
package dev .openfeature .contrib .providers .flagd ;
2
2
3
3
import dev .openfeature .contrib .providers .flagd .resolver .Resolver ;
4
- import dev .openfeature .contrib .providers .flagd .resolver .common .ConnectionEvent ;
4
+ import dev .openfeature .contrib .providers .flagd .resolver .common .FlagdProviderEvent ;
5
+ import dev .openfeature .contrib .providers .flagd .resolver .common .Util ;
5
6
import dev .openfeature .contrib .providers .flagd .resolver .grpc .GrpcResolver ;
6
7
import dev .openfeature .contrib .providers .flagd .resolver .grpc .cache .Cache ;
7
8
import dev .openfeature .contrib .providers .flagd .resolver .process .InProcessResolver ;
12
13
import dev .openfeature .sdk .ImmutableStructure ;
13
14
import dev .openfeature .sdk .Metadata ;
14
15
import dev .openfeature .sdk .ProviderEvaluation ;
16
+ import dev .openfeature .sdk .ProviderEvent ;
15
17
import dev .openfeature .sdk .ProviderEventDetails ;
16
18
import dev .openfeature .sdk .Structure ;
17
19
import dev .openfeature .sdk .Value ;
18
20
import java .util .ArrayList ;
19
21
import java .util .Collections ;
20
22
import java .util .List ;
23
+ import java .util .concurrent .Executors ;
24
+ import java .util .concurrent .ScheduledExecutorService ;
25
+ import java .util .concurrent .ScheduledFuture ;
26
+ import java .util .concurrent .TimeUnit ;
21
27
import java .util .function .Function ;
22
28
import lombok .extern .slf4j .Slf4j ;
23
29
@@ -31,10 +37,29 @@ public class FlagdProvider extends EventProvider {
31
37
private static final String FLAGD_PROVIDER = "flagd" ;
32
38
private final Resolver flagResolver ;
33
39
private volatile boolean initialized = false ;
34
- private volatile boolean connected = false ;
35
40
private volatile Structure syncMetadata = new ImmutableStructure ();
36
41
private volatile EvaluationContext enrichedContext = new ImmutableContext ();
37
42
private final List <Hook > hooks = new ArrayList <>();
43
+ private volatile ProviderEvent previousEvent = null ;
44
+
45
+ /**
46
+ * An executor service responsible for scheduling reconnection attempts.
47
+ */
48
+ private final ScheduledExecutorService reconnectExecutor ;
49
+
50
+ /**
51
+ * A scheduled task for managing reconnection attempts.
52
+ */
53
+ private ScheduledFuture <?> reconnectTask ;
54
+
55
+ /**
56
+ * The grace period in milliseconds to wait for reconnection before emitting an error event.
57
+ */
58
+ private final long gracePeriod ;
59
+ /**
60
+ * The deadline in milliseconds for GRPC operations.
61
+ */
62
+ private final long deadline ;
38
63
39
64
protected final void finalize () {
40
65
// DO NOT REMOVE, spotbugs: CT_CONSTRUCTOR_THROW
@@ -55,18 +80,21 @@ public FlagdProvider() {
55
80
public FlagdProvider (final FlagdOptions options ) {
56
81
switch (options .getResolverType ().asString ()) {
57
82
case Config .RESOLVER_IN_PROCESS :
58
- this .flagResolver = new InProcessResolver (options , this ::isConnected , this :: onConnectionEvent );
83
+ this .flagResolver = new InProcessResolver (options , this ::onProviderEvent );
59
84
break ;
60
85
case Config .RESOLVER_RPC :
61
86
this .flagResolver = new GrpcResolver (
62
- options , new Cache (options .getCacheType (), options .getMaxCacheSize ()), this ::onConnectionEvent );
87
+ options , new Cache (options .getCacheType (), options .getMaxCacheSize ()), this ::onProviderEvent );
63
88
break ;
64
89
default :
65
90
throw new IllegalStateException (
66
91
String .format ("Requested unsupported resolver type of %s" , options .getResolverType ()));
67
92
}
68
93
hooks .add (new SyncMetadataHook (this ::getEnrichedContext ));
69
94
contextEnricher = options .getContextEnricher ();
95
+ this .reconnectExecutor = Executors .newSingleThreadScheduledExecutor ();
96
+ this .gracePeriod = options .getRetryGracePeriod ();
97
+ this .deadline = options .getDeadline ();
70
98
}
71
99
72
100
@ Override
@@ -81,17 +109,22 @@ public synchronized void initialize(EvaluationContext evaluationContext) throws
81
109
}
82
110
83
111
this .flagResolver .init ();
84
- this .initialized = this .connected = true ;
112
+ // block till ready - this works with deadline fine for rpc, but with in_process we also need to take parsing into the equation
113
+ Util .busyWaitAndCheck (this .deadline + 1000 , () -> initialized );
114
+
85
115
}
86
116
87
117
@ Override
88
118
public synchronized void shutdown () {
89
119
if (!this .initialized ) {
90
120
return ;
91
121
}
92
-
93
122
try {
94
123
this .flagResolver .shutdown ();
124
+ if (reconnectExecutor != null ) {
125
+ reconnectExecutor .shutdownNow ();
126
+ reconnectExecutor .awaitTermination (deadline , TimeUnit .MILLISECONDS );
127
+ }
95
128
} catch (Exception e ) {
96
129
log .error ("Error during shutdown {}" , FLAGD_PROVIDER , e );
97
130
} finally {
@@ -151,47 +184,77 @@ EvaluationContext getEnrichedContext() {
151
184
return enrichedContext ;
152
185
}
153
186
154
- private boolean isConnected () {
155
- return this .connected ;
156
- }
187
+ private void onProviderEvent (FlagdProviderEvent flagdProviderEvent ) {
157
188
158
- private void onConnectionEvent (ConnectionEvent connectionEvent ) {
159
- final boolean wasConnected = connected ;
160
- final boolean isConnected = connected = connectionEvent .isConnected ();
189
+ syncMetadata = flagdProviderEvent .getSyncMetadata ();
190
+ if (flagdProviderEvent .getSyncMetadata () != null ) {
191
+ enrichedContext = contextEnricher .apply (flagdProviderEvent .getSyncMetadata ());
192
+ }
161
193
162
- syncMetadata = connectionEvent .getSyncMetadata ();
163
- enrichedContext = contextEnricher .apply (connectionEvent .getSyncMetadata ());
194
+ switch (flagdProviderEvent .getEvent ()) {
195
+ case PROVIDER_CONFIGURATION_CHANGED :
196
+ if (previousEvent == ProviderEvent .PROVIDER_READY ) {
197
+ this .emitProviderConfigurationChanged (
198
+ ProviderEventDetails
199
+ .builder ()
200
+ .flagsChanged (flagdProviderEvent .getFlagsChanged ())
201
+ .message ("configuration changed" )
202
+ .build ());
203
+ break ;
204
+ }
205
+ case PROVIDER_READY :
206
+ onReady ();
207
+ previousEvent = ProviderEvent .PROVIDER_READY ;
208
+ break ;
164
209
165
- if (!initialized ) {
166
- return ;
210
+ case PROVIDER_ERROR :
211
+ if (previousEvent != ProviderEvent .PROVIDER_ERROR ) {
212
+ onError ();
213
+ }
214
+ previousEvent = ProviderEvent .PROVIDER_ERROR ;
215
+ break ;
167
216
}
217
+ }
168
218
169
- if (!wasConnected && isConnected ) {
170
- ProviderEventDetails details = ProviderEventDetails .builder ()
171
- .flagsChanged (connectionEvent .getFlagsChanged ())
172
- .message ("connected to flagd" )
173
- .build ();
174
- this .emitProviderReady (details );
175
- return ;
219
+ private void onReady () {
220
+ if (!initialized ) {
221
+ initialized = true ;
222
+ log .info ("initialized FlagdProvider" );
223
+ }
224
+ if (reconnectTask != null && !reconnectTask .isCancelled ()) {
225
+ reconnectTask .cancel (false );
226
+ log .debug ("Reconnection task cancelled as connection became READY." );
176
227
}
228
+ this .emitProviderReady (
229
+ ProviderEventDetails
230
+ .builder ()
231
+ .message ("connected to flagd" )
232
+ .build ());
233
+ }
177
234
178
- if ( wasConnected && isConnected ) {
179
- ProviderEventDetails details = ProviderEventDetails . builder ()
180
- . flagsChanged ( connectionEvent . getFlagsChanged ())
181
- . message ("configuration changed" )
182
- . build ();
183
- this . emitProviderConfigurationChanged ( details );
184
- return ;
235
+ private void onError ( ) {
236
+ log . info ( "Connection lost. Emit STALE event..." );
237
+ log . debug ( "Waiting {}s for connection to become available..." , gracePeriod );
238
+ this . emitProviderStale ( ProviderEventDetails . builder (). message ("there has been an error" ). build ());
239
+
240
+ if ( reconnectTask != null && ! reconnectTask . isCancelled ()) {
241
+ reconnectTask . cancel ( false ) ;
185
242
}
186
243
187
- if (connectionEvent .isStale ()) {
188
- this .emitProviderStale (ProviderEventDetails .builder ()
189
- .message ("there has been an error" )
190
- .build ());
191
- } else {
192
- this .emitProviderError (ProviderEventDetails .builder ()
193
- .message ("there has been an error" )
194
- .build ());
244
+ if (!reconnectExecutor .isShutdown ()) {
245
+ reconnectTask = reconnectExecutor .schedule (
246
+ () -> {
247
+ log .debug (
248
+ "Provider did not reconnect successfully within {}s. Emit ERROR event..." , gracePeriod );
249
+ flagResolver .onError ();
250
+ this .emitProviderError (ProviderEventDetails .builder ()
251
+ .message ("there has been an error" )
252
+ .build ());
253
+ ;
254
+ },
255
+ gracePeriod ,
256
+ TimeUnit .SECONDS );
195
257
}
258
+
196
259
}
197
260
}
0 commit comments