Skip to content

Commit 4f484b7

Browse files
chrfwowtoddbaert
andauthored
feat: Improve wait logic to a more elegant solution #1160 (#1169)
Signed-off-by: christian.lutnik <[email protected]> Co-authored-by: Todd Baert <[email protected]>
1 parent 8acfe61 commit 4f484b7

File tree

9 files changed

+277
-95
lines changed

9 files changed

+277
-95
lines changed

Diff for: pom.xml

+2-2
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@
7474
<!-- this can be overriden in child POMs to support specific SDK requirements -->
7575
<groupId>dev.openfeature</groupId>
7676
<artifactId>sdk</artifactId>
77-
<!-- 1.14 <= v < 2.0 (excluding 2.0 pre-releases)-->
78-
<version>[1.14,1.99999)</version>
77+
<!-- 1.14.1 <= v < 2.0 (excluding 2.0 pre-releases)-->
78+
<version>[1.14.1,1.99999)</version>
7979
<!-- use the version provided at runtime -->
8080
<scope>provided</scope>
8181
</dependency>

Diff for: providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java

+28-41
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,12 @@
22

33
import dev.openfeature.contrib.providers.flagd.resolver.Resolver;
44
import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent;
5-
import dev.openfeature.contrib.providers.flagd.resolver.common.Util;
65
import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver;
76
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
87
import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver;
98
import dev.openfeature.sdk.EvaluationContext;
109
import dev.openfeature.sdk.EventProvider;
1110
import dev.openfeature.sdk.Hook;
12-
import dev.openfeature.sdk.ImmutableContext;
1311
import dev.openfeature.sdk.Metadata;
1412
import dev.openfeature.sdk.ProviderEvaluation;
1513
import dev.openfeature.sdk.ProviderEvent;
@@ -36,7 +34,7 @@ public class FlagdProvider extends EventProvider {
3634
private static final String FLAGD_PROVIDER = "flagd";
3735
private final Resolver flagResolver;
3836
private final List<Hook> hooks = new ArrayList<>();
39-
private final EventsLock eventsLock = new EventsLock();
37+
private final FlagdProviderSyncResources syncResources = new FlagdProviderSyncResources();
4038

4139
/**
4240
* An executor service responsible for emitting
@@ -108,7 +106,9 @@ public FlagdProvider(final FlagdOptions options) {
108106
gracePeriod = Config.DEFAULT_STREAM_RETRY_GRACE_PERIOD;
109107
hooks.add(new SyncMetadataHook(this::getEnrichedContext));
110108
errorExecutor = Executors.newSingleThreadScheduledExecutor();
111-
this.eventsLock.initialized = initialized;
109+
if (initialized) {
110+
this.syncResources.initialize();
111+
}
112112
}
113113

114114
@Override
@@ -118,28 +118,27 @@ public List<Hook> getProviderHooks() {
118118

119119
@Override
120120
public void initialize(EvaluationContext evaluationContext) throws Exception {
121-
synchronized (eventsLock) {
122-
if (eventsLock.initialized) {
121+
synchronized (syncResources) {
122+
if (syncResources.isInitialized()) {
123123
return;
124124
}
125125

126126
flagResolver.init();
127+
// block till ready - this works with deadline fine for rpc, but with in_process
128+
// we also need to take parsing into the equation
129+
// TODO: evaluate where we are losing time, so we can remove this magic number -
130+
syncResources.waitForInitialization(this.deadline * 2);
127131
}
128-
// block till ready - this works with deadline fine for rpc, but with in_process
129-
// we also need to take parsing into the equation
130-
// TODO: evaluate where we are losing time, so we can remove this magic number -
131-
// follow up
132-
// wait outside of the synchonrization or we'll deadlock
133-
Util.busyWaitAndCheck(this.deadline * 2, () -> eventsLock.initialized);
134132
}
135133

136134
@Override
137135
public void shutdown() {
138-
synchronized (eventsLock) {
139-
if (!eventsLock.initialized) {
140-
return;
141-
}
136+
synchronized (syncResources) {
142137
try {
138+
if (!syncResources.isInitialized() || syncResources.isShutDown()) {
139+
return;
140+
}
141+
143142
this.flagResolver.shutdown();
144143
if (errorExecutor != null) {
145144
errorExecutor.shutdownNow();
@@ -148,7 +147,7 @@ public void shutdown() {
148147
} catch (Exception e) {
149148
log.error("Error during shutdown {}", FLAGD_PROVIDER, e);
150149
} finally {
151-
eventsLock.initialized = false;
150+
syncResources.shutdown();
152151
}
153152
}
154153
}
@@ -189,15 +188,13 @@ public ProviderEvaluation<Value> getObjectEvaluation(String key, Value defaultVa
189188
* @return context
190189
*/
191190
EvaluationContext getEnrichedContext() {
192-
return eventsLock.enrichedContext;
191+
return syncResources.getEnrichedContext();
193192
}
194193

195194
@SuppressWarnings("checkstyle:fallthrough")
196195
private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {
197-
198-
synchronized (eventsLock) {
199-
log.info("FlagdProviderEvent: {}", flagdProviderEvent.getEvent());
200-
196+
log.info("FlagdProviderEvent event {} ", flagdProviderEvent.getEvent());
197+
synchronized (syncResources) {
201198
/*
202199
* We only use Error and Ready as previous states.
203200
* As error will first be emitted as Stale, and only turns after a while into an
@@ -209,29 +206,30 @@ private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {
209206
*/
210207
switch (flagdProviderEvent.getEvent()) {
211208
case PROVIDER_CONFIGURATION_CHANGED:
212-
if (eventsLock.previousEvent == ProviderEvent.PROVIDER_READY) {
209+
if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_READY) {
213210
onConfigurationChanged(flagdProviderEvent);
214211
break;
215212
}
216-
// intentional fall through, a not-ready change will trigger a ready.
213+
// intentional fall through
217214
case PROVIDER_READY:
218215
/*
219216
* Sync metadata is used to enrich the context, and is immutable in flagd,
220217
* so we only need it to be fetched once at READY.
221218
*/
222219
if (flagdProviderEvent.getSyncMetadata() != null) {
223-
eventsLock.enrichedContext = contextEnricher.apply(flagdProviderEvent.getSyncMetadata());
220+
syncResources.setEnrichedContext(contextEnricher.apply(flagdProviderEvent.getSyncMetadata()));
224221
}
225222
onReady();
226-
eventsLock.previousEvent = ProviderEvent.PROVIDER_READY;
223+
syncResources.setPreviousEvent(ProviderEvent.PROVIDER_READY);
227224
break;
228225

229226
case PROVIDER_ERROR:
230-
if (eventsLock.previousEvent != ProviderEvent.PROVIDER_ERROR) {
227+
if (syncResources.getPreviousEvent() != ProviderEvent.PROVIDER_ERROR) {
231228
onError();
229+
syncResources.setPreviousEvent(ProviderEvent.PROVIDER_ERROR);
232230
}
233-
eventsLock.previousEvent = ProviderEvent.PROVIDER_ERROR;
234231
break;
232+
235233
default:
236234
log.info("Unknown event {}", flagdProviderEvent.getEvent());
237235
}
@@ -246,8 +244,7 @@ private void onConfigurationChanged(FlagdProviderEvent flagdProviderEvent) {
246244
}
247245

248246
private void onReady() {
249-
if (!eventsLock.initialized) {
250-
eventsLock.initialized = true;
247+
if (syncResources.initialize()) {
251248
log.info("initialized FlagdProvider");
252249
}
253250
if (errorTask != null && !errorTask.isCancelled()) {
@@ -272,7 +269,7 @@ private void onError() {
272269
if (!errorExecutor.isShutdown()) {
273270
errorTask = errorExecutor.schedule(
274271
() -> {
275-
if (eventsLock.previousEvent == ProviderEvent.PROVIDER_ERROR) {
272+
if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_ERROR) {
276273
log.debug(
277274
"Provider did not reconnect successfully within {}s. Emit ERROR event...",
278275
gracePeriod);
@@ -286,14 +283,4 @@ private void onError() {
286283
TimeUnit.SECONDS);
287284
}
288285
}
289-
290-
/**
291-
* Contains all fields we need to worry about locking, used as intrinsic lock
292-
* for sync blocks.
293-
*/
294-
static class EventsLock {
295-
volatile ProviderEvent previousEvent = null;
296-
volatile boolean initialized = false;
297-
volatile EvaluationContext enrichedContext = new ImmutableContext();
298-
}
299286
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package dev.openfeature.contrib.providers.flagd;
2+
3+
import dev.openfeature.sdk.EvaluationContext;
4+
import dev.openfeature.sdk.ImmutableContext;
5+
import dev.openfeature.sdk.ProviderEvent;
6+
import dev.openfeature.sdk.exceptions.GeneralError;
7+
import lombok.Getter;
8+
import lombok.Setter;
9+
10+
/**
11+
* Contains all fields we need to worry about locking, used as intrinsic lock
12+
* for sync blocks in the {@link FlagdProvider}.
13+
*/
14+
@Getter
15+
class FlagdProviderSyncResources {
16+
@Setter
17+
private volatile ProviderEvent previousEvent = null;
18+
19+
private volatile EvaluationContext enrichedContext = new ImmutableContext();
20+
private volatile boolean initialized;
21+
private volatile boolean isShutDown;
22+
23+
public void setEnrichedContext(EvaluationContext context) {
24+
this.enrichedContext = new ImmutableContext(context.asMap());
25+
}
26+
27+
/**
28+
* With this method called, it is suggested that initialization has been completed. It will wake up all threads that
29+
* wait for the initialization. Subsequent calls have no effect.
30+
*
31+
* @return true iff this was the first call to {@code initialize()}
32+
*/
33+
public synchronized boolean initialize() {
34+
if (this.initialized) {
35+
return false;
36+
}
37+
this.initialized = true;
38+
this.notifyAll();
39+
return true;
40+
}
41+
42+
/**
43+
* Blocks the calling thread until either {@link FlagdProviderSyncResources#initialize()} or
44+
* {@link FlagdProviderSyncResources#shutdown()} is called or the deadline is exceeded, whatever happens first. If
45+
* {@link FlagdProviderSyncResources#initialize()} has been executed before {@code waitForInitialization(long)} is
46+
* called, it will return instantly. If the deadline is exceeded, a GeneralError will be thrown.
47+
* If {@link FlagdProviderSyncResources#shutdown()} is called in the meantime, an {@link IllegalStateException} will
48+
* be thrown. Otherwise, the method will return cleanly.
49+
*
50+
* @param deadline the maximum time in ms to wait
51+
* @throws GeneralError when the deadline is exceeded before
52+
* {@link FlagdProviderSyncResources#initialize()} is called on this object
53+
* @throws IllegalStateException when {@link FlagdProviderSyncResources#shutdown()} is called or has been called on
54+
* this object
55+
*/
56+
public void waitForInitialization(long deadline) {
57+
long start = System.currentTimeMillis();
58+
long end = start + deadline;
59+
while (!initialized && !isShutDown) {
60+
long now = System.currentTimeMillis();
61+
// if wait(0) is called, the thread would wait forever, so we abort when this would happen
62+
if (now >= end) {
63+
throw new GeneralError(String.format(
64+
"Deadline exceeded. Condition did not complete within the %d ms deadline", deadline));
65+
}
66+
long remaining = end - now;
67+
synchronized (this) {
68+
if (isShutDown) {
69+
break;
70+
}
71+
if (initialized) { // might have changed in the meantime
72+
return;
73+
}
74+
try {
75+
this.wait(remaining);
76+
} catch (InterruptedException e) {
77+
// try again. Leave the continue to make PMD happy
78+
continue;
79+
}
80+
}
81+
}
82+
if (isShutDown) {
83+
throw new IllegalStateException("Already shut down");
84+
}
85+
}
86+
87+
/**
88+
* Signals a shutdown. Threads waiting for initialization will wake up and throw an {@link IllegalStateException}.
89+
*/
90+
public synchronized void shutdown() {
91+
isShutDown = true;
92+
this.notifyAll();
93+
}
94+
}

Diff for: providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/Util.java

-39
This file was deleted.

Diff for: providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ public class InProcessResolver implements Resolver {
3838
private final Storage flagStore;
3939
private final Consumer<FlagdProviderEvent> onConnectionEvent;
4040
private final Operator operator;
41-
private final long deadline;
4241
private final String scope;
4342

4443
/**
@@ -52,7 +51,6 @@ public class InProcessResolver implements Resolver {
5251
*/
5352
public InProcessResolver(FlagdOptions options, Consumer<FlagdProviderEvent> onConnectionEvent) {
5453
this.flagStore = new FlagStore(getConnector(options, onConnectionEvent));
55-
this.deadline = options.getDeadline();
5654
this.onConnectionEvent = onConnectionEvent;
5755
this.operator = new Operator();
5856
this.scope = options.getSelector();
@@ -70,10 +68,12 @@ public void init() throws Exception {
7068
flagStore.getStateQueue().take();
7169
switch (storageStateChange.getStorageState()) {
7270
case OK:
71+
log.info("onConnectionEvent.accept ProviderEvent.PROVIDER_CONFIGURATION_CHANGED");
7372
onConnectionEvent.accept(new FlagdProviderEvent(
7473
ProviderEvent.PROVIDER_CONFIGURATION_CHANGED,
7574
storageStateChange.getChangedFlagsKeys(),
7675
storageStateChange.getSyncMetadata()));
76+
log.info("post onConnectionEvent.accept ProviderEvent.PROVIDER_CONFIGURATION_CHANGED");
7777
break;
7878
case ERROR:
7979
onConnectionEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_ERROR));

0 commit comments

Comments
 (0)