Skip to content

Commit 8e2aeae

Browse files
committed
fixup: fixing nit, and adding information for fallthrough
Signed-off-by: Simon Schrottner <[email protected]>
1 parent 853727f commit 8e2aeae

File tree

2 files changed

+17
-14
lines changed

2 files changed

+17
-14
lines changed

Diff for: providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java

+16-13
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public class FlagdProvider extends EventProvider {
5151
/**
5252
* A scheduled task for emitting {@link ProviderEvent#PROVIDER_ERROR}.
5353
*/
54-
private ScheduledFuture<?> reconnectTask;
54+
private ScheduledFuture<?> errorTask;
5555

5656
/**
5757
* The grace period in milliseconds to wait after {@link ProviderEvent#PROVIDER_STALE} before emitting a
@@ -188,7 +188,7 @@ EvaluationContext getEnrichedContext() {
188188
}
189189

190190
@SuppressWarnings("checkstyle:fallthrough")
191-
private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {
191+
private synchronized void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {
192192

193193
syncMetadata = flagdProviderEvent.getSyncMetadata();
194194
if (flagdProviderEvent.getSyncMetadata() != null) {
@@ -236,8 +236,8 @@ private void onReady() {
236236
initialized = true;
237237
log.info("initialized FlagdProvider");
238238
}
239-
if (reconnectTask != null && !reconnectTask.isCancelled()) {
240-
reconnectTask.cancel(false);
239+
if (errorTask != null && !errorTask.isCancelled()) {
240+
errorTask.cancel(false);
241241
log.debug("Reconnection task cancelled as connection became READY.");
242242
}
243243
this.emitProviderReady(
@@ -251,19 +251,22 @@ private void onError() {
251251
.message("there has been an error")
252252
.build());
253253

254-
if (reconnectTask != null && !reconnectTask.isCancelled()) {
255-
reconnectTask.cancel(false);
254+
if (errorTask != null && !errorTask.isCancelled()) {
255+
errorTask.cancel(false);
256256
}
257257

258258
if (!errorExecutor.isShutdown()) {
259-
reconnectTask = errorExecutor.schedule(
259+
errorTask = errorExecutor.schedule(
260260
() -> {
261-
log.debug(
262-
"Provider did not reconnect successfully within {}s. Emit ERROR event...", gracePeriod);
263-
flagResolver.onError();
264-
this.emitProviderError(ProviderEventDetails.builder()
265-
.message("there has been an error")
266-
.build());
261+
if(previousEvent == ProviderEvent.PROVIDER_ERROR) {
262+
log.debug(
263+
"Provider did not reconnect successfully within {}s. Emit ERROR event...",
264+
gracePeriod);
265+
flagResolver.onError();
266+
this.emitProviderError(ProviderEventDetails.builder()
267+
.message("there has been an error")
268+
.build());
269+
}
267270
},
268271
gracePeriod,
269272
TimeUnit.SECONDS);

Diff for: providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import io.grpc.Context.CancellableContext;
1717
import java.util.concurrent.BlockingQueue;
1818
import java.util.concurrent.LinkedBlockingQueue;
19-
import java.util.concurrent.TimeUnit;
2019
import java.util.concurrent.atomic.AtomicBoolean;
2120
import java.util.function.Consumer;
2221
import lombok.extern.slf4j.Slf4j;
@@ -121,6 +120,7 @@ void observeEventStream(final BlockingQueue<QueuePayload> writeTo, final AtomicB
121120
metadataException = e;
122121
}
123122

123+
log.info("stream");
124124
while (!shutdown.get()) {
125125
final GrpcResponseModel response = streamReceiver.take();
126126
if (response.isComplete()) {

0 commit comments

Comments
 (0)