Skip to content

Commit 9de03df

Browse files
guidobrei and toddbaert authored
feat: Add GRPC stream connection deadline (#999)
Signed-off-by: Guido Breitenhuber <[email protected]> Signed-off-by: Todd Baert <[email protected]> Co-authored-by: Todd Baert <[email protected]>
1 parent 27543c9 commit 9de03df

File tree

8 files changed

+200
-40
lines changed

8 files changed

+200
-40
lines changed

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java

+2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public final class Config {
1717
static final String DEFAULT_HOST = "localhost";
1818

1919
static final int DEFAULT_DEADLINE = 500;
20+
static final int DEFAULT_STREAM_DEADLINE_MS = 10 * 60 * 1000;
2021
static final int DEFAULT_MAX_CACHE_SIZE = 1000;
2122
static final long DEFAULT_KEEP_ALIVE = 0;
2223

@@ -31,6 +32,7 @@ public final class Config {
3132
static final String MAX_EVENT_STREAM_RETRIES_ENV_VAR_NAME = "FLAGD_MAX_EVENT_STREAM_RETRIES";
3233
static final String BASE_EVENT_STREAM_RETRY_BACKOFF_MS_ENV_VAR_NAME = "FLAGD_RETRY_BACKOFF_MS";
3334
static final String DEADLINE_MS_ENV_VAR_NAME = "FLAGD_DEADLINE_MS";
35+
static final String STREAM_DEADLINE_MS_ENV_VAR_NAME = "FLAGD_STREAM_DEADLINE_MS";
3436
static final String SOURCE_SELECTOR_ENV_VAR_NAME = "FLAGD_SOURCE_SELECTOR";
3537
static final String OFFLINE_SOURCE_PATH = "FLAGD_OFFLINE_FLAG_SOURCE_PATH";
3638
static final String KEEP_ALIVE_MS_ENV_VAR_NAME_OLD = "FLAGD_KEEP_ALIVE_TIME";

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,15 @@ public class FlagdOptions {
9292
@Builder.Default
9393
private int deadline = fallBackToEnvOrDefault(Config.DEADLINE_MS_ENV_VAR_NAME, Config.DEFAULT_DEADLINE);
9494

95+
/**
96+
* Streaming connection deadline in milliseconds.
97+
* Set to 0 (or any non-positive value) to disable the deadline.
98+
* Defaults to 600000 (10 minutes); recommended to prevent infrastructure from killing idle connections.
99+
*/
100+
@Builder.Default
101+
private int streamDeadlineMs = fallBackToEnvOrDefault(Config.STREAM_DEADLINE_MS_ENV_VAR_NAME,
102+
Config.DEFAULT_STREAM_DEADLINE_MS);
103+
95104
/**
96105
* Selector to be used with flag sync gRPC contract.
97106
**/
@@ -101,7 +110,7 @@ public class FlagdOptions {
101110
/**
102111
* gRPC client KeepAlive in milliseconds. Disabled with 0.
103112
* Defaults to 0 (disabled).
104-
*
113+
*
105114
**/
106115
@Builder.Default
107116
private long keepAlive = fallBackToEnvOrDefault(Config.KEEP_ALIVE_MS_ENV_VAR_NAME,

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserver.java

+13-5
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
1212
import dev.openfeature.flagd.grpc.evaluation.Evaluation.EventStreamResponse;
1313
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14+
import io.grpc.Status.Code;
15+
import io.grpc.StatusRuntimeException;
1416
import io.grpc.stub.StreamObserver;
1517
import lombok.extern.slf4j.Slf4j;
1618

@@ -52,12 +54,18 @@ public void onNext(EventStreamResponse value) {
5254
}
5355

5456
@Override
55-
public void onError(Throwable t) {
56-
log.warn("event stream", t);
57-
if (this.cache.getEnabled()) {
58-
this.cache.clear();
57+
public void onError(Throwable throwable) {
58+
if (throwable instanceof StatusRuntimeException
59+
&& ((StatusRuntimeException) throwable).getStatus().getCode()
60+
.equals(Code.DEADLINE_EXCEEDED)) {
61+
log.debug(String.format("stream deadline reached; will re-establish"));
62+
} else {
63+
log.error(String.format("event stream error", throwable));
64+
if (this.cache.getEnabled()) {
65+
this.cache.clear();
66+
}
67+
this.onConnectionEvent.accept(false, Collections.emptyList());
5968
}
60-
this.onConnectionEvent.accept(false, Collections.emptyList());
6169

6270
// handle last call of this stream
6371
handleEndOfStream();

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnector.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class GrpcConnector {
3636

3737
private final int startEventStreamRetryBackoff;
3838
private final long deadline;
39+
private final long streamDeadlineMs;
3940

4041
private final Cache cache;
4142
private final Consumer<ConnectionEvent> onConnectionEvent;
@@ -64,6 +65,7 @@ public GrpcConnector(final FlagdOptions options, final Cache cache, final Suppli
6465
this.startEventStreamRetryBackoff = options.getRetryBackoffMs();
6566
this.eventStreamRetryBackoff = options.getRetryBackoffMs();
6667
this.deadline = options.getDeadline();
68+
this.streamDeadlineMs = options.getStreamDeadlineMs();
6769
this.cache = cache;
6870
this.onConnectionEvent = onConnectionEvent;
6971
this.connectedSupplier = connectedSupplier;
@@ -126,7 +128,14 @@ private void observeEventStream() {
126128
while (this.eventStreamAttempt <= this.maxEventStreamRetries) {
127129
final StreamObserver<EventStreamResponse> responseObserver = new EventStreamObserver(sync, this.cache,
128130
this::onConnectionEvent);
129-
this.serviceStub.eventStream(EventStreamRequest.getDefaultInstance(), responseObserver);
131+
132+
ServiceGrpc.ServiceStub localServiceStub = this.serviceStub;
133+
134+
if (this.streamDeadlineMs > 0) {
135+
localServiceStub = localServiceStub.withDeadlineAfter(this.streamDeadlineMs, TimeUnit.MILLISECONDS);
136+
}
137+
138+
localServiceStub.eventStream(EventStreamRequest.getDefaultInstance(), responseObserver);
130139

131140
try {
132141
synchronized (sync) {

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java

+30-11
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import io.grpc.Context;
2323
import io.grpc.Context.CancellableContext;
2424
import io.grpc.ManagedChannel;
25+
import io.grpc.Status.Code;
26+
import io.grpc.StatusRuntimeException;
2527
import lombok.extern.slf4j.Slf4j;
2628

2729
/**
@@ -43,6 +45,7 @@ public class GrpcStreamConnector implements Connector {
4345
private final FlagSyncServiceStub serviceStub;
4446
private final FlagSyncServiceBlockingStub serviceBlockingStub;
4547
private final int deadline;
48+
private final int streamDeadlineMs;
4649
private final String selector;
4750

4851
/**
@@ -55,6 +58,7 @@ public GrpcStreamConnector(final FlagdOptions options) {
5558
serviceStub = FlagSyncServiceGrpc.newStub(channel);
5659
serviceBlockingStub = FlagSyncServiceGrpc.newBlockingStub(channel);
5760
deadline = options.getDeadline();
61+
streamDeadlineMs = options.getStreamDeadlineMs();
5862
selector = options.getSelector();
5963
}
6064

@@ -64,7 +68,8 @@ public GrpcStreamConnector(final FlagdOptions options) {
6468
public void init() {
6569
Thread listener = new Thread(() -> {
6670
try {
67-
observeEventStream(blockingQueue, shutdown, serviceStub, serviceBlockingStub, selector, deadline);
71+
observeEventStream(blockingQueue, shutdown, serviceStub, serviceBlockingStub, selector, deadline,
72+
streamDeadlineMs);
6873
} catch (InterruptedException e) {
6974
log.warn("gRPC event stream interrupted, flag configurations are stale", e);
7075
Thread.currentThread().interrupt();
@@ -114,7 +119,8 @@ static void observeEventStream(final BlockingQueue<QueuePayload> writeTo,
114119
final FlagSyncServiceStub serviceStub,
115120
final FlagSyncServiceBlockingStub serviceBlockingStub,
116121
final String selector,
117-
final int deadline)
122+
final int deadline,
123+
final int streamDeadlineMs)
118124
throws InterruptedException {
119125

120126
final BlockingQueue<GrpcResponseModel> streamReceiver = new LinkedBlockingQueue<>(QUEUE_SIZE);
@@ -128,14 +134,20 @@ static void observeEventStream(final BlockingQueue<QueuePayload> writeTo,
128134
log.debug("Initializing sync stream request");
129135
final SyncFlagsRequest.Builder syncRequest = SyncFlagsRequest.newBuilder();
130136
final GetMetadataRequest.Builder metadataRequest = GetMetadataRequest.newBuilder();
131-
GetMetadataResponse metadataResponse = GetMetadataResponse.getDefaultInstance();
137+
GetMetadataResponse metadataResponse = GetMetadataResponse.getDefaultInstance();
132138

133139
if (selector != null) {
134140
syncRequest.setSelector(selector);
135141
}
136142

137143
try (CancellableContext context = Context.current().withCancellation()) {
138-
serviceStub.syncFlags(syncRequest.build(), new GrpcStreamHandler(streamReceiver));
144+
FlagSyncServiceStub localServiceStub = serviceStub;
145+
if (streamDeadlineMs > 0) {
146+
localServiceStub = localServiceStub.withDeadlineAfter(streamDeadlineMs, TimeUnit.MILLISECONDS);
147+
}
148+
149+
localServiceStub.syncFlags(syncRequest.build(), new GrpcStreamHandler(streamReceiver));
150+
139151
try {
140152
metadataResponse = serviceBlockingStub.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS)
141153
.getMetadata(metadataRequest.build());
@@ -158,14 +170,21 @@ static void observeEventStream(final BlockingQueue<QueuePayload> writeTo,
158170
}
159171

160172
if (response.getError() != null || metadataException != null) {
161-
log.error(String.format("Error from initializing stream or metadata, retrying in %dms",
162-
retryDelay), response.getError());
163-
164-
if (!writeTo.offer(
165-
new QueuePayload(QueuePayloadType.ERROR, "Error from stream or metadata",
166-
metadataResponse))) {
167-
log.error("Failed to convey ERROR status, queue is full");
173+
if (response.getError() instanceof StatusRuntimeException
174+
&& ((StatusRuntimeException) response.getError()).getStatus().getCode()
175+
.equals(Code.DEADLINE_EXCEEDED)) {
176+
log.debug(String.format("Stream deadline reached, re-establishing in %dms",
177+
retryDelay));
178+
} else {
179+
log.error(String.format("Error initializing stream or metadata, retrying in %dms",
180+
retryDelay), response.getError());
181+
if (!writeTo.offer(
182+
new QueuePayload(QueuePayloadType.ERROR, "Error from stream or metadata",
183+
metadataResponse))) {
184+
log.error("Failed to convey ERROR status, queue is full");
185+
}
168186
}
187+
169188
// close the context to cancel the stream in case just the metadata call failed
170189
context.cancel(metadataException);
171190
break;

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserverTest.java

+12
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import static org.mockito.Mockito.atLeast;
88
import static org.mockito.Mockito.atMost;
99
import static org.mockito.Mockito.mock;
10+
import static org.mockito.Mockito.never;
1011
import static org.mockito.Mockito.times;
1112
import static org.mockito.Mockito.verify;
1213
import static org.mockito.Mockito.when;
@@ -24,6 +25,8 @@
2425

2526
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
2627
import dev.openfeature.flagd.grpc.evaluation.Evaluation.EventStreamResponse;
28+
import io.grpc.Status;
29+
import io.grpc.StatusRuntimeException;
2730

2831
class EventStreamObserverTest {
2932

@@ -83,6 +86,15 @@ public void reconnections() {
8386
assertFalse(states.get(0));
8487
}
8588

89+
@Test
90+
public void deadlineExceeded() {
91+
stream.onError(new StatusRuntimeException(Status.DEADLINE_EXCEEDED));
92+
// a stream deadline must NOT flush the cache
93+
verify(cache, never()).clear();
94+
// and no connection event must have been recorded
95+
assertEquals(0, states.size());
96+
}
97+
8698
@Test
8799
public void cacheBustingForKnownKeys() {
88100
final String key1 = "myKey1";

0 commit comments

Comments
 (0)