Skip to content

Commit 22d2a35

Browse files
toddbaertaepflichrfwow
authored
fix: broken reconnect on some HTTP2 error frames (#1261)
Signed-off-by: Todd Baert <[email protected]> Co-authored-by: Simon Schrottner <[email protected]> Co-authored-by: christian.lutnik <[email protected]>
1 parent d43fa00 commit 22d2a35

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+869
-725
lines changed

Diff for: providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package dev.openfeature.contrib.providers.flagd;
22

3-
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.CacheType;
3+
import dev.openfeature.contrib.providers.flagd.resolver.rpc.cache.CacheType;
44
import java.util.function.Function;
55
import lombok.extern.slf4j.Slf4j;
66

Diff for: providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import static dev.openfeature.contrib.providers.flagd.Config.fallBackToEnvOrDefault;
44
import static dev.openfeature.contrib.providers.flagd.Config.fromValueProvider;
55

6-
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector;
6+
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource;
77
import dev.openfeature.sdk.EvaluationContext;
88
import dev.openfeature.sdk.ImmutableContext;
99
import dev.openfeature.sdk.Structure;
@@ -163,7 +163,7 @@ public class FlagdOptions {
163163
/**
164164
* Inject a Custom Connector for fetching flags.
165165
*/
166-
private Connector customConnector;
166+
private QueueSource customConnector;
167167

168168
/**
169169
* Inject OpenTelemetry for the library runtime. Providing sdk will initiate

Diff for: providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
import dev.openfeature.contrib.providers.flagd.resolver.Resolver;
44
import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent;
5-
import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver;
6-
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
75
import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver;
6+
import dev.openfeature.contrib.providers.flagd.resolver.rpc.RpcResolver;
7+
import dev.openfeature.contrib.providers.flagd.resolver.rpc.cache.Cache;
88
import dev.openfeature.sdk.EvaluationContext;
99
import dev.openfeature.sdk.EventProvider;
1010
import dev.openfeature.sdk.Hook;
@@ -82,7 +82,7 @@ public FlagdProvider(final FlagdOptions options) {
8282
this.flagResolver = new InProcessResolver(options, this::onProviderEvent);
8383
break;
8484
case Config.RESOLVER_RPC:
85-
this.flagResolver = new GrpcResolver(
85+
this.flagResolver = new RpcResolver(
8686
options, new Cache(options.getCacheType(), options.getMaxCacheSize()), this::onProviderEvent);
8787
break;
8888
default:

Diff for: providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/GrpcConnector.java renamed to providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnector.java

+31-75
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,7 @@
2222
* @param <K> the type of the blocking stub for the GRPC service
2323
*/
2424
@Slf4j
25-
public class GrpcConnector<T extends AbstractStub<T>, K extends AbstractBlockingStub<K>> {
26-
27-
/**
28-
* The asynchronous service stub for making non-blocking GRPC calls.
29-
*/
30-
private final T serviceStub;
25+
public class ChannelConnector<T extends AbstractStub<T>, K extends AbstractBlockingStub<K>> {
3126

3227
/**
3328
* The blocking service stub for making blocking GRPC calls.
@@ -37,80 +32,51 @@ public class GrpcConnector<T extends AbstractStub<T>, K extends AbstractBlocking
3732
/**
3833
* The GRPC managed channel for managing the underlying GRPC connection.
3934
*/
35+
@Getter
4036
private final ManagedChannel channel;
4137

4238
/**
4339
* The deadline in milliseconds for GRPC operations.
4440
*/
4541
private final long deadline;
4642

47-
/**
48-
* The deadline in milliseconds for event streaming operations.
49-
*/
50-
private final long streamDeadlineMs;
51-
5243
/**
5344
* A consumer that handles connection events such as connection loss or reconnection.
5445
*/
5546
private final Consumer<FlagdProviderEvent> onConnectionEvent;
5647

5748
/**
58-
* A consumer that handles GRPC service stubs for event stream handling.
59-
*/
60-
private final Consumer<T> streamObserver;
61-
62-
private final FlagdOptions options;
63-
64-
/**
65-
* Indicates whether the connector is currently connected to the GRPC service.
66-
*/
67-
@Getter
68-
private boolean connected = false;
69-
70-
/**
71-
* Constructs a new {@code GrpcConnector} instance with the specified options and parameters.
49+
* Constructs a new {@code ChannelConnector} instance with the specified options and parameters.
7250
*
7351
* @param options the configuration options for the GRPC connection
74-
* @param stub a function to create the asynchronous service stub from a {@link ManagedChannel}
7552
* @param blockingStub a function to create the blocking service stub from a {@link ManagedChannel}
7653
* @param onConnectionEvent a consumer to handle connection events
77-
* @param eventStreamObserver a consumer to handle the event stream
7854
* @param channel the managed channel for the GRPC connection
7955
*/
80-
public GrpcConnector(
56+
public ChannelConnector(
8157
final FlagdOptions options,
82-
final Function<ManagedChannel, T> stub,
8358
final Function<ManagedChannel, K> blockingStub,
8459
final Consumer<FlagdProviderEvent> onConnectionEvent,
85-
final Consumer<T> eventStreamObserver,
8660
ManagedChannel channel) {
8761

8862
this.channel = channel;
89-
this.serviceStub = stub.apply(channel).withWaitForReady();
9063
this.blockingStubFunction = blockingStub;
9164
this.deadline = options.getDeadline();
92-
this.streamDeadlineMs = options.getStreamDeadlineMs();
9365
this.onConnectionEvent = onConnectionEvent;
94-
this.streamObserver = eventStreamObserver;
95-
this.options = options;
9666
}
9767

9868
/**
99-
* Constructs a {@code GrpcConnector} instance for testing purposes.
69+
* Constructs a {@code ChannelConnector} instance for testing purposes.
10070
*
10171
* @param options the configuration options for the GRPC connection
102-
* @param stub a function to create the asynchronous service stub from a {@link ManagedChannel}
10372
* @param blockingStub a function to create the blocking service stub from a {@link ManagedChannel}
10473
* @param onConnectionEvent a consumer to handle connection events
105-
* @param eventStreamObserver a consumer to handle the event stream
10674
*/
107-
public GrpcConnector(
75+
public ChannelConnector(
10876
final FlagdOptions options,
109-
final Function<ManagedChannel, T> stub,
11077
final Function<ManagedChannel, K> blockingStub,
111-
final Consumer<FlagdProviderEvent> onConnectionEvent,
112-
final Consumer<T> eventStreamObserver) {
113-
this(options, stub, blockingStub, onConnectionEvent, eventStreamObserver, ChannelBuilder.nettyChannel(options));
78+
final Consumer<FlagdProviderEvent> onConnectionEvent) {
79+
this(options, blockingStub, onConnectionEvent, ChannelBuilder.nettyChannel(options));
11480
}
11581

11682
/**
@@ -120,16 +86,20 @@ public GrpcConnector(
12086
*/
12187
public void initialize() throws Exception {
12288
log.info("Initializing GRPC connection...");
123-
ChannelMonitor.monitorChannelState(ConnectivityState.READY, channel, this::onReady, this::onConnectionLost);
89+
monitorChannelState(ConnectivityState.READY);
12490
}
12591

12692
/**
12793
* Returns the blocking service stub for making blocking GRPC calls.
12894
*
12995
* @return the blocking service stub
13096
*/
131-
public K getResolver() {
132-
return blockingStubFunction.apply(channel).withWaitForReady();
97+
public K getBlockingStub() {
98+
K stub = blockingStubFunction.apply(channel).withWaitForReady();
99+
if (this.deadline > 0) {
100+
stub = stub.withDeadlineAfter(this.deadline, TimeUnit.MILLISECONDS);
101+
}
102+
return stub;
133103
}
134104

135105
/**
@@ -147,39 +117,25 @@ public void shutdown() throws InterruptedException {
147117
}
148118

149119
/**
150-
* Handles the event when the GRPC channel becomes ready, marking the connection as established.
151-
* Cancels any pending reconnection task and restarts the event stream.
152-
*/
153-
private synchronized void onReady() {
154-
connected = true;
155-
restartStream();
156-
}
157-
158-
/**
159-
* Handles the event when the GRPC channel loses its connection, marking the connection as lost.
160-
* Schedules a reconnection task after a grace period and emits a stale connection event.
120+
* Monitors the state of a gRPC channel and triggers the specified callbacks based on state changes.
121+
*
122+
* @param expectedState the initial state to monitor.
161123
*/
162-
private synchronized void onConnectionLost() {
163-
connected = false;
164-
165-
this.onConnectionEvent.accept(new FlagdProviderEvent(
166-
ProviderEvent.PROVIDER_ERROR, Collections.emptyList(), new ImmutableStructure()));
124+
private void monitorChannelState(ConnectivityState expectedState) {
125+
channel.notifyWhenStateChanged(expectedState, this::onStateChange);
167126
}
168127

169-
/**
170-
* Restarts the event stream using the asynchronous service stub, applying a deadline if configured.
171-
* Emits a connection event if the restart is successful.
172-
*/
173-
private synchronized void restartStream() {
174-
if (connected) {
175-
log.debug("(Re)initializing event stream.");
176-
T localServiceStub = this.serviceStub;
177-
if (streamDeadlineMs > 0) {
178-
localServiceStub = localServiceStub.withDeadlineAfter(this.streamDeadlineMs, TimeUnit.MILLISECONDS);
179-
}
180-
streamObserver.accept(localServiceStub);
181-
return;
128+
private void onStateChange() {
129+
ConnectivityState currentState = channel.getState(true);
130+
log.debug("Channel state changed to: {}", currentState);
131+
if (currentState == ConnectivityState.TRANSIENT_FAILURE || currentState == ConnectivityState.SHUTDOWN) {
132+
this.onConnectionEvent.accept(new FlagdProviderEvent(
133+
ProviderEvent.PROVIDER_ERROR, Collections.emptyList(), new ImmutableStructure()));
134+
}
135+
if (currentState != ConnectivityState.SHUTDOWN) {
136+
log.debug("continuing to monitor the grpc channel");
137+
// Re-register the state monitor to watch for the next state transition.
138+
monitorChannelState(currentState);
182139
}
183-
log.debug("Stream restart skipped. Not connected.");
184140
}
185141
}

Diff for: providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelMonitor.java

-96
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package dev.openfeature.contrib.providers.flagd.resolver.common;
2+
3+
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
4+
import io.grpc.stub.StreamObserver;
5+
import java.util.concurrent.BlockingQueue;
6+
import lombok.extern.slf4j.Slf4j;
7+
8+
/**
9+
* Observes gRPC streams for events and enqueues them.
10+
*/
11+
@Slf4j
12+
@SuppressFBWarnings(
13+
value = {"EI_EXPOSE_REP", "EI_EXPOSE_REP2"},
14+
justification = "Internal class")
15+
public class QueueingStreamObserver<T> implements StreamObserver<T> {
16+
private final BlockingQueue<StreamResponseModel<T>> blockingQueue;
17+
18+
public QueueingStreamObserver(final BlockingQueue<StreamResponseModel<T>> queue) {
19+
blockingQueue = queue;
20+
}
21+
22+
@Override
23+
public void onNext(T response) {
24+
if (!blockingQueue.offer(new StreamResponseModel<T>(response))) {
25+
log.warn("failed to write sync response to queue");
26+
}
27+
}
28+
29+
@Override
30+
public void onError(Throwable throwable) {
31+
if (!blockingQueue.offer(new StreamResponseModel<T>(throwable))) {
32+
log.warn("failed to write error response to queue");
33+
}
34+
}
35+
36+
@Override
37+
public void onCompleted() {
38+
if (!blockingQueue.offer(new StreamResponseModel<T>(true))) {
39+
log.warn("failed to write complete status to queue");
40+
}
41+
}
42+
}

0 commit comments

Comments
 (0)