Skip to content

feat: flagd in-process evalator improvements #451

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion providers/flagd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,14 @@ variables.
Given below are the supported configurations:

| Option name | Environment variable name | Type & Values | Default | Compatible resolver |
| --------------------- | ------------------------------ | ---------------------- | --------- | ------------------- |
|-----------------------|--------------------------------|------------------------|-----------|---------------------|
| host | FLAGD_HOST | String | localhost | RPC & in-process |
| port | FLAGD_PORT | int | 8013 | RPC & in-process |
| tls | FLAGD_TLS | boolean | false | RPC & in-process |
| socketPath | FLAGD_SOCKET_PATH | String | null | RPC & in-process |
| certPath | FLAGD_SERVER_CERT_PATH | String | null | RPC & in-process |
| deadline | FLAGD_DEADLINE_MS | int | 500 | RPC & in-process |
| selector | FLAGD_SELECTOR | String | null | in-process |
| cache | FLAGD_CACHE | String - lru, disabled | lru | RPC |
| maxCacheSize | FLAGD_MAX_CACHE_SIZE | int | 1000 | RPC |
| maxEventStreamRetries | FLAGD_MAX_EVENT_STREAM_RETRIES | int | 5 | RPC |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public final class Config {
static final String MAX_EVENT_STREAM_RETRIES_ENV_VAR_NAME = "FLAGD_MAX_EVENT_STREAM_RETRIES";
static final String BASE_EVENT_STREAM_RETRY_BACKOFF_MS_ENV_VAR_NAME = "FLAGD_RETRY_BACKOFF_MS";
static final String DEADLINE_MS_ENV_VAR_NAME = "FLAGD_DEADLINE_MS";
static final String SELECTOR_ENV_VAR_NAME = "FLAGD_SELECTOR";

public static final String STATIC_REASON = "STATIC";
public static final String CACHED_REASON = "CACHED";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static dev.openfeature.contrib.providers.flagd.Config.MAX_CACHE_SIZE_ENV_VAR_NAME;
import static dev.openfeature.contrib.providers.flagd.Config.MAX_EVENT_STREAM_RETRIES_ENV_VAR_NAME;
import static dev.openfeature.contrib.providers.flagd.Config.PORT_ENV_VAR_NAME;
import static dev.openfeature.contrib.providers.flagd.Config.SELECTOR_ENV_VAR_NAME;
import static dev.openfeature.contrib.providers.flagd.Config.SERVER_CERT_PATH_ENV_VAR_NAME;
import static dev.openfeature.contrib.providers.flagd.Config.SOCKET_PATH_ENV_VAR_NAME;
import static dev.openfeature.contrib.providers.flagd.Config.TLS_ENV_VAR_NAME;
Expand Down Expand Up @@ -97,11 +98,18 @@ public class FlagdOptions {


/**
* Deadline to connect to flagD in milliseconds.
* Connection deadline in milliseconds.
* For RPC resolving, this is the deadline to connect to flagd for flag evaluation.
* For in-process resolving, this is the deadline for sync stream termination.
* */
@Builder.Default
private int deadline =
fallBackToEnvOrDefault(DEADLINE_MS_ENV_VAR_NAME, DEFAULT_DEADLINE);
private int deadline = fallBackToEnvOrDefault(DEADLINE_MS_ENV_VAR_NAME, DEFAULT_DEADLINE);

/**
* Selector to be used with flag sync gRPC contract.
**/
@Builder.Default
private String selector = fallBackToEnvOrDefault(SELECTOR_ENV_VAR_NAME, null);

/**
* Inject OpenTelemetry for the library runtime. Providing sdk will initiate distributed tracing for flagd grpc
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dev.openfeature.contrib.providers.flagd.resolver.process.model;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
Expand All @@ -14,6 +15,7 @@
@Getter
@SuppressFBWarnings(value = {"EI_EXPOSE_REP"},
justification = "Feature flag comes as a Json configuration, hence they must be parsed and exposed")
@JsonIgnoreProperties(ignoreUnknown = true)
public class FeatureFlag {
public static final String EMPTY_TARGETING_STRING = "{}";

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,5 @@
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;

import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector;
Expand All @@ -18,6 +11,13 @@
import io.grpc.ManagedChannel;
import lombok.extern.java.Log;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;

/**
* Implements the {@link Connector} contract and emit flags obtained from flagd sync gRPC contract.
*/
Expand All @@ -38,16 +38,18 @@ public class GrpcStreamConnector implements Connector {
private final ManagedChannel channel;
private final FlagSyncServiceGrpc.FlagSyncServiceStub serviceStub;
private final int deadline;
private final String selector;

/**
* Construct a new GrpcStreamConnector.
*
*
* @param options flagd options
*/
public GrpcStreamConnector(final FlagdOptions options) {
channel = ChannelBuilder.nettyChannel(options);
serviceStub = FlagSyncServiceGrpc.newStub(channel);
this.deadline = options.getDeadline();
deadline = options.getDeadline();
selector = options.getSelector();
}

/**
Expand All @@ -56,7 +58,13 @@ public GrpcStreamConnector(final FlagdOptions options) {
public void init() {
Thread listener = new Thread(() -> {
try {
observeEventStream(blockingQueue, shutdown, serviceStub);
final SyncService.SyncFlagsRequest.Builder requestBuilder = SyncService.SyncFlagsRequest.newBuilder();

if (selector != null) {
requestBuilder.setSelector(selector);
}

observeEventStream(blockingQueue, shutdown, serviceStub, requestBuilder.build());
} catch (InterruptedException e) {
log.log(Level.WARNING, "gRPC event stream interrupted, flag configurations are stale", e);
}
Expand All @@ -75,6 +83,7 @@ public BlockingQueue<StreamPayload> getStream() {

/**
* Shutdown gRPC stream connector.
*
* @throws InterruptedException if stream can't be closed within deadline.
*/
public void shutdown() throws InterruptedException {
Expand All @@ -101,14 +110,14 @@ public void shutdown() throws InterruptedException {
*/
static void observeEventStream(final BlockingQueue<StreamPayload> writeTo,
final AtomicBoolean shutdown,
final FlagSyncServiceGrpc.FlagSyncServiceStub serviceStub)
final FlagSyncServiceGrpc.FlagSyncServiceStub serviceStub,
final SyncService.SyncFlagsRequest request)
throws InterruptedException {

final BlockingQueue<GrpcResponseModel> streamReceiver = new LinkedBlockingQueue<>(QUEUE_SIZE);
int retryDelay = INIT_BACK_OFF;

while (!shutdown.get()) {
final SyncService.SyncFlagsRequest request = SyncService.SyncFlagsRequest.newBuilder().build();
serviceStub.syncFlags(request, new GrpcStreamHandler(streamReceiver));

while (!shutdown.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public void TestDefaults() {
assertEquals(builder.getCacheType(), DEFAULT_CACHE);
assertEquals(builder.getMaxCacheSize(), DEFAULT_MAX_CACHE_SIZE);
assertEquals(builder.getMaxEventStreamRetries(), DEFAULT_MAX_EVENT_STREAM_RETRIES);
assertNull(builder.getSelector());
assertNull(builder.getOpenTelemetry());
}

Expand All @@ -43,6 +44,7 @@ public void TestBuilderOptions(){
.cacheType("lru")
.maxCacheSize(100)
.maxEventStreamRetries(1)
.selector("app=weatherApp")
.openTelemetry(openTelemetry)
.build();

Expand All @@ -53,6 +55,7 @@ public void TestBuilderOptions(){
assertEquals(flagdOptions.getCacheType(), "lru");
assertEquals(flagdOptions.getMaxCacheSize(), 100);
assertEquals(flagdOptions.getMaxEventStreamRetries(), 1);
assertEquals(flagdOptions.getSelector(), "app=weatherApp");
assertEquals(flagdOptions.getOpenTelemetry(), openTelemetry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

public class TestUtils {
public static final String VALID_SIMPLE = "flagConfigurations/valid-simple.json";
public static final String VALID_SIMPLE_EXTRA_FIELD = "flagConfigurations/valid-simple-with-extra-fields.json";
public static final String VALID_LONG = "flagConfigurations/valid-long.json";
public static final String INVALID_FLAG = "flagConfigurations/invalid-flag.json";
public static final String INVALID_CFG = "flagConfigurations/invalid-configuration.json";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static dev.openfeature.contrib.providers.flagd.resolver.process.TestUtils.INVALID_FLAG;
import static dev.openfeature.contrib.providers.flagd.resolver.process.TestUtils.VALID_LONG;
import static dev.openfeature.contrib.providers.flagd.resolver.process.TestUtils.VALID_SIMPLE;
import static dev.openfeature.contrib.providers.flagd.resolver.process.TestUtils.VALID_SIMPLE_EXTRA_FIELD;
import static dev.openfeature.contrib.providers.flagd.resolver.process.TestUtils.getFlagsFromResource;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand All @@ -30,6 +31,21 @@ public void validJsonConfigurationParsing() throws IOException {
assertEquals(false, variants.get("off"));
}

@Test
public void validJsonConfigurationWithExtraFieldsParsing() throws IOException {
Map<String, FeatureFlag> flagMap = FlagParser.parseString(getFlagsFromResource(VALID_SIMPLE_EXTRA_FIELD));
FeatureFlag boolFlag = flagMap.get("myBoolFlag");

assertNotNull(boolFlag);
assertEquals("ENABLED", boolFlag.getState());
assertEquals("on", boolFlag.getDefaultVariant());

Map<String, Object> variants = boolFlag.getVariants();

assertEquals(true, variants.get("on"));
assertEquals(false, variants.get("off"));
}

@Test
public void validJsonConfigurationWithTargetingRulesParsing() throws IOException {
Map<String, FeatureFlag> flagMap = FlagParser.parseString(getFlagsFromResource(VALID_LONG));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,36 @@ class GrpcStreamConnectorTest {

private static final Duration MAX_WAIT_MS = Duration.ofMillis(500);

@Test
public void connectionParameters() throws Throwable {
// given
final FlagdOptions options = FlagdOptions.builder()
.selector("selector")
.build();

final GrpcStreamConnector connector = new GrpcStreamConnector(options);
final FlagSyncServiceGrpc.FlagSyncServiceStub stubMock = mockStubAndReturn(connector);

final SyncService.SyncFlagsRequest[] request = new SyncService.SyncFlagsRequest[1];

Mockito.doAnswer(invocation -> {
request[0] = invocation.getArgument(0, SyncService.SyncFlagsRequest.class);
return null;
}).when(stubMock).syncFlags(any(), any());

// when
connector.init();
verify(stubMock, Mockito.timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any());

// then
final SyncService.SyncFlagsRequest flagsRequest = request[0];
assertNotNull(flagsRequest);
assertEquals("selector", flagsRequest.getSelector());
}

@Test
public void grpcConnectionStatus() throws Throwable {
// given
final GrpcStreamConnector connector = new GrpcStreamConnector(FlagdOptions.builder().build());
final FlagSyncServiceGrpc.FlagSyncServiceStub stubMock = mockStubAndReturn(connector);

Expand All @@ -38,11 +66,12 @@ public void grpcConnectionStatus() throws Throwable {
return null;
}).when(stubMock).syncFlags(any(), any());

// when
connector.init();

// verify and wait for initialization
verify(stubMock, Mockito.timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any());

// then
final GrpcStreamHandler grpcStreamHandler = injectedHandler[0];
assertNotNull(grpcStreamHandler);

Expand All @@ -54,7 +83,7 @@ public void grpcConnectionStatus() throws Throwable {
.setState(SyncService.SyncState.SYNC_STATE_ALL)
.build());

assertTimeoutPreemptively(MAX_WAIT_MS, ()->{
assertTimeoutPreemptively(MAX_WAIT_MS, () -> {
StreamPayload payload = streamPayloads.take();
assertEquals(StreamPayloadType.DATA, payload.getType());
});
Expand All @@ -71,14 +100,15 @@ public void grpcConnectionStatus() throws Throwable {
.setState(SyncService.SyncState.SYNC_STATE_ALL)
.build());

assertTimeoutPreemptively(MAX_WAIT_MS, ()->{
assertTimeoutPreemptively(MAX_WAIT_MS, () -> {
StreamPayload payload = streamPayloads.take();
assertEquals(StreamPayloadType.DATA, payload.getType());
});
}

@Test
public void listenerExitOnShutdown() throws Throwable {
// given
final GrpcStreamConnector connector = new GrpcStreamConnector(FlagdOptions.builder().build());
final FlagSyncServiceGrpc.FlagSyncServiceStub stubMock = mockStubAndReturn(connector);

Expand All @@ -89,11 +119,12 @@ public void listenerExitOnShutdown() throws Throwable {
return null;
}).when(stubMock).syncFlags(any(), any());

// when
connector.init();

// verify and wait for initialization
verify(stubMock, Mockito.timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any());

// then
final GrpcStreamHandler grpcStreamHandler = injectedHandler[0];
assertNotNull(grpcStreamHandler);

Expand All @@ -102,7 +133,7 @@ public void listenerExitOnShutdown() throws Throwable {
// mock channel close of gRPC handler
grpcStreamHandler.onError(new Exception("Channel closed, exiting"));

assertTimeoutPreemptively(MAX_WAIT_MS, ()->{
assertTimeoutPreemptively(MAX_WAIT_MS, () -> {
StreamPayload payload = connector.getStream().take();
assertEquals(StreamPayloadType.ERROR, payload.getType());
});
Expand All @@ -120,7 +151,8 @@ public void listenerExitOnShutdown() throws Throwable {
assertNull(connector.getStream().poll(100, TimeUnit.MILLISECONDS));
}

private static FlagSyncServiceGrpc.FlagSyncServiceStub mockStubAndReturn(final GrpcStreamConnector connector) throws Throwable {
private static FlagSyncServiceGrpc.FlagSyncServiceStub mockStubAndReturn(final GrpcStreamConnector connector)
throws Throwable {
final Field serviceStubField = GrpcStreamConnector.class.getDeclaredField("serviceStub");
serviceStubField.setAccessible(true);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"flags": {
"myBoolFlag": {
"state": "ENABLED",
"variants": {
"on": true,
"off": false
},
"defaultVariant": "on",
"someNewFieldInFlagd": "value"
}
}
}