Skip to content

Commit d2410c7

Browse files
warbertoddbaert
andauthored
feat!: implement grpc reconnect for inprocess mode (#1150)
Signed-off-by: Bernd Warmuth <[email protected]> Co-authored-by: Todd Baert <[email protected]>
1 parent 5a293bb commit d2410c7

23 files changed

+173
-1218
lines changed

Diff for: providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelMonitor.java

+33-38
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import io.grpc.ConnectivityState;
55
import io.grpc.ManagedChannel;
66
import java.util.concurrent.CountDownLatch;
7+
import java.util.concurrent.Executors;
8+
import java.util.concurrent.ScheduledFuture;
79
import java.util.concurrent.TimeUnit;
810
import lombok.extern.slf4j.Slf4j;
911

@@ -32,65 +34,58 @@ public static void monitorChannelState(
3234
ConnectivityState currentState = channel.getState(true);
3335
log.info("Channel state changed to: {}", currentState);
3436
if (currentState == ConnectivityState.READY) {
35-
onConnectionReady.run();
37+
if (onConnectionReady != null) {
38+
onConnectionReady.run();
39+
} else {
40+
log.debug("onConnectionReady is null");
41+
}
3642
} else if (currentState == ConnectivityState.TRANSIENT_FAILURE
3743
|| currentState == ConnectivityState.SHUTDOWN) {
38-
onConnectionLost.run();
44+
if (onConnectionLost != null) {
45+
onConnectionLost.run();
46+
} else {
47+
log.debug("onConnectionLost is null");
48+
}
3949
}
4050
// Re-register the state monitor to watch for the next state transition.
4151
monitorChannelState(currentState, channel, onConnectionReady, onConnectionLost);
4252
});
4353
}
4454

4555
/**
46-
* Waits for the channel to reach a desired state within a specified timeout period.
56+
* Waits for the channel to reach the desired connectivity state within the specified timeout.
4757
*
48-
* @param channel the ManagedChannel to monitor.
49-
* @param desiredState the ConnectivityState to wait for.
50-
* @param connectCallback callback invoked when the desired state is reached.
51-
* @param timeout the maximum amount of time to wait.
52-
* @param unit the time unit of the timeout.
53-
* @throws InterruptedException if the current thread is interrupted while waiting.
58+
* @param desiredState the desired {@link ConnectivityState} to wait for
59+
* @param channel the {@link ManagedChannel} to monitor
60+
* @param connectCallback the {@link Runnable} to execute when the desired state is reached
61+
* @param timeout the maximum time to wait
62+
* @param unit the time unit of the timeout argument
63+
* @throws InterruptedException if the current thread is interrupted while waiting
64+
* @throws GeneralError if the desired state is not reached within the timeout
5465
*/
5566
public static void waitForDesiredState(
56-
ManagedChannel channel,
5767
ConnectivityState desiredState,
58-
Runnable connectCallback,
59-
long timeout,
60-
TimeUnit unit)
61-
throws InterruptedException {
62-
waitForDesiredState(channel, desiredState, connectCallback, new CountDownLatch(1), timeout, unit);
63-
}
64-
65-
private static void waitForDesiredState(
6668
ManagedChannel channel,
67-
ConnectivityState desiredState,
6869
Runnable connectCallback,
69-
CountDownLatch latch,
7070
long timeout,
7171
TimeUnit unit)
7272
throws InterruptedException {
73-
channel.notifyWhenStateChanged(ConnectivityState.SHUTDOWN, () -> {
74-
try {
75-
ConnectivityState state = channel.getState(true);
76-
log.debug("Channel state changed to: {}", state);
73+
CountDownLatch latch = new CountDownLatch(1);
7774

78-
if (state == desiredState) {
79-
connectCallback.run();
80-
latch.countDown();
81-
return;
82-
}
83-
waitForDesiredState(channel, desiredState, connectCallback, latch, timeout, unit);
84-
} catch (InterruptedException e) {
85-
Thread.currentThread().interrupt();
86-
log.error("Thread interrupted while waiting for desired state", e);
87-
} catch (Exception e) {
88-
log.error("Error occurred while waiting for desired state", e);
75+
Runnable waitForStateTask = () -> {
76+
ConnectivityState currentState = channel.getState(true);
77+
if (currentState == desiredState) {
78+
connectCallback.run();
79+
latch.countDown();
8980
}
90-
});
81+
};
82+
83+
ScheduledFuture<?> scheduledFuture = Executors.newSingleThreadScheduledExecutor()
84+
.scheduleWithFixedDelay(waitForStateTask, 0, 100, TimeUnit.MILLISECONDS);
9185

92-
// Await the latch or timeout for the state change
93-
if (!latch.await(timeout, unit)) {
86+
boolean success = latch.await(timeout, unit);
87+
scheduledFuture.cancel(true);
88+
if (!success) {
9489
throw new GeneralError(String.format(
9590
"Deadline exceeded. Condition did not complete within the %d " + "deadline", timeout));
9691
}

Diff for: providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnector.java renamed to providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/GrpcConnector.java

+3-9
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,6 @@
1-
package dev.openfeature.contrib.providers.flagd.resolver.grpc;
1+
package dev.openfeature.contrib.providers.flagd.resolver.common;
22

3-
import com.google.common.annotations.VisibleForTesting;
43
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
5-
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder;
6-
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelMonitor;
7-
import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionEvent;
8-
import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionState;
94
import dev.openfeature.sdk.ImmutableStructure;
105
import io.grpc.ConnectivityState;
116
import io.grpc.ManagedChannel;
@@ -125,8 +120,7 @@ public GrpcConnector(
125120
* @param onConnectionEvent a consumer to handle connection events
126121
* @param eventStreamObserver a consumer to handle the event stream
127122
*/
128-
@VisibleForTesting
129-
GrpcConnector(
123+
public GrpcConnector(
130124
final FlagdOptions options,
131125
final Function<ManagedChannel, T> stub,
132126
final Function<ManagedChannel, K> blockingStub,
@@ -143,7 +137,7 @@ public GrpcConnector(
143137
public void initialize() throws Exception {
144138
log.info("Initializing GRPC connection...");
145139
ChannelMonitor.waitForDesiredState(
146-
channel, ConnectivityState.READY, this::onInitialConnect, deadline, TimeUnit.MILLISECONDS);
140+
ConnectivityState.READY, channel, this::onInitialConnect, deadline, TimeUnit.MILLISECONDS);
147141
ChannelMonitor.monitorChannelState(ConnectivityState.READY, channel, this::onReady, this::onConnectionLost);
148142
}
149143

Diff for: providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/BackoffService.java

-86
This file was deleted.

Diff for: providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/BackoffStrategies.java

-27
This file was deleted.

Diff for: providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/BackoffStrategy.java

-26
This file was deleted.

Diff for: providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/CombinedBackoff.java

-69
This file was deleted.

Diff for: providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/ConstantTimeBackoff.java

-28
This file was deleted.

0 commit comments

Comments
 (0)