Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 59b6474

Browse files
committedSep 24, 2024·
feat: call and expose sync-metadata
Signed-off-by: Todd Baert <[email protected]>
1 parent 7d66ca8 commit 59b6474

File tree

20 files changed

+730
-518
lines changed

20 files changed

+730
-518
lines changed
 

‎providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java

+21-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package dev.openfeature.contrib.providers.flagd;
22

3+
import java.util.Collections;
34
import java.util.List;
5+
import java.util.Map;
46

57
import dev.openfeature.contrib.providers.flagd.resolver.Resolver;
68
import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver;
@@ -24,6 +26,7 @@ public class FlagdProvider extends EventProvider {
2426
private final Resolver flagResolver;
2527
private volatile boolean initialized = false;
2628
private volatile boolean connected = false;
29+
private volatile Map<String, Object> syncMetadata = Collections.emptyMap();
2730

2831
private EvaluationContext evaluationContext;
2932

@@ -47,13 +50,13 @@ public FlagdProvider(final FlagdOptions options) {
4750
switch (options.getResolverType().asString()) {
4851
case Config.RESOLVER_IN_PROCESS:
4952
this.flagResolver = new InProcessResolver(options, this::isConnected,
50-
this::onResolverConnectionChanged);
53+
this::onConnectionEvent);
5154
break;
5255
case Config.RESOLVER_RPC:
5356
this.flagResolver = new GrpcResolver(options,
5457
new Cache(options.getCacheType(), options.getMaxCacheSize()),
5558
this::isConnected,
56-
this::onResolverConnectionChanged);
59+
this::onConnectionEvent);
5760
break;
5861
default:
5962
throw new IllegalStateException(
@@ -117,6 +120,19 @@ public ProviderEvaluation<Value> getObjectEvaluation(String key, Value defaultVa
117120
return this.flagResolver.objectEvaluation(key, defaultValue, mergeContext(ctx));
118121
}
119122

123+
/**
124+
* An unmodifiable view of an object map representing the latest result of the
125+
* SyncMetadata.
126+
* Set on initial connection and updated with every reconnection.
127+
* see:
128+
* https://buf.build/open-feature/flagd/docs/main:flagd.sync.v1#flagd.sync.v1.FlagSyncService.GetMetadata
129+
*
130+
* @return Object map representing sync metadata
131+
*/
132+
protected Map<String, Object> getSyncMetadata() {
133+
return Collections.unmodifiableMap(syncMetadata);
134+
}
135+
120136
private EvaluationContext mergeContext(final EvaluationContext clientCallCtx) {
121137
if (this.evaluationContext != null) {
122138
return evaluationContext.merge(clientCallCtx);
@@ -129,10 +145,12 @@ private boolean isConnected() {
129145
return this.connected;
130146
}
131147

132-
private void onResolverConnectionChanged(boolean newConnectedState, List<String> changedFlagKeys) {
148+
private void onConnectionEvent(boolean newConnectedState, List<String> changedFlagKeys,
149+
Map<String, Object> syncMetadata) {
133150
boolean previous = connected;
134151
boolean current = newConnectedState;
135152
this.connected = newConnectedState;
153+
this.syncMetadata = syncMetadata;
136154

137155
// configuration changed
138156
if (initialized && previous && current) {

‎providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/Resolver.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import dev.openfeature.sdk.Value;
66

77
/**
8-
* A generic flag resolving contract for flagd.
8+
* Abstraction that resolves flag values in from some source.
99
*/
1010
public interface Resolver {
1111
void init() throws Exception;

‎providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserver.java

+10-10
Original file line numberDiff line numberDiff line change
@@ -20,25 +20,25 @@
2020
@Slf4j
2121
@SuppressFBWarnings(justification = "cache needs to be read and write by multiple objects")
2222
class EventStreamObserver implements StreamObserver<EventStreamResponse> {
23-
private final BiConsumer<Boolean, List<String>> stateConsumer;
23+
private final BiConsumer<Boolean, List<String>> onConnectionEvent;
2424
private final Object sync;
2525
private final Cache cache;
2626

27-
private static final String CONFIGURATION_CHANGE = "configuration_change";
28-
private static final String PROVIDER_READY = "provider_ready";
27+
public static final String CONFIGURATION_CHANGE = "configuration_change";
28+
public static final String PROVIDER_READY = "provider_ready";
2929
static final String FLAGS_KEY = "flags";
3030

3131
/**
3232
* Create a gRPC stream that get notified about flag changes.
3333
*
3434
* @param sync synchronization object from caller
3535
* @param cache cache to update
36-
* @param stateConsumer lambda to call for setting the state
36+
* @param onResponse lambda to call to handle the response
3737
*/
38-
EventStreamObserver(Object sync, Cache cache, BiConsumer<Boolean, List<String>> stateConsumer) {
38+
EventStreamObserver(Object sync, Cache cache, BiConsumer<Boolean, List<String>> onResponse) {
3939
this.sync = sync;
4040
this.cache = cache;
41-
this.stateConsumer = stateConsumer;
41+
this.onConnectionEvent = onResponse;
4242
}
4343

4444
@Override
@@ -61,7 +61,7 @@ public void onError(Throwable t) {
6161
if (this.cache.getEnabled()) {
6262
this.cache.clear();
6363
}
64-
this.stateConsumer.accept(false, Collections.emptyList());
64+
this.onConnectionEvent.accept(false, Collections.emptyList());
6565

6666
// handle last call of this stream
6767
handleEndOfStream();
@@ -72,7 +72,7 @@ public void onCompleted() {
7272
if (this.cache.getEnabled()) {
7373
this.cache.clear();
7474
}
75-
this.stateConsumer.accept(false, Collections.emptyList());
75+
this.onConnectionEvent.accept(false, Collections.emptyList());
7676

7777
// handle last call of this stream
7878
handleEndOfStream();
@@ -99,11 +99,11 @@ private void handleConfigurationChangeEvent(EventStreamResponse value) {
9999
}
100100
}
101101

102-
this.stateConsumer.accept(true, changedFlags);
102+
this.onConnectionEvent.accept(true, changedFlags);
103103
}
104104

105105
private void handleProviderReadyEvent() {
106-
this.stateConsumer.accept(true, Collections.emptyList());
106+
this.onConnectionEvent.accept(true, Collections.emptyList());
107107
if (this.cache.getEnabled()) {
108108
this.cache.clear();
109109
}

‎providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnector.java

+15-14
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
import java.util.Collections;
44
import java.util.List;
5+
import java.util.Map;
56
import java.util.Random;
67
import java.util.concurrent.TimeUnit;
7-
import java.util.function.BiConsumer;
88
import java.util.function.Supplier;
99

1010
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
@@ -14,6 +14,7 @@
1414
import dev.openfeature.flagd.grpc.evaluation.Evaluation.EventStreamRequest;
1515
import dev.openfeature.flagd.grpc.evaluation.Evaluation.EventStreamResponse;
1616
import dev.openfeature.flagd.grpc.evaluation.ServiceGrpc;
17+
import dev.openfeature.sdk.internal.TriConsumer;
1718
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
1819
import io.grpc.ManagedChannel;
1920
import io.grpc.stub.StreamObserver;
@@ -37,7 +38,7 @@ public class GrpcConnector {
3738
private final long deadline;
3839

3940
private final Cache cache;
40-
private final BiConsumer<Boolean, List<String>> stateConsumer;
41+
private final TriConsumer<Boolean, List<String>, Map<String, Object>> onConnectionEvent;
4142
private final Supplier<Boolean> connectedSupplier;
4243

4344
private int eventStreamAttempt = 1;
@@ -48,23 +49,23 @@ public class GrpcConnector {
4849

4950
/**
5051
* GrpcConnector creates an abstraction over gRPC communication.
51-
*
52-
* @param options options to build the gRPC channel.
53-
* @param cache cache to use.
54-
* @param stateConsumer lambda to call for setting the state.
52+
*
53+
* @param options flagd options
54+
* @param cache cache to use
55+
* @param connectedSupplier lambda providing current connection status from caller
56+
* @param onConnectionEvent lambda which handles changes in the connection/stream
5557
*/
5658
public GrpcConnector(final FlagdOptions options, final Cache cache, final Supplier<Boolean> connectedSupplier,
57-
BiConsumer<Boolean, List<String>> stateConsumer) {
59+
TriConsumer<Boolean, List<String>, Map<String, Object>> onConnectionEvent) {
5860
this.channel = ChannelBuilder.nettyChannel(options);
5961
this.serviceStub = ServiceGrpc.newStub(channel);
6062
this.serviceBlockingStub = ServiceGrpc.newBlockingStub(channel);
61-
6263
this.maxEventStreamRetries = options.getMaxEventStreamRetries();
6364
this.startEventStreamRetryBackoff = options.getRetryBackoffMs();
6465
this.eventStreamRetryBackoff = options.getRetryBackoffMs();
6566
this.deadline = options.getDeadline();
6667
this.cache = cache;
67-
this.stateConsumer = stateConsumer;
68+
this.onConnectionEvent = onConnectionEvent;
6869
this.connectedSupplier = connectedSupplier;
6970
}
7071

@@ -104,7 +105,7 @@ public void shutdown() throws Exception {
104105
this.channel.awaitTermination(this.deadline, TimeUnit.MILLISECONDS);
105106
log.warn(String.format("Unable to shut down channel by %d deadline", this.deadline));
106107
}
107-
this.stateConsumer.accept(false, Collections.emptyList());
108+
this.onConnectionEvent.accept(false, Collections.emptyList(), Collections.emptyMap());
108109
}
109110
}
110111

@@ -124,7 +125,7 @@ public ServiceGrpc.ServiceBlockingStub getResolver() {
124125
private void observeEventStream() {
125126
while (this.eventStreamAttempt <= this.maxEventStreamRetries) {
126127
final StreamObserver<EventStreamResponse> responseObserver = new EventStreamObserver(sync, this.cache,
127-
this::grpcStateConsumer);
128+
this::grpconConnectionEvent);
128129
this.serviceStub.eventStream(EventStreamRequest.getDefaultInstance(), responseObserver);
129130

130131
try {
@@ -155,16 +156,16 @@ private void observeEventStream() {
155156
}
156157

157158
log.error("failed to connect to event stream, exhausted retries");
158-
this.grpcStateConsumer(false, null);
159+
this.grpconConnectionEvent(false, Collections.emptyList());
159160
}
160161

161-
private void grpcStateConsumer(final boolean connected, final List<String> changedFlags) {
162+
private void grpconConnectionEvent(final boolean connected, final List<String> changedFlags) {
162163
// reset reconnection states
163164
if (connected) {
164165
this.eventStreamAttempt = 1;
165166
this.eventStreamRetryBackoff = this.startEventStreamRetryBackoff;
166167
}
167168
// chain to initiator
168-
this.stateConsumer.accept(connected, changedFlags);
169+
this.onConnectionEvent.accept(connected, changedFlags, Collections.emptyMap());
169170
}
170171
}

‎providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcResolver.java

+32-22
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import java.util.HashMap;
44
import java.util.List;
55
import java.util.Map;
6-
import java.util.function.BiConsumer;
76
import java.util.function.Function;
87
import java.util.function.Supplier;
98
import java.util.stream.Collectors;
@@ -29,18 +28,21 @@
2928
import dev.openfeature.sdk.ImmutableMetadata;
3029
import dev.openfeature.sdk.MutableStructure;
3130
import dev.openfeature.sdk.ProviderEvaluation;
31+
import dev.openfeature.sdk.Structure;
3232
import dev.openfeature.sdk.Value;
3333
import dev.openfeature.sdk.exceptions.FlagNotFoundError;
3434
import dev.openfeature.sdk.exceptions.GeneralError;
3535
import dev.openfeature.sdk.exceptions.OpenFeatureError;
3636
import dev.openfeature.sdk.exceptions.ParseError;
3737
import dev.openfeature.sdk.exceptions.TypeMismatchError;
38+
import dev.openfeature.sdk.internal.TriConsumer;
3839
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
3940
import io.grpc.Status.Code;
4041
import io.grpc.StatusRuntimeException;
4142

4243
/**
43-
* FlagResolution resolves flags from flagd.
44+
* Resolves flag values using https://buf.build/open-feature/flagd/docs/main:flagd.evaluation.v1.
45+
* Flags are evaluated remotely.
4446
*/
4547
@SuppressWarnings("PMD.TooManyStaticImports")
4648
@SuppressFBWarnings(justification = "cache needs to be read and write by multiple objects")
@@ -52,20 +54,20 @@ public final class GrpcResolver implements Resolver {
5254
private final Supplier<Boolean> connectedSupplier;
5355

5456
/**
55-
* Initialize Grpc resolver.
56-
*
57-
* @param options flagd options.
58-
* @param cache cache to use.
59-
* @param connectedSupplier lambda to call for getting the state.
60-
* @param onResolverConnectionChanged lambda to communicate back the state.
57+
* Resolves flag values using https://buf.build/open-feature/flagd/docs/main:flagd.evaluation.v1.
58+
* Flags are evaluated remotely.
59+
*
60+
* @param options flagd options
61+
* @param cache cache to use
62+
* @param connectedSupplier lambda providing current connection status from caller
63+
* @param onConnectionEvent lambda which handles changes in the connection/stream
6164
*/
6265
public GrpcResolver(final FlagdOptions options, final Cache cache, final Supplier<Boolean> connectedSupplier,
63-
final BiConsumer<Boolean, List<String>> onResolverConnectionChanged) {
66+
final TriConsumer<Boolean, List<String>, Map<String, Object>> onConnectionEvent) {
6467
this.cache = cache;
6568
this.connectedSupplier = connectedSupplier;
66-
6769
this.strategy = ResolveFactory.getStrategy(options);
68-
this.connector = new GrpcConnector(options, cache, connectedSupplier, onResolverConnectionChanged);
70+
this.connector = new GrpcConnector(options, cache, connectedSupplier, onConnectionEvent);
6971
}
7072

7173
/**
@@ -203,14 +205,14 @@ private Boolean cacheAvailable() {
203205
/**
204206
* Recursively convert protobuf structure to openfeature value.
205207
*/
206-
private static Value convertObjectResponse(Struct protobuf) {
208+
public static Value convertObjectResponse(Struct protobuf) {
207209
return convertProtobufMap(protobuf.getFieldsMap());
208210
}
209211

210212
/**
211213
* Recursively convert the Evaluation context to a protobuf structure.
212214
*/
213-
private static Struct convertContext(EvaluationContext ctx) {
215+
public static Struct convertContext(EvaluationContext ctx) {
214216
Map<String, Value> ctxMap = ctx.asMap();
215217
// asMap() does not provide explicitly set targeting key (ex:- new
216218
// ImmutableContext("TargetingKey") ).
@@ -223,7 +225,7 @@ private static Struct convertContext(EvaluationContext ctx) {
223225
/**
224226
* Convert any openfeature value to a protobuf value.
225227
*/
226-
private static com.google.protobuf.Value convertAny(Value value) {
228+
public static com.google.protobuf.Value convertAny(Value value) {
227229
if (value.isList()) {
228230
return convertList(value.asList());
229231
} else if (value.isStructure()) {
@@ -236,7 +238,7 @@ private static com.google.protobuf.Value convertAny(Value value) {
236238
/**
237239
* Convert any protobuf value to {@link Value}.
238240
*/
239-
private static Value convertAny(com.google.protobuf.Value protobuf) {
241+
public static Value convertAny(com.google.protobuf.Value protobuf) {
240242
if (protobuf.hasListValue()) {
241243
return convertList(protobuf.getListValue());
242244
} else if (protobuf.hasStructValue()) {
@@ -249,7 +251,7 @@ private static Value convertAny(com.google.protobuf.Value protobuf) {
249251
/**
250252
* Convert OpenFeature map to protobuf {@link com.google.protobuf.Value}.
251253
*/
252-
private static com.google.protobuf.Value convertMap(Map<String, Value> map) {
254+
public static com.google.protobuf.Value convertMap(Map<String, Value> map) {
253255
Map<String, com.google.protobuf.Value> values = new HashMap<>();
254256

255257
map.keySet().forEach((String key) -> {
@@ -265,20 +267,28 @@ private static com.google.protobuf.Value convertMap(Map<String, Value> map) {
265267
* Convert protobuf map with {@link com.google.protobuf.Value} to OpenFeature
266268
* map.
267269
*/
268-
private static Value convertProtobufMap(Map<String, com.google.protobuf.Value> map) {
270+
public static Value convertProtobufMap(Map<String, com.google.protobuf.Value> map) {
271+
return new Value(convertProtobufMapToStructure(map));
272+
}
273+
274+
/**
275+
* Convert protobuf map with {@link com.google.protobuf.Value} to OpenFeature
276+
* map.
277+
*/
278+
public static Structure convertProtobufMapToStructure(Map<String, com.google.protobuf.Value> map) {
269279
Map<String, Value> values = new HashMap<>();
270280

271281
map.keySet().forEach((String key) -> {
272282
com.google.protobuf.Value value = map.get(key);
273283
values.put(key, convertAny(value));
274284
});
275-
return new Value(new MutableStructure(values));
285+
return new MutableStructure(values);
276286
}
277287

278288
/**
279289
* Convert OpenFeature list to protobuf {@link com.google.protobuf.Value}.
280290
*/
281-
private static com.google.protobuf.Value convertList(List<Value> values) {
291+
public static com.google.protobuf.Value convertList(List<Value> values) {
282292
ListValue list = ListValue.newBuilder()
283293
.addAllValues(values.stream()
284294
.map(v -> convertAny(v)).collect(Collectors.toList()))
@@ -289,15 +299,15 @@ private static com.google.protobuf.Value convertList(List<Value> values) {
289299
/**
290300
* Convert protobuf list to OpenFeature {@link com.google.protobuf.Value}.
291301
*/
292-
private static Value convertList(ListValue protobuf) {
302+
public static Value convertList(ListValue protobuf) {
293303
return new Value(protobuf.getValuesList().stream().map(p -> convertAny(p)).collect(Collectors.toList()));
294304
}
295305

296306
/**
297307
* Convert OpenFeature {@link Value} to protobuf
298308
* {@link com.google.protobuf.Value}.
299309
*/
300-
private static com.google.protobuf.Value convertPrimitive(Value value) {
310+
public static com.google.protobuf.Value convertPrimitive(Value value) {
301311
com.google.protobuf.Value.Builder builder = com.google.protobuf.Value.newBuilder();
302312

303313
if (value.isBoolean()) {
@@ -316,7 +326,7 @@ private static com.google.protobuf.Value convertPrimitive(Value value) {
316326
* Convert protobuf {@link com.google.protobuf.Value} to OpenFeature
317327
* {@link Value}.
318328
*/
319-
private static Value convertPrimitive(com.google.protobuf.Value protobuf) {
329+
public static Value convertPrimitive(com.google.protobuf.Value protobuf) {
320330
final Value value;
321331
if (protobuf.hasBoolValue()) {
322332
value = new Value(protobuf.getBoolValue());

‎providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java

+22-14
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22

33
import static dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag.EMPTY_TARGETING_STRING;
44

5+
import java.util.Collections;
56
import java.util.List;
6-
import java.util.function.BiConsumer;
7+
import java.util.Map;
78
import java.util.function.Supplier;
89

910
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
@@ -26,33 +27,39 @@
2627
import dev.openfeature.sdk.Value;
2728
import dev.openfeature.sdk.exceptions.ParseError;
2829
import dev.openfeature.sdk.exceptions.TypeMismatchError;
30+
import dev.openfeature.sdk.internal.TriConsumer;
2931
import lombok.extern.slf4j.Slf4j;
3032

3133
/**
32-
* flagd in-process resolver. Resolves feature flags in-process. Flags are
33-
* retrieved from {@link Storage}, where the
34-
* {@link Storage} maintain flag configurations obtained from known source.
34+
* Resolves flag values using
35+
* https://buf.build/open-feature/flagd/docs/main:flagd.sync.v1.
36+
* Flags are evaluated locally.
3537
*/
3638
@Slf4j
3739
public class InProcessResolver implements Resolver {
3840
private final Storage flagStore;
39-
private final BiConsumer<Boolean, List<String>> onResolverConnectionChanged;
41+
private final TriConsumer<Boolean, List<String>, Map<String, Object>> onConnectionEvent;
4042
private final Operator operator;
4143
private final long deadline;
4244
private final ImmutableMetadata metadata;
4345
private final Supplier<Boolean> connectedSupplier;
4446

4547
/**
46-
* Initialize an in-process resolver.
47-
* @param options flagd options
48-
* @param connectedSupplier supplier for connection state
49-
* @param onResolverConnectionChanged handler for connection change
48+
* Resolves flag values using
49+
* https://buf.build/open-feature/flagd/docs/main:flagd.sync.v1.
50+
* Flags are evaluated locally.
51+
*
52+
* @param options flagd options
53+
* @param connectedSupplier lambda providing current connection status from
54+
* caller
55+
* @param onConnectionEvent lambda which handles changes in the
56+
* connection/stream
5057
*/
5158
public InProcessResolver(FlagdOptions options, final Supplier<Boolean> connectedSupplier,
52-
BiConsumer<Boolean, List<String>> onResolverConnectionChanged) {
59+
TriConsumer<Boolean, List<String>, Map<String, Object>> onConnectionEvent) {
5360
this.flagStore = new FlagStore(getConnector(options));
5461
this.deadline = options.getDeadline();
55-
this.onResolverConnectionChanged = onResolverConnectionChanged;
62+
this.onConnectionEvent = onConnectionEvent;
5663
this.operator = new Operator();
5764
this.connectedSupplier = connectedSupplier;
5865
this.metadata = options.getSelector() == null ? null
@@ -72,10 +79,11 @@ public void init() throws Exception {
7279
final StorageStateChange storageStateChange = flagStore.getStateQueue().take();
7380
switch (storageStateChange.getStorageState()) {
7481
case OK:
75-
onResolverConnectionChanged.accept(true, storageStateChange.getChangedFlagsKeys());
82+
onConnectionEvent.accept(true, storageStateChange.getChangedFlagsKeys(),
83+
storageStateChange.getSyncMetadata());
7684
break;
7785
case ERROR:
78-
onResolverConnectionChanged.accept(false, null);
86+
onConnectionEvent.accept(false, Collections.emptyList(), Collections.emptyMap());
7987
break;
8088
default:
8189
log.info(String.format("Storage emitted unhandled status: %s",
@@ -101,7 +109,7 @@ public void init() throws Exception {
101109
*/
102110
public void shutdown() throws InterruptedException {
103111
flagStore.shutdown();
104-
onResolverConnectionChanged.accept(false, null);
112+
onConnectionEvent.accept(false, Collections.emptyList(), Collections.emptyMap());
105113
}
106114

107115
/**

‎providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;
44
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FlagParser;
55
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector;
6-
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload;
6+
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
77
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
88
import lombok.extern.slf4j.Slf4j;
99
import java.util.HashMap;
@@ -94,15 +94,15 @@ public BlockingQueue<StorageStateChange> getStateQueue() {
9494
}
9595

9696
private void streamerListener(final Connector connector) throws InterruptedException {
97-
final BlockingQueue<StreamPayload> streamPayloads = connector.getStream();
97+
final BlockingQueue<QueuePayload> streamPayloads = connector.getStream();
9898

9999
while (!shutdown.get()) {
100-
final StreamPayload take = streamPayloads.take();
100+
final QueuePayload take = streamPayloads.take();
101101
switch (take.getType()) {
102102
case DATA:
103103
try {
104104
List<String> changedFlagsKeys;
105-
Map<String, FeatureFlag> flagMap = FlagParser.parseString(take.getData(), throwIfInvalid);
105+
Map<String, FeatureFlag> flagMap = FlagParser.parseString(take.getFlagData(), throwIfInvalid);
106106
writeLock.lock();
107107
try {
108108
changedFlagsKeys = getChangedFlagsKeys(flagMap);
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
package dev.openfeature.contrib.providers.flagd.resolver.process.storage;
22

3+
import java.util.Collections;
4+
import java.util.List;
5+
import java.util.Map;
6+
37
import lombok.EqualsAndHashCode;
48
import lombok.Getter;
59
import lombok.ToString;
610

7-
import java.util.ArrayList;
8-
import java.util.Collections;
9-
import java.util.List;
10-
1111
/**
1212
* Represents a change in the stored flags.
1313
*/
@@ -17,14 +17,39 @@
1717
public class StorageStateChange {
1818
private final StorageState storageState;
1919
private final List<String> changedFlagsKeys;
20+
private final Map<String, Object> syncMetadata;
21+
22+
/**
23+
* Construct a new StorageStateChange.
24+
* @param storageState state of the storage
25+
* @param changedFlagsKeys flags changed
26+
* @param syncMetadata possibly updated metadata
27+
*/
28+
public StorageStateChange(StorageState storageState, List<String> changedFlagsKeys,
29+
Map<String, Object> syncMetadata) {
30+
this.storageState = storageState;
31+
this.changedFlagsKeys = Collections.unmodifiableList(changedFlagsKeys);
32+
this.syncMetadata = Collections.unmodifiableMap(syncMetadata);
33+
}
2034

35+
/**
36+
* Construct a new StorageStateChange.
37+
* @param storageState state of the storage
38+
* @param changedFlagsKeys flags changed
39+
*/
2140
public StorageStateChange(StorageState storageState, List<String> changedFlagsKeys) {
2241
this.storageState = storageState;
23-
this.changedFlagsKeys = new ArrayList<>(changedFlagsKeys);
42+
this.changedFlagsKeys = Collections.unmodifiableList(changedFlagsKeys);
43+
this.syncMetadata = Collections.emptyMap();
2444
}
2545

46+
/**
47+
* Construct a new StorageStateChange.
48+
* @param storageState state of the storage
49+
*/
2650
public StorageStateChange(StorageState storageState) {
2751
this.storageState = storageState;
2852
this.changedFlagsKeys = Collections.emptyList();
53+
this.syncMetadata = Collections.emptyMap();
2954
}
3055
}

‎providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/Connector.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@
44

55
/**
66
* Contract of the in-process storage connector. Connectors are responsible to stream flag configurations in
7-
* {@link StreamPayload} format.
7+
* {@link QueuePayload} format.
88
*/
99
public interface Connector {
1010
void init() throws Exception;
1111

12-
BlockingQueue<StreamPayload> getStream();
12+
BlockingQueue<QueuePayload> getStream();
1313

1414
void shutdown() throws InterruptedException;
1515
}
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector;
22

3+
import java.util.Map;
34
import lombok.AllArgsConstructor;
45
import lombok.Getter;
56

@@ -8,7 +9,8 @@
89
*/
910
@AllArgsConstructor
1011
@Getter
11-
public class StreamPayload {
12-
private final StreamPayloadType type;
13-
private final String data;
12+
public class QueuePayload {
13+
private final QueuePayloadType type;
14+
private final String flagData;
15+
private final Map<String, Object> syncMetadata;
1416
}
+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
/**
44
* Payload type emitted by {@link Connector}.
55
*/
6-
public enum StreamPayloadType {
6+
public enum QueuePayloadType {
77
DATA,
88
ERROR
99
}

‎providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/file/FileConnector.java

+12-11
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
11
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.file;
22

3-
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector;
4-
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload;
5-
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayloadType;
6-
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
7-
import lombok.extern.slf4j.Slf4j;
8-
93
import java.io.IOException;
104
import java.nio.charset.StandardCharsets;
115
import java.nio.file.Files;
126
import java.nio.file.Path;
137
import java.nio.file.Paths;
8+
import java.util.Collections;
149
import java.util.concurrent.BlockingQueue;
1510
import java.util.concurrent.LinkedBlockingQueue;
1611

12+
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector;
13+
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
14+
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType;
15+
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16+
import lombok.extern.slf4j.Slf4j;
17+
1718
/**
1819
* File connector reads flag configurations from a given file, polls for changes and expose the content through
1920
* {@code Connector} contract.
@@ -28,7 +29,7 @@ public class FileConnector implements Connector {
2829
private static final String OFFER_WARN = "Unable to offer file content to queue: queue is full";
2930

3031
private final String flagSourcePath;
31-
private final BlockingQueue<StreamPayload> queue = new LinkedBlockingQueue<>(1);
32+
private final BlockingQueue<QueuePayload> queue = new LinkedBlockingQueue<>(1);
3233
private boolean shutdown = false;
3334

3435
public FileConnector(final String flagSourcePath) {
@@ -45,7 +46,7 @@ public void init() throws IOException {
4546

4647
// initial read
4748
String flagData = new String(Files.readAllBytes(filePath), StandardCharsets.UTF_8);
48-
if (!queue.offer(new StreamPayload(StreamPayloadType.DATA, flagData))) {
49+
if (!queue.offer(new QueuePayload(QueuePayloadType.DATA, flagData, Collections.emptyMap()))) {
4950
log.warn(OFFER_WARN);
5051
}
5152

@@ -58,7 +59,7 @@ public void init() throws IOException {
5859
if (currentTS > lastTS) {
5960
lastTS = currentTS;
6061
flagData = new String(Files.readAllBytes(filePath), StandardCharsets.UTF_8);
61-
if (!queue.offer(new StreamPayload(StreamPayloadType.DATA, flagData))) {
62+
if (!queue.offer(new QueuePayload(QueuePayloadType.DATA, flagData, Collections.emptyMap()))) {
6263
log.warn(OFFER_WARN);
6364
}
6465
}
@@ -72,7 +73,7 @@ public void init() throws IOException {
7273
Thread.currentThread().interrupt();
7374
} catch (Throwable t) {
7475
log.error("Error from file connector. File connector will exit", t);
75-
if (!queue.offer(new StreamPayload(StreamPayloadType.ERROR, t.toString()))) {
76+
if (!queue.offer(new QueuePayload(QueuePayloadType.ERROR, t.toString(), null))) {
7677
log.warn(OFFER_WARN);
7778
}
7879
}
@@ -86,7 +87,7 @@ public void init() throws IOException {
8687
/**
8788
* Expose the queue to fulfil the {@code Connector} contract.
8889
*/
89-
public BlockingQueue<StreamPayload> getStream() {
90+
public BlockingQueue<QueuePayload> getStream() {
9091
return queue;
9192
}
9293

‎providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java

+55-23
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,30 @@
11
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc;
22

3+
import java.util.Collections;
4+
import java.util.Map;
5+
import java.util.Random;
6+
import java.util.concurrent.BlockingQueue;
7+
import java.util.concurrent.LinkedBlockingQueue;
8+
import java.util.concurrent.TimeUnit;
9+
import java.util.concurrent.atomic.AtomicBoolean;
10+
311
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
412
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder;
13+
import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver;
514
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector;
6-
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload;
7-
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayloadType;
15+
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
16+
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType;
817
import dev.openfeature.flagd.grpc.sync.FlagSyncServiceGrpc;
18+
import dev.openfeature.flagd.grpc.sync.FlagSyncServiceGrpc.FlagSyncServiceBlockingStub;
919
import dev.openfeature.flagd.grpc.sync.FlagSyncServiceGrpc.FlagSyncServiceStub;
20+
import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataRequest;
21+
import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
1022
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsRequest;
1123
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsResponse;
1224
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
1325
import io.grpc.ManagedChannel;
1426
import lombok.extern.slf4j.Slf4j;
1527

16-
import java.util.Random;
17-
import java.util.concurrent.BlockingQueue;
18-
import java.util.concurrent.LinkedBlockingQueue;
19-
import java.util.concurrent.TimeUnit;
20-
import java.util.concurrent.atomic.AtomicBoolean;
21-
2228
/**
2329
* Implements the {@link Connector} contract and emit flags obtained from flagd
2430
* sync gRPC contract.
@@ -35,10 +41,11 @@ public class GrpcStreamConnector implements Connector {
3541
private static final int QUEUE_SIZE = 5;
3642

3743
private final AtomicBoolean shutdown = new AtomicBoolean(false);
38-
private final BlockingQueue<StreamPayload> blockingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
44+
private final BlockingQueue<QueuePayload> blockingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
3945

4046
private final ManagedChannel channel;
41-
private final FlagSyncServiceGrpc.FlagSyncServiceStub serviceStub;
47+
private final FlagSyncServiceStub serviceStub;
48+
private final FlagSyncServiceBlockingStub serviceBlockingStub;
4249
private final int deadline;
4350
private final String selector;
4451

@@ -50,6 +57,7 @@ public class GrpcStreamConnector implements Connector {
5057
public GrpcStreamConnector(final FlagdOptions options) {
5158
channel = ChannelBuilder.nettyChannel(options);
5259
serviceStub = FlagSyncServiceGrpc.newStub(channel);
60+
serviceBlockingStub = FlagSyncServiceGrpc.newBlockingStub(channel);
5361
deadline = options.getDeadline();
5462
selector = options.getSelector();
5563
}
@@ -60,13 +68,7 @@ public GrpcStreamConnector(final FlagdOptions options) {
6068
public void init() {
6169
Thread listener = new Thread(() -> {
6270
try {
63-
final SyncFlagsRequest.Builder requestBuilder = SyncFlagsRequest.newBuilder();
64-
65-
if (selector != null) {
66-
requestBuilder.setSelector(selector);
67-
}
68-
69-
observeEventStream(blockingQueue, shutdown, serviceStub, requestBuilder.build());
71+
observeEventStream(blockingQueue, shutdown, serviceStub, serviceBlockingStub, selector);
7072
} catch (InterruptedException e) {
7173
log.warn("gRPC event stream interrupted, flag configurations are stale", e);
7274
Thread.currentThread().interrupt();
@@ -80,7 +82,7 @@ public void init() {
8082
/**
8183
* Get blocking queue to obtain payloads exposed by this connector.
8284
*/
83-
public BlockingQueue<StreamPayload> getStream() {
85+
public BlockingQueue<QueuePayload> getStream() {
8486
return blockingQueue;
8587
}
8688

@@ -111,10 +113,11 @@ public void shutdown() throws InterruptedException {
111113
/**
112114
* Contains blocking calls, to be used concurrently.
113115
*/
114-
static void observeEventStream(final BlockingQueue<StreamPayload> writeTo,
116+
static void observeEventStream(final BlockingQueue<QueuePayload> writeTo,
115117
final AtomicBoolean shutdown,
116118
final FlagSyncServiceStub serviceStub,
117-
final SyncFlagsRequest request)
119+
final FlagSyncServiceBlockingStub serviceBlockingStub,
120+
final String selector)
118121
throws InterruptedException {
119122

120123
final BlockingQueue<GrpcResponseModel> streamReceiver = new LinkedBlockingQueue<>(QUEUE_SIZE);
@@ -123,8 +126,28 @@ static void observeEventStream(final BlockingQueue<StreamPayload> writeTo,
123126
log.info("Initializing sync stream observer");
124127

125128
while (!shutdown.get()) {
129+
Exception metadataException = null;
126130
log.debug("Initializing sync stream request");
127-
serviceStub.syncFlags(request, new GrpcStreamHandler(streamReceiver));
131+
final SyncFlagsRequest.Builder syncRequest = SyncFlagsRequest.newBuilder();
132+
final GetMetadataRequest.Builder metadataRequest = GetMetadataRequest.newBuilder();
133+
Map<String, Object> metadata = Collections.emptyMap();
134+
135+
if (selector != null) {
136+
syncRequest.setSelector(selector);
137+
}
138+
139+
serviceStub.syncFlags(syncRequest.build(), new GrpcStreamHandler(streamReceiver));
140+
try {
141+
GetMetadataResponse metadataResponse = serviceBlockingStub.getMetadata(metadataRequest.build());
142+
metadata = GrpcResolver
143+
.convertProtobufMapToStructure(metadataResponse.getMetadata().getFieldsMap()).asObjectMap();
144+
} catch (Exception e) {
145+
// the chances this call fails but the syncRequest does not are slim
146+
// it could be that the server doesn't implement this RPC
147+
// instead of logging here, retain the exception and only log if the
148+
// streamReceiver doesn't error
149+
metadataException = e;
150+
}
128151

129152
while (!shutdown.get()) {
130153
final GrpcResponseModel response = streamReceiver.take();
@@ -141,7 +164,8 @@ static void observeEventStream(final BlockingQueue<StreamPayload> writeTo,
141164
response.getError());
142165

143166
if (!writeTo.offer(
144-
new StreamPayload(StreamPayloadType.ERROR, "Error from stream connection, retrying"))) {
167+
new QueuePayload(QueuePayloadType.ERROR, "Error from stream connection, retrying",
168+
metadata))) {
145169
log.error("Failed to convey ERROR status, queue is full");
146170
}
147171
break;
@@ -150,11 +174,18 @@ static void observeEventStream(final BlockingQueue<StreamPayload> writeTo,
150174
final SyncFlagsResponse flagsResponse = response.getSyncFlagsResponse();
151175
String data = flagsResponse.getFlagConfiguration();
152176
log.debug("Got stream response: " + data);
177+
153178
if (!writeTo.offer(
154-
new StreamPayload(StreamPayloadType.DATA, data))) {
179+
new QueuePayload(QueuePayloadType.DATA, data, metadata))) {
155180
log.error("Stream writing failed");
156181
}
157182

183+
if (metadataException != null) {
184+
// if we somehow are connected but the metadata call failed, something strange
185+
// happened
186+
log.error("Stream connected but getMetadata RPC failed", metadataException);
187+
}
188+
158189
// reset retry delay if we succeeded in a retry attempt
159190
retryDelay = INIT_BACK_OFF;
160191
}
@@ -177,4 +208,5 @@ static void observeEventStream(final BlockingQueue<StreamPayload> writeTo,
177208
// log as this can happen after awakened from backoff sleep
178209
log.info("Shutdown invoked, exiting event stream listener");
179210
}
211+
180212
}

‎providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java

+51-11
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import static org.mockito.Mockito.doAnswer;
1212
import static org.mockito.Mockito.doNothing;
1313
import static org.mockito.Mockito.mock;
14+
import static org.mockito.Mockito.mockConstruction;
1415
import static org.mockito.Mockito.mockStatic;
1516
import static org.mockito.Mockito.spy;
1617
import static org.mockito.Mockito.times;
@@ -20,6 +21,7 @@
2021
import java.lang.reflect.Field;
2122
import java.util.ArrayList;
2223
import java.util.Arrays;
24+
import java.util.Collections;
2325
import java.util.HashMap;
2426
import java.util.List;
2527
import java.util.Map;
@@ -31,6 +33,7 @@
3133
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageStateChange;
3234
import org.junit.jupiter.api.BeforeAll;
3335
import org.junit.jupiter.api.Test;
36+
import org.mockito.MockedConstruction;
3437
import org.mockito.MockedStatic;
3538
import org.mockito.Mockito;
3639

@@ -60,13 +63,16 @@
6063
import dev.openfeature.sdk.MutableContext;
6164
import dev.openfeature.sdk.MutableStructure;
6265
import dev.openfeature.sdk.OpenFeatureAPI;
66+
import dev.openfeature.sdk.ProviderEvaluation;
6367
import dev.openfeature.sdk.ProviderState;
6468
import dev.openfeature.sdk.Reason;
6569
import dev.openfeature.sdk.Structure;
6670
import dev.openfeature.sdk.Value;
71+
import dev.openfeature.sdk.internal.TriConsumer;
6772
import io.cucumber.java.AfterAll;
6873
import io.grpc.Channel;
6974
import io.grpc.Deadline;
75+
import lombok.val;
7076

7177
class FlagdProviderTest {
7278
private static final String FLAG_KEY = "some-key";
@@ -502,14 +508,14 @@ void invalidate_cache() {
502508
final Cache cache = new Cache("lru", 5);
503509

504510
class NoopInitGrpcConnector extends GrpcConnector {
505-
public NoopInitGrpcConnector(FlagdOptions options, Cache cache, Supplier<Boolean> connectedSupplier, BiConsumer<Boolean, List<String>> onResolverConnectionChanged) {
506-
super(options, cache, connectedSupplier, onResolverConnectionChanged);
511+
public NoopInitGrpcConnector(FlagdOptions options, Cache cache, Supplier<Boolean> connectedSupplier, TriConsumer<Boolean, List<String>, Map<String, Object>> onConnectionEvent) {
512+
super(options, cache, connectedSupplier, onConnectionEvent);
507513
}
508514

509515
public void initialize() throws Exception {};
510516
}
511517

512-
grpc = new NoopInitGrpcConnector(FlagdOptions.builder().build(), cache, () -> true, (state, changedFlagKeys) -> {
518+
grpc = new NoopInitGrpcConnector(FlagdOptions.builder().build(), cache, () -> true, (state, changedFlagKeys, syncMetadata) -> {
513519
});
514520
}
515521

@@ -719,15 +725,15 @@ void disabled_cache() {
719725
class NoopInitGrpcConnector extends GrpcConnector {
720726
public NoopInitGrpcConnector(FlagdOptions options, Cache cache,
721727
Supplier<Boolean> connectedSupplier,
722-
BiConsumer<Boolean, List<String>> onResolverConnectionChanged) {
723-
super(options, cache, connectedSupplier, onResolverConnectionChanged);
728+
TriConsumer<Boolean, List<String>, Map<String, Object>> onConnectionEvent) {
729+
super(options, cache, connectedSupplier, onConnectionEvent);
724730
}
725731

726732
public void initialize() throws Exception {
727733
};
728734
}
729735

730-
grpc = new NoopInitGrpcConnector(FlagdOptions.builder().build(), cache, () -> true, (state, changedFlagKeys) -> {
736+
grpc = new NoopInitGrpcConnector(FlagdOptions.builder().build(), cache, () -> true, (state, changedFlagKeys, syncMetadata) -> {
731737
});
732738
}
733739

@@ -845,6 +851,40 @@ void initializationAndShutdown() throws Exception {
845851
verify(resolverMock, times(1)).shutdown();
846852
}
847853

854+
@Test
855+
void updatesSyncMetadataWithCallback() throws Exception {
856+
857+
final EvaluationContext ctx = new ImmutableContext();
858+
String key = "key1";
859+
String val = "val1";
860+
Map<String, Object> metadata = new HashMap<>();
861+
metadata.put(key, val);
862+
863+
// mock a resolver
864+
try (MockedConstruction<GrpcResolver> mockResolver = mockConstruction(GrpcResolver.class,
865+
(mock, context) -> {
866+
TriConsumer<Boolean, List<String>, Map<String, Object>> onConnectionEvent;
867+
868+
// get a reference to the onConnectionEvent callback
869+
onConnectionEvent = (TriConsumer<Boolean, List<String>, Map<String, Object>>) context
870+
.arguments().get(3);
871+
872+
// when our mock resolver initializes, it runs the passed onConnectionEvent callback
873+
doAnswer(invocation -> {
874+
onConnectionEvent.accept(true, Collections.emptyList(),
875+
metadata);
876+
return null;
877+
}).when(mock).init();
878+
})) {
879+
880+
FlagdProvider provider = new FlagdProvider();
881+
provider.initialize(ctx);
882+
883+
// the onConnectionEvent should have updated the sync metadata
884+
assertEquals(val, provider.getSyncMetadata().get(key));
885+
}
886+
}
887+
848888
// test helper
849889

850890
// create provider with given grpc connector
@@ -861,12 +901,12 @@ private FlagdProvider createProvider(GrpcConnector grpc, Supplier<Boolean> getCo
861901

862902
// create provider with given grpc provider, cache and state supplier
863903
private FlagdProvider createProvider(GrpcConnector grpc, Cache cache, Supplier<Boolean> getConnected) {
864-
final FlagdOptions flagdOptions = FlagdOptions.builder().build();
865-
final GrpcResolver grpcResolver = new GrpcResolver(flagdOptions, cache, getConnected,
866-
(providerState, changedFlagKeys) -> {
867-
});
904+
final FlagdOptions flagdOptions = FlagdOptions.builder().build();
905+
final GrpcResolver grpcResolver = new GrpcResolver(flagdOptions, cache, getConnected,
906+
(providerState, changedFlagKeys, syncMetadata) -> {
907+
});
868908

869-
final FlagdProvider provider = new FlagdProvider();
909+
final FlagdProvider provider = new FlagdProvider();
870910

871911
try {
872912
Field connector = GrpcResolver.class.getDeclaredField("connector");

‎providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnectorTest.java

+51-21
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
44
import static org.junit.jupiter.api.Assertions.assertEquals;
55
import static org.junit.jupiter.api.Assertions.assertThrows;
6+
import static org.junit.jupiter.api.Assertions.fail;
67
import static org.mockito.ArgumentMatchers.any;
78
import static org.mockito.ArgumentMatchers.anyInt;
89
import static org.mockito.ArgumentMatchers.anyLong;
@@ -18,6 +19,8 @@
1819
import static org.mockito.Mockito.when;
1920

2021
import java.lang.reflect.Field;
22+
import java.util.List;
23+
import java.util.Map;
2124
import java.util.concurrent.atomic.AtomicBoolean;
2225

2326
import org.junit.jupiter.api.Test;
@@ -27,12 +30,15 @@
2730
import org.junit.jupiter.params.provider.ValueSource;
2831
import org.mockito.MockedConstruction;
2932
import org.mockito.MockedStatic;
33+
import org.mockito.invocation.InvocationOnMock;
3034

3135
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
3236
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
37+
import dev.openfeature.flagd.grpc.evaluation.Evaluation.EventStreamResponse;
3338
import dev.openfeature.flagd.grpc.evaluation.ServiceGrpc;
3439
import dev.openfeature.flagd.grpc.evaluation.ServiceGrpc.ServiceBlockingStub;
3540
import dev.openfeature.flagd.grpc.evaluation.ServiceGrpc.ServiceStub;
41+
import dev.openfeature.sdk.internal.TriConsumer;
3642
import io.grpc.Channel;
3743
import io.grpc.netty.NettyChannelBuilder;
3844
import io.netty.channel.EventLoopGroup;
@@ -43,7 +49,7 @@
4349
public class GrpcConnectorTest {
4450

4551
@ParameterizedTest
46-
@ValueSource(ints = {1, 2, 3})
52+
@ValueSource(ints = { 1, 2, 3 })
4753
void validate_retry_calls(int retries) throws NoSuchFieldException, IllegalAccessException {
4854
final int backoffMs = 100;
4955

@@ -58,8 +64,9 @@ void validate_retry_calls(int retries) throws NoSuchFieldException, IllegalAcces
5864
final ServiceGrpc.ServiceStub mockStub = mock(ServiceGrpc.ServiceStub.class);
5965
doAnswer(invocation -> null).when(mockStub).eventStream(any(), any());
6066

61-
final GrpcConnector connector = new GrpcConnector(options, cache, () -> true, (state,changedFlagKeys) -> {
62-
});
67+
final GrpcConnector connector = new GrpcConnector(options, cache, () -> true,
68+
(state, changedFlagKeys, syncMetadata) -> {
69+
});
6370

6471
Field serviceStubField = GrpcConnector.class.getDeclaredField("serviceStub");
6572
serviceStubField.setAccessible(true);
@@ -90,29 +97,52 @@ void validate_retry_calls(int retries) throws NoSuchFieldException, IllegalAcces
9097
@Test
9198
void initialization_succeed_with_connected_status() throws NoSuchFieldException, IllegalAccessException {
9299
final Cache cache = new Cache("disabled", 0);
93-
94100
final ServiceGrpc.ServiceStub mockStub = mock(ServiceGrpc.ServiceStub.class);
95-
doAnswer(invocation -> null).when(mockStub).eventStream(any(), any());
101+
TriConsumer<Boolean, List<String>, Map<String, Object>> onConnectionEvent = mock(TriConsumer.class);
102+
doAnswer((InvocationOnMock invocation) -> {
103+
EventStreamObserver eventStreamObserver = (EventStreamObserver) invocation.getArgument(1);
104+
eventStreamObserver
105+
.onNext(EventStreamResponse.newBuilder().setType(EventStreamObserver.PROVIDER_READY).build());
106+
return null;
107+
}).when(mockStub).eventStream(any(), any());
96108

97-
// pass true in connected lambda
98-
final GrpcConnector connector = new GrpcConnector(FlagdOptions.builder().build(), cache, () -> true, (state, changedFlagKeys) -> {
99-
});
109+
try (MockedStatic<ServiceGrpc> mockStaticService = mockStatic(ServiceGrpc.class)) {
110+
mockStaticService.when(() -> ServiceGrpc.newStub(any()))
111+
.thenReturn(mockStub);
100112

101-
assertDoesNotThrow(connector::initialize);
113+
// pass true in connected lambda
114+
final GrpcConnector connector = new GrpcConnector(FlagdOptions.builder().build(), cache, () -> {
115+
try {
116+
Thread.sleep(100);
117+
return true;
118+
} catch (Exception e) {
119+
}
120+
return false;
121+
122+
},
123+
onConnectionEvent);
124+
125+
assertDoesNotThrow(connector::initialize);
126+
127+
// assert that onConnectionEvent was called with true
128+
verify(onConnectionEvent).accept(argThat(arg -> arg), any(), any());
129+
}
102130
}
103131

104132
@Test
105133
void initialization_fail_with_timeout() throws Exception {
106134
final Cache cache = new Cache("disabled", 0);
107-
108135
final ServiceGrpc.ServiceStub mockStub = mock(ServiceGrpc.ServiceStub.class);
136+
TriConsumer<Boolean, List<String>, Map<String, Object>> onConnectionEvent = mock(TriConsumer.class);
109137
doAnswer(invocation -> null).when(mockStub).eventStream(any(), any());
110138

111-
// pass false in connected lambda
112-
final GrpcConnector connector = new GrpcConnector(FlagdOptions.builder().build(), cache, () -> false, (state, changedFlagKeys) -> {
113-
});
139+
final GrpcConnector connector = new GrpcConnector(FlagdOptions.builder().build(), cache, () -> false,
140+
onConnectionEvent);
114141

142+
// assert throws
115143
assertThrows(RuntimeException.class, connector::initialize);
144+
// assert that onConnectionEvent was called with false
145+
verify(onConnectionEvent).accept(argThat(arg -> !arg), any(), any());
116146
}
117147

118148
@Test
@@ -170,17 +200,16 @@ void no_args_host_and_port_env_set_should_build_tcp_socket() throws Exception {
170200
new GrpcConnector(FlagdOptions.builder().build(), null, null, null);
171201

172202
// verify host/port matches & called times(= 1 as we rely on reusable channel)
173-
mockStaticChannelBuilder.verify(() -> NettyChannelBuilder.
174-
forAddress(host, port), times(1));
203+
mockStaticChannelBuilder.verify(() -> NettyChannelBuilder.forAddress(host, port), times(1));
175204
}
176205
}
177206
});
178207
}
179208

180-
181209
/**
182-
* OS Specific test - This test is valid only on Linux system as it rely on epoll availability
183-
* */
210+
* OS Specific test - This test is valid only on Linux system as it rely on
211+
* epoll availability
212+
*/
184213
@Test
185214
@EnabledOnOs(OS.LINUX)
186215
void path_arg_should_build_domain_socket_with_correct_path() {
@@ -218,8 +247,9 @@ void path_arg_should_build_domain_socket_with_correct_path() {
218247
}
219248

220249
/**
221-
* OS Specific test - This test is valid only on Linux system as it rely on epoll availability
222-
* */
250+
* OS Specific test - This test is valid only on Linux system as it rely on
251+
* epoll availability
252+
*/
223253
@Test
224254
@EnabledOnOs(OS.LINUX)
225255
void no_args_socket_env_should_build_domain_socket_with_correct_path() throws Exception {
@@ -249,7 +279,7 @@ void no_args_socket_env_should_build_domain_socket_with_correct_path() throws Ex
249279

250280
new GrpcConnector(FlagdOptions.builder().build(), null, null, null);
251281

252-
//verify path matches & called times(= 1 as we rely on reusable channel)
282+
// verify path matches & called times(= 1 as we rely on reusable channel)
253283
mockStaticChannelBuilder.verify(() -> NettyChannelBuilder
254284
.forAddress(argThat((DomainSocketAddress d) -> {
255285
return d.path() == path;

‎providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java

+343-334
Large diffs are not rendered by default.

‎providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java

+11-10
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@
22

33
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;
44
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FlagParser;
5-
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload;
6-
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayloadType;
5+
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
6+
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType;
77
import org.junit.Assert;
88
import org.junit.jupiter.api.Test;
99

1010
import java.time.Duration;
1111
import java.util.HashSet;
1212
import java.util.Map;
13+
import java.util.Collections;
1314
import java.util.concurrent.BlockingQueue;
1415
import java.util.concurrent.LinkedBlockingQueue;
1516
import java.util.stream.Collectors;
@@ -27,15 +28,15 @@ class FlagStoreTest {
2728
public void connectorHandling() throws Exception {
2829
final int maxDelay = 500;
2930

30-
final BlockingQueue<StreamPayload> payload = new LinkedBlockingQueue<>();
31+
final BlockingQueue<QueuePayload> payload = new LinkedBlockingQueue<>();
3132
FlagStore store = new FlagStore(new MockConnector(payload), true);
3233

3334
store.init();
3435
final BlockingQueue<StorageStateChange> states = store.getStateQueue();
3536

3637
// OK for simple flag
3738
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> {
38-
payload.offer(new StreamPayload(StreamPayloadType.DATA, getFlagsFromResource(VALID_SIMPLE)));
39+
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_SIMPLE), Collections.emptyMap()));
3940
});
4041

4142
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> {
@@ -44,7 +45,7 @@ public void connectorHandling() throws Exception {
4445

4546
// STALE for invalid flag
4647
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> {
47-
payload.offer(new StreamPayload(StreamPayloadType.DATA, getFlagsFromResource(INVALID_FLAG)));
48+
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(INVALID_FLAG), Collections.emptyMap()));
4849
});
4950

5051
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> {
@@ -53,7 +54,7 @@ public void connectorHandling() throws Exception {
5354

5455
// OK again for next payload
5556
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> {
56-
payload.offer(new StreamPayload(StreamPayloadType.DATA, getFlagsFromResource(VALID_LONG)));
57+
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_LONG), Collections.emptyMap()));
5758
});
5859

5960
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> {
@@ -62,7 +63,7 @@ public void connectorHandling() throws Exception {
6263

6364
// ERROR is propagated correctly
6465
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> {
65-
payload.offer(new StreamPayload(StreamPayloadType.ERROR, null));
66+
payload.offer(new QueuePayload(QueuePayloadType.ERROR, null, Collections.emptyMap()));
6667
});
6768

6869
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> {
@@ -80,21 +81,21 @@ public void connectorHandling() throws Exception {
8081
@Test
8182
public void changedFlags() throws Exception {
8283
final int maxDelay = 500;
83-
final BlockingQueue<StreamPayload> payload = new LinkedBlockingQueue<>();
84+
final BlockingQueue<QueuePayload> payload = new LinkedBlockingQueue<>();
8485
FlagStore store = new FlagStore(new MockConnector(payload), true);
8586
store.init();
8687
final BlockingQueue<StorageStateChange> storageStateDTOS = store.getStateQueue();
8788

8889
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> {
89-
payload.offer(new StreamPayload(StreamPayloadType.DATA, getFlagsFromResource(VALID_SIMPLE)));
90+
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_SIMPLE), Collections.emptyMap()));
9091
});
9192
// flags changed for first time
9293
assertEquals(FlagParser.parseString(
9394
getFlagsFromResource(VALID_SIMPLE), true).keySet().stream().collect(Collectors.toList()),
9495
storageStateDTOS.take().getChangedFlagsKeys());
9596

9697
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> {
97-
payload.offer(new StreamPayload(StreamPayloadType.DATA, getFlagsFromResource(VALID_LONG)));
98+
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_LONG), Collections.emptyMap()));
9899
});
99100
Map<String, FeatureFlag> expectedChangedFlags =
100101
FlagParser.parseString(getFlagsFromResource(VALID_LONG),true);

‎providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/MockConnector.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,33 @@
11
package dev.openfeature.contrib.providers.flagd.resolver.process.storage;
22

33
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector;
4-
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload;
5-
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayloadType;
4+
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
5+
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType;
66
import lombok.extern.slf4j.Slf4j;
77

88
import java.util.concurrent.BlockingQueue;
9+
import java.util.Collections;
910

1011
@Slf4j
1112
public class MockConnector implements Connector {
1213

13-
private BlockingQueue<StreamPayload> mockQueue;
14+
private BlockingQueue<QueuePayload> mockQueue;
1415

15-
public MockConnector(final BlockingQueue<StreamPayload> mockQueue) {
16+
public MockConnector(final BlockingQueue<QueuePayload> mockQueue) {
1617
this.mockQueue = mockQueue;
1718
}
1819

1920
public void init() {
2021
// no-op
2122
}
2223

23-
public BlockingQueue<StreamPayload> getStream() {
24+
public BlockingQueue<QueuePayload> getStream() {
2425
return mockQueue;
2526
}
2627

2728
public void shutdown() {
2829
// Emit error mocking closed connection scenario
29-
if (!mockQueue.offer(new StreamPayload(StreamPayloadType.ERROR, "shutdown invoked"))) {
30+
if (!mockQueue.offer(new QueuePayload(QueuePayloadType.ERROR, "shutdown invoked", Collections.emptyMap()))) {
3031
log.warn("Failed to offer shutdown status");
3132
}
3233
}

‎providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/file/FileConnectorTest.java

+14-14
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.file;
22

3-
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload;
4-
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayloadType;
3+
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
4+
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType;
55
import org.junit.jupiter.api.Disabled;
66
import org.junit.jupiter.api.Test;
77

@@ -31,16 +31,16 @@ void readAndExposeFeatureFlagsFromSource() throws IOException {
3131
connector.init();
3232

3333
// then
34-
final BlockingQueue<StreamPayload> stream = connector.getStream();
35-
final StreamPayload[] payload = new StreamPayload[1];
34+
final BlockingQueue<QueuePayload> stream = connector.getStream();
35+
final QueuePayload[] payload = new QueuePayload[1];
3636

3737
assertNotNull(stream);
3838
assertTimeoutPreemptively(Duration.ofMillis(200), () -> {
3939
payload[0] = stream.take();
4040
});
4141

42-
assertNotNull(payload[0].getData());
43-
assertEquals(StreamPayloadType.DATA, payload[0].getType());
42+
assertNotNull(payload[0].getFlagData());
43+
assertEquals(QueuePayloadType.DATA, payload[0].getType());
4444
}
4545

4646
@Test
@@ -52,16 +52,16 @@ void emitErrorStateForInvalidPath() throws IOException {
5252
connector.init();
5353

5454
// then
55-
final BlockingQueue<StreamPayload> stream = connector.getStream();
55+
final BlockingQueue<QueuePayload> stream = connector.getStream();
5656

5757
// Must emit an error within considerable time
58-
final StreamPayload[] payload = new StreamPayload[1];
58+
final QueuePayload[] payload = new QueuePayload[1];
5959
assertTimeoutPreemptively(Duration.ofMillis(200), () -> {
6060
payload[0] = stream.take();
6161
});
6262

63-
assertNotNull(payload[0].getData());
64-
assertEquals(StreamPayloadType.ERROR, payload[0].getType());
63+
assertNotNull(payload[0].getFlagData());
64+
assertEquals(QueuePayloadType.ERROR, payload[0].getType());
6565
}
6666

6767
@Test
@@ -80,15 +80,15 @@ void watchForFileUpdatesAndEmitThem() throws IOException {
8080
connector.init();
8181

8282
// then
83-
final BlockingQueue<StreamPayload> stream = connector.getStream();
84-
final StreamPayload[] payload = new StreamPayload[1];
83+
final BlockingQueue<QueuePayload> stream = connector.getStream();
84+
final QueuePayload[] payload = new QueuePayload[1];
8585

8686
// first validate the initial payload
8787
assertTimeoutPreemptively(Duration.ofMillis(200), () -> {
8888
payload[0] = stream.take();
8989
});
9090

91-
assertEquals(initial, payload[0].getData());
91+
assertEquals(initial, payload[0].getFlagData());
9292

9393
// then update the flags
9494
Files.write(updPath, updatedFlags.getBytes(), StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
@@ -98,7 +98,7 @@ void watchForFileUpdatesAndEmitThem() throws IOException {
9898
payload[0] = stream.take();
9999
});
100100

101-
assertEquals(updatedFlags, payload[0].getData());
101+
assertEquals(updatedFlags, payload[0].getFlagData());
102102
}
103103

104104
}

‎providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnectorTest.java

+43-9
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc;
22

3+
import static org.junit.Assert.assertThat;
34
import static org.junit.jupiter.api.Assertions.assertEquals;
45
import static org.junit.jupiter.api.Assertions.assertNotNull;
56
import static org.junit.jupiter.api.Assertions.assertNull;
67
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
8+
import static org.junit.jupiter.api.Assertions.assertTrue;
79
import static org.mockito.ArgumentMatchers.any;
810
import static org.mockito.Mockito.times;
911
import static org.mockito.Mockito.verify;
12+
import static org.mockito.Mockito.when;
1013

1114
import java.lang.reflect.Field;
1215
import java.time.Duration;
@@ -16,10 +19,14 @@
1619
import org.junit.jupiter.api.Test;
1720
import org.mockito.Mockito;
1821

22+
import com.google.protobuf.Struct;
23+
1924
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
20-
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload;
21-
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayloadType;
25+
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
26+
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType;
27+
import dev.openfeature.flagd.grpc.sync.FlagSyncServiceGrpc.FlagSyncServiceBlockingStub;
2228
import dev.openfeature.flagd.grpc.sync.FlagSyncServiceGrpc.FlagSyncServiceStub;
29+
import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
2330
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsRequest;
2431
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsResponse;
2532

@@ -57,8 +64,19 @@ public void connectionParameters() throws Throwable {
5764
@Test
5865
public void grpcConnectionStatus() throws Throwable {
5966
// given
67+
final String key = "key1";
68+
final String val = "value1";
6069
final GrpcStreamConnector connector = new GrpcStreamConnector(FlagdOptions.builder().build());
6170
final FlagSyncServiceStub stubMock = mockStubAndReturn(connector);
71+
final FlagSyncServiceBlockingStub blockingStubMock = mockBlockingStubAndReturn(connector);
72+
73+
final Struct metadata = Struct.newBuilder()
74+
.putFields(key,
75+
com.google.protobuf.Value.newBuilder().setStringValue(val).build())
76+
.build();
77+
78+
79+
when(blockingStubMock.getMetadata(any())).thenReturn(GetMetadataResponse.newBuilder().setMetadata(metadata).build());
6280

6381
final GrpcStreamHandler[] injectedHandler = new GrpcStreamHandler[1];
6482

@@ -71,21 +89,23 @@ public void grpcConnectionStatus() throws Throwable {
7189
connector.init();
7290
// verify and wait for initialization
7391
verify(stubMock, Mockito.timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any());
92+
verify(blockingStubMock).getMetadata(any());
7493

7594
// then
7695
final GrpcStreamHandler grpcStreamHandler = injectedHandler[0];
7796
assertNotNull(grpcStreamHandler);
7897

79-
final BlockingQueue<StreamPayload> streamPayloads = connector.getStream();
98+
final BlockingQueue<QueuePayload> streamPayloads = connector.getStream();
8099

81100
// accepted data
82101
grpcStreamHandler.onNext(
83102
SyncFlagsResponse.newBuilder()
84103
.build());
85104

86105
assertTimeoutPreemptively(MAX_WAIT_MS, () -> {
87-
StreamPayload payload = streamPayloads.take();
88-
assertEquals(StreamPayloadType.DATA, payload.getType());
106+
QueuePayload payload = streamPayloads.take();
107+
assertEquals(QueuePayloadType.DATA, payload.getType());
108+
assertTrue(() -> payload.getSyncMetadata().get(key).equals(val));
89109
});
90110

91111
// ping must be ignored
@@ -99,8 +119,8 @@ public void grpcConnectionStatus() throws Throwable {
99119
.build());
100120

101121
assertTimeoutPreemptively(MAX_WAIT_MS, () -> {
102-
StreamPayload payload = streamPayloads.take();
103-
assertEquals(StreamPayloadType.DATA, payload.getType());
122+
QueuePayload payload = streamPayloads.take();
123+
assertEquals(QueuePayloadType.DATA, payload.getType());
104124
});
105125
}
106126

@@ -109,6 +129,7 @@ public void listenerExitOnShutdown() throws Throwable {
109129
// given
110130
final GrpcStreamConnector connector = new GrpcStreamConnector(FlagdOptions.builder().build());
111131
final FlagSyncServiceStub stubMock = mockStubAndReturn(connector);
132+
final FlagSyncServiceBlockingStub blockingStubMock = mockBlockingStubAndReturn(connector);
112133

113134
final GrpcStreamHandler[] injectedHandler = new GrpcStreamHandler[1];
114135

@@ -121,6 +142,7 @@ public void listenerExitOnShutdown() throws Throwable {
121142
connector.init();
122143
// verify and wait for initialization
123144
verify(stubMock, Mockito.timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any());
145+
verify(blockingStubMock).getMetadata(any());
124146

125147
// then
126148
final GrpcStreamHandler grpcStreamHandler = injectedHandler[0];
@@ -132,8 +154,8 @@ public void listenerExitOnShutdown() throws Throwable {
132154
grpcStreamHandler.onError(new Exception("Channel closed, exiting"));
133155

134156
assertTimeoutPreemptively(MAX_WAIT_MS, () -> {
135-
StreamPayload payload = connector.getStream().take();
136-
assertEquals(StreamPayloadType.ERROR, payload.getType());
157+
QueuePayload payload = connector.getStream().take();
158+
assertEquals(QueuePayloadType.ERROR, payload.getType());
137159
});
138160

139161
// Validate mock calls & no more event propagation
@@ -161,4 +183,16 @@ private static FlagSyncServiceStub mockStubAndReturn(final GrpcStreamConnector c
161183
return stubMock;
162184
}
163185

186+
private static FlagSyncServiceBlockingStub mockBlockingStubAndReturn(final GrpcStreamConnector connector)
187+
throws Throwable {
188+
final Field blockingStubField = GrpcStreamConnector.class.getDeclaredField("serviceBlockingStub");
189+
blockingStubField.setAccessible(true);
190+
191+
final FlagSyncServiceBlockingStub blockingStubMock = Mockito.mock(FlagSyncServiceBlockingStub.class);
192+
193+
blockingStubField.set(connector, blockingStubMock);
194+
195+
return blockingStubMock;
196+
}
197+
164198
}

0 commit comments

Comments
 (0)
Please sign in to comment.