Skip to content

Commit 245e9ed

Browse files
authored
fix: transient error log-spam, add retry policy (#1273)
Signed-off-by: Todd Baert <[email protected]>
1 parent 831c410 commit 245e9ed

File tree

12 files changed

+259
-95
lines changed

12 files changed

+259
-95
lines changed

Diff for: providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ private void onError() {
271271
errorTask = errorExecutor.schedule(
272272
() -> {
273273
if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_ERROR) {
274-
log.debug(
274+
log.error(
275275
"Provider did not reconnect successfully within {}s. Emitting ERROR event...",
276276
gracePeriod);
277277
flagResolver.onError();

Diff for: providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelBuilder.java

+69-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
66
import io.grpc.ManagedChannel;
77
import io.grpc.NameResolverRegistry;
8+
import io.grpc.Status.Code;
89
import io.grpc.netty.GrpcSslContexts;
910
import io.grpc.netty.NettyChannelBuilder;
1011
import io.netty.channel.epoll.Epoll;
@@ -15,12 +16,75 @@
1516
import java.io.File;
1617
import java.net.URI;
1718
import java.net.URISyntaxException;
19+
import java.util.Arrays;
20+
import java.util.HashMap;
21+
import java.util.Map;
1822
import java.util.concurrent.TimeUnit;
1923
import javax.net.ssl.SSLException;
2024

2125
/** gRPC channel builder helper. */
2226
public class ChannelBuilder {
2327

28+
/**
29+
* Controls retry (not-reconnection) policy for failed RPCs.
30+
*/
31+
@SuppressWarnings({"unchecked", "rawtypes"})
32+
static final Map<String, ?> SERVICE_CONFIG_WITH_RETRY = new HashMap() {
33+
{
34+
put("methodConfig", Arrays.asList(new HashMap() {
35+
{
36+
put(
37+
"name",
38+
Arrays.asList(
39+
new HashMap() {
40+
{
41+
put("service", "flagd.sync.v1.FlagSyncService");
42+
}
43+
},
44+
new HashMap() {
45+
{
46+
put("service", "flagd.evaluation.v1.Service");
47+
}
48+
}));
49+
put("retryPolicy", new HashMap() {
50+
{
51+
// 1 + 2 + 4
52+
put("maxAttempts", 3.0); // types used here are important, need to be doubles
53+
put("initialBackoff", "1s");
54+
put("maxBackoff", "5s");
55+
put("backoffMultiplier", 2.0);
56+
// status codes to retry on:
57+
put(
58+
"retryableStatusCodes",
59+
Arrays.asList(
60+
/*
61+
* All codes are retryable except OK, CANCELLED and DEADLINE_EXCEEDED since
62+
* any others not listed here cause a very tight loop of retries.
63+
* CANCELLED is not retryable because it is a client-side termination.
64+
* DEADLINE_EXCEEDED is typically a result of a client specified deadline,
65+
* and definitionally should not result in a tight loop (it's a timeout).
66+
*/
67+
Code.UNKNOWN.toString(),
68+
Code.INVALID_ARGUMENT.toString(),
69+
Code.NOT_FOUND.toString(),
70+
Code.ALREADY_EXISTS.toString(),
71+
Code.PERMISSION_DENIED.toString(),
72+
Code.RESOURCE_EXHAUSTED.toString(),
73+
Code.FAILED_PRECONDITION.toString(),
74+
Code.ABORTED.toString(),
75+
Code.OUT_OF_RANGE.toString(),
76+
Code.UNIMPLEMENTED.toString(),
77+
Code.INTERNAL.toString(),
78+
Code.UNAVAILABLE.toString(),
79+
Code.DATA_LOSS.toString(),
80+
Code.UNAUTHENTICATED.toString()));
81+
}
82+
});
83+
}
84+
}));
85+
}
86+
};
87+
2488
private ChannelBuilder() {}
2589

2690
/**
@@ -45,6 +109,8 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) {
45109
.eventLoopGroup(new EpollEventLoopGroup())
46110
.channelType(EpollDomainSocketChannel.class)
47111
.usePlaintext()
112+
.defaultServiceConfig(SERVICE_CONFIG_WITH_RETRY)
113+
.enableRetry()
48114
.build();
49115
}
50116

@@ -89,7 +155,9 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) {
89155
builder.intercept(new FlagdGrpcInterceptor(options.getOpenTelemetry()));
90156
}
91157

92-
return builder.build();
158+
return builder.defaultServiceConfig(SERVICE_CONFIG_WITH_RETRY)
159+
.enableRetry()
160+
.build();
93161
} catch (SSLException ssle) {
94162
SslConfigException sslConfigException = new SslConfigException("Error with SSL configuration.");
95163
sslConfigException.initCause(ssle);

Diff for: providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnector.java

+3-31
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import java.util.Collections;
1111
import java.util.concurrent.TimeUnit;
1212
import java.util.function.Consumer;
13-
import java.util.function.Function;
1413
import lombok.Getter;
1514
import lombok.extern.slf4j.Slf4j;
1615

@@ -24,11 +23,6 @@
2423
@Slf4j
2524
public class ChannelConnector<T extends AbstractStub<T>, K extends AbstractBlockingStub<K>> {
2625

27-
/**
28-
* The blocking service stub for making blocking GRPC calls.
29-
*/
30-
private final Function<ManagedChannel, K> blockingStubFunction;
31-
3226
/**
3327
* The GRPC managed channel for managing the underlying GRPC connection.
3428
*/
@@ -49,18 +43,13 @@ public class ChannelConnector<T extends AbstractStub<T>, K extends AbstractBlock
4943
* Constructs a new {@code ChannelConnector} instance with the specified options and parameters.
5044
*
5145
* @param options the configuration options for the GRPC connection
52-
* @param blockingStub a function to create the blocking service stub from a {@link ManagedChannel}
5346
* @param onConnectionEvent a consumer to handle connection events
5447
* @param channel the managed channel for the GRPC connection
5548
*/
5649
public ChannelConnector(
57-
final FlagdOptions options,
58-
final Function<ManagedChannel, K> blockingStub,
59-
final Consumer<FlagdProviderEvent> onConnectionEvent,
60-
ManagedChannel channel) {
50+
final FlagdOptions options, final Consumer<FlagdProviderEvent> onConnectionEvent, ManagedChannel channel) {
6151

6252
this.channel = channel;
63-
this.blockingStubFunction = blockingStub;
6453
this.deadline = options.getDeadline();
6554
this.onConnectionEvent = onConnectionEvent;
6655
}
@@ -69,14 +58,10 @@ public ChannelConnector(
6958
* Constructs a {@code ChannelConnector} instance for testing purposes.
7059
*
7160
* @param options the configuration options for the GRPC connection
72-
* @param blockingStub a function to create the blocking service stub from a {@link ManagedChannel}
7361
* @param onConnectionEvent a consumer to handle connection events
7462
*/
75-
public ChannelConnector(
76-
final FlagdOptions options,
77-
final Function<ManagedChannel, K> blockingStub,
78-
final Consumer<FlagdProviderEvent> onConnectionEvent) {
79-
this(options, blockingStub, onConnectionEvent, ChannelBuilder.nettyChannel(options));
63+
public ChannelConnector(final FlagdOptions options, final Consumer<FlagdProviderEvent> onConnectionEvent) {
64+
this(options, onConnectionEvent, ChannelBuilder.nettyChannel(options));
8065
}
8166

8267
/**
@@ -89,19 +74,6 @@ public void initialize() throws Exception {
8974
monitorChannelState(ConnectivityState.READY);
9075
}
9176

92-
/**
93-
* Returns the blocking service stub for making blocking GRPC calls.
94-
*
95-
* @return the blocking service stub
96-
*/
97-
public K getBlockingStub() {
98-
K stub = blockingStubFunction.apply(channel).withWaitForReady();
99-
if (this.deadline > 0) {
100-
stub = stub.withDeadlineAfter(this.deadline, TimeUnit.MILLISECONDS);
101-
}
102-
return stub;
103-
}
104-
10577
/**
10678
* Shuts down the GRPC connection and cleans up associated resources.
10779
*

Diff for: providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -127,19 +127,19 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc
127127
}
128128
if (!stateBlockingQueue.offer(
129129
new StorageStateChange(StorageState.OK, changedFlagsKeys, metadata))) {
130-
log.warn("Failed to convey OK satus, queue is full");
130+
log.warn("Failed to convey OK status, queue is full");
131131
}
132132
} catch (Throwable e) {
133133
// catch all exceptions and avoid stream listener interruptions
134134
log.warn("Invalid flag sync payload from connector", e);
135135
if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.STALE))) {
136-
log.warn("Failed to convey STALE satus, queue is full");
136+
log.warn("Failed to convey STALE status, queue is full");
137137
}
138138
}
139139
break;
140140
case ERROR:
141141
if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.ERROR))) {
142-
log.warn("Failed to convey ERROR satus, queue is full");
142+
log.warn("Failed to convey ERROR status, queue is full");
143143
}
144144
break;
145145
default:

Diff for: providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java

+19-4
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class SyncStreamQueueSource implements QueueSource {
3737

3838
private final AtomicBoolean shutdown = new AtomicBoolean(false);
3939
private final int streamDeadline;
40+
private final int deadline;
4041
private final String selector;
4142
private final String providerId;
4243
private final boolean syncMetadataDisabled;
@@ -45,30 +46,37 @@ public class SyncStreamQueueSource implements QueueSource {
4546
new LinkedBlockingQueue<>(QUEUE_SIZE);
4647
private final BlockingQueue<QueuePayload> outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
4748
private final FlagSyncServiceStub stub;
49+
private final FlagSyncServiceBlockingStub blockingStub;
4850

4951
/**
5052
* Creates a new SyncStreamQueueSource responsible for observing the event stream.
5153
*/
5254
public SyncStreamQueueSource(final FlagdOptions options, Consumer<FlagdProviderEvent> onConnectionEvent) {
5355
streamDeadline = options.getStreamDeadlineMs();
56+
deadline = options.getDeadline();
5457
selector = options.getSelector();
5558
providerId = options.getProviderId();
5659
syncMetadataDisabled = options.isSyncMetadataDisabled();
57-
channelConnector = new ChannelConnector<>(options, FlagSyncServiceGrpc::newBlockingStub, onConnectionEvent);
60+
channelConnector = new ChannelConnector<>(options, onConnectionEvent);
5861
this.stub = FlagSyncServiceGrpc.newStub(channelConnector.getChannel()).withWaitForReady();
62+
this.blockingStub = FlagSyncServiceGrpc.newBlockingStub(channelConnector.getChannel())
63+
.withWaitForReady();
5964
}
6065

6166
// internal use only
6267
protected SyncStreamQueueSource(
6368
final FlagdOptions options,
6469
ChannelConnector<FlagSyncServiceStub, FlagSyncServiceBlockingStub> connectorMock,
65-
FlagSyncServiceStub stubMock) {
70+
FlagSyncServiceStub stubMock,
71+
FlagSyncServiceBlockingStub blockingStubMock) {
6672
streamDeadline = options.getStreamDeadlineMs();
73+
deadline = options.getDeadline();
6774
selector = options.getSelector();
6875
providerId = options.getProviderId();
6976
channelConnector = connectorMock;
7077
stub = stubMock;
7178
syncMetadataDisabled = options.isSyncMetadataDisabled();
79+
blockingStub = blockingStubMock;
7280
}
7381

7482
/** Initialize sync stream connector. */
@@ -110,6 +118,7 @@ private void observeSyncStream() throws InterruptedException {
110118
log.info("Initializing sync stream observer");
111119

112120
// outer loop for re-issuing the stream request
121+
// "waitForReady" on the channel, plus our retry policy slow this loop down in error conditions
113122
while (!shutdown.get()) {
114123

115124
log.debug("Initializing sync stream request");
@@ -124,15 +133,21 @@ private void observeSyncStream() throws InterruptedException {
124133
// TODO: remove the metadata call entirely after https://github.com/open-feature/flagd/issues/1584
125134
if (!syncMetadataDisabled) {
126135
try {
127-
metadataResponse = channelConnector.getBlockingStub().getMetadata(metadataRequest.build());
136+
FlagSyncServiceBlockingStub localStub = blockingStub;
137+
138+
if (deadline > 0) {
139+
localStub = localStub.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS);
140+
}
141+
142+
metadataResponse = localStub.getMetadata(metadataRequest.build());
128143
} catch (Exception metaEx) {
129144
log.error("Metadata exception: {}, cancelling stream", metaEx.getMessage(), metaEx);
130145
context.cancel(metaEx);
131146
}
132147
}
133148

134149
// inner loop for handling messages
135-
while (!shutdown.get()) {
150+
while (!shutdown.get() && !Context.current().isCancelled()) {
136151
final StreamResponseModel<SyncFlagsResponse> taken = incomingQueue.take();
137152
if (taken.isComplete()) {
138153
log.debug("Sync stream completed, will reconnect");

Diff for: providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java

+20-9
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public final class RpcResolver implements Resolver {
6666
private final LinkedBlockingQueue<StreamResponseModel<EventStreamResponse>> incomingQueue;
6767
private final Consumer<FlagdProviderEvent> onProviderEvent;
6868
private final ServiceStub stub;
69+
private final ServiceBlockingStub blockingStub;
6970

7071
/**
7172
* Resolves flag values using
@@ -82,9 +83,11 @@ public RpcResolver(
8283
this.strategy = ResolveFactory.getStrategy(options);
8384
this.options = options;
8485
incomingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
85-
this.connector = new ChannelConnector<>(options, ServiceGrpc::newBlockingStub, onProviderEvent);
86+
this.connector = new ChannelConnector<>(options, onProviderEvent);
8687
this.onProviderEvent = onProviderEvent;
8788
this.stub = ServiceGrpc.newStub(this.connector.getChannel()).withWaitForReady();
89+
this.blockingStub =
90+
ServiceGrpc.newBlockingStub(this.connector.getChannel()).withWaitForReady();
8891
}
8992

9093
// testing only
@@ -93,6 +96,7 @@ protected RpcResolver(
9396
final Cache cache,
9497
final Consumer<FlagdProviderEvent> onProviderEvent,
9598
ServiceStub mockStub,
99+
ServiceBlockingStub mockBlockingStub,
96100
ChannelConnector<ServiceStub, ServiceBlockingStub> connector) {
97101
this.cache = cache;
98102
this.strategy = ResolveFactory.getStrategy(options);
@@ -101,6 +105,7 @@ protected RpcResolver(
101105
this.connector = connector;
102106
this.onProviderEvent = onProviderEvent;
103107
this.stub = mockStub;
108+
this.blockingStub = mockBlockingStub;
104109
}
105110

106111
/**
@@ -145,15 +150,15 @@ public void onError() {
145150
public ProviderEvaluation<Boolean> booleanEvaluation(String key, Boolean defaultValue, EvaluationContext ctx) {
146151
ResolveBooleanRequest request = ResolveBooleanRequest.newBuilder().buildPartial();
147152

148-
return resolve(key, ctx, request, getResolver()::resolveBoolean, null);
153+
return resolve(key, ctx, request, getBlockingStub()::resolveBoolean, null);
149154
}
150155

151156
/**
152157
* String evaluation from grpc resolver.
153158
*/
154159
public ProviderEvaluation<String> stringEvaluation(String key, String defaultValue, EvaluationContext ctx) {
155160
ResolveStringRequest request = ResolveStringRequest.newBuilder().buildPartial();
156-
return resolve(key, ctx, request, getResolver()::resolveString, null);
161+
return resolve(key, ctx, request, getBlockingStub()::resolveString, null);
157162
}
158163

159164
/**
@@ -162,7 +167,7 @@ public ProviderEvaluation<String> stringEvaluation(String key, String defaultVal
162167
public ProviderEvaluation<Double> doubleEvaluation(String key, Double defaultValue, EvaluationContext ctx) {
163168
ResolveFloatRequest request = ResolveFloatRequest.newBuilder().buildPartial();
164169

165-
return resolve(key, ctx, request, getResolver()::resolveFloat, null);
170+
return resolve(key, ctx, request, getBlockingStub()::resolveFloat, null);
166171
}
167172

168173
/**
@@ -172,11 +177,17 @@ public ProviderEvaluation<Integer> integerEvaluation(String key, Integer default
172177

173178
ResolveIntRequest request = ResolveIntRequest.newBuilder().buildPartial();
174179

175-
return resolve(key, ctx, request, getResolver()::resolveInt, (Object value) -> ((Long) value).intValue());
180+
return resolve(key, ctx, request, getBlockingStub()::resolveInt, (Object value) -> ((Long) value).intValue());
176181
}
177182

178-
private ServiceGrpc.ServiceBlockingStub getResolver() {
179-
return connector.getBlockingStub().withDeadlineAfter(options.getDeadline(), TimeUnit.MILLISECONDS);
183+
private ServiceGrpc.ServiceBlockingStub getBlockingStub() {
184+
ServiceBlockingStub localStub = blockingStub;
185+
186+
if (options.getDeadline() > 0) {
187+
localStub = localStub.withDeadlineAfter(options.getDeadline(), TimeUnit.MILLISECONDS);
188+
}
189+
190+
return localStub;
180191
}
181192

182193
/**
@@ -190,7 +201,7 @@ public ProviderEvaluation<Value> objectEvaluation(String key, Value defaultValue
190201
key,
191202
ctx,
192203
request,
193-
getResolver()::resolveObject,
204+
getBlockingStub()::resolveObject,
194205
(Object value) -> convertObjectResponse((com.google.protobuf.Struct) value));
195206
}
196207

@@ -321,11 +332,11 @@ private void observeEventStream() throws InterruptedException {
321332
log.info("Initializing event stream observer");
322333

323334
// outer loop for re-issuing the stream request
335+
// "waitForReady" on the channel, plus our retry policy slow this loop down in error conditions
324336
while (!shutdown.get()) {
325337

326338
log.debug("Initializing event stream request");
327339
restartStream();
328-
329340
// inner loop for handling messages
330341
while (!shutdown.get()) {
331342
final StreamResponseModel<EventStreamResponse> taken = incomingQueue.take();

0 commit comments

Comments
 (0)