diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/file/FileConnector.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/file/FileConnector.java index 0d808eb2d..3c65ca7d0 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/file/FileConnector.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/file/FileConnector.java @@ -9,12 +9,14 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; /** - * File connector reads flag configurations and expose the context through {@code Connector} contract. + * File connector reads flag configurations from a given file, polls for changes and expose the content through + * {@code Connector} contract. * The implementation is kept minimal and suites testing, local development needs. */ @SuppressFBWarnings(value = {"EI_EXPOSE_REP", "PATH_TRAVERSAL_IN"}, @@ -22,23 +24,58 @@ @Slf4j public class FileConnector implements Connector { + private static final int POLL_INTERVAL_MS = 5000; + private final String flagSourcePath; private final BlockingQueue queue = new LinkedBlockingQueue<>(1); + private boolean shutdown = false; public FileConnector(final String flagSourcePath) { this.flagSourcePath = flagSourcePath; } /** - * Initialize file connector. Reads content of the provided source file and offer it through queue. + * Initialize file connector. Reads file content, poll for changes and offer content through the queue. */ public void init() throws IOException { - final String flagData = new String(Files.readAllBytes(Paths.get(flagSourcePath)), StandardCharsets.UTF_8); + Thread watcherT = new Thread(() -> { + try { + final Path filePath = Paths.get(flagSourcePath); + + // initial read + String flagData = new String(Files.readAllBytes(filePath), StandardCharsets.UTF_8); + if (!queue.offer(new StreamPayload(StreamPayloadType.DATA, flagData))) { + log.warn("Unable to offer file content to queue: queue is full"); + } + + long lastTS = Files.getLastModifiedTime(filePath).toMillis(); + + // start polling for changes + while (!shutdown) { + long currentTS = Files.getLastModifiedTime(filePath).toMillis(); + + if (currentTS > lastTS) { + lastTS = currentTS; + flagData = new String(Files.readAllBytes(filePath), StandardCharsets.UTF_8); + if (!queue.offer(new StreamPayload(StreamPayloadType.DATA, flagData))) { + log.warn("Unable to offer file content to queue: queue is full"); + } + } + + Thread.sleep(POLL_INTERVAL_MS); + } - if (!queue.offer(new StreamPayload(StreamPayloadType.DATA, flagData))) { - throw new RuntimeException("Unable to write to queue. Queue is full."); - } + log.info("Shutting down file connector."); + } catch (Throwable t) { + log.error("Error from file connector. File connector will exit", t); + if (!queue.offer(new StreamPayload(StreamPayloadType.ERROR, t.toString()))) { + log.warn("Unable to offer file content to queue: queue is full"); + } + } + }); + watcherT.setDaemon(true); + watcherT.start(); log.info(String.format("Using feature flag configurations from file %s", flagSourcePath)); } @@ -50,9 +87,9 @@ public BlockingQueue getStream() { } /** - * NO-OP shutdown. + * Shutdown file connector. */ public void shutdown() throws InterruptedException { - // NO-OP nothing to do here + shutdown = true; } } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/TestUtils.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/TestUtils.java index 61d6a038e..fec1e122a 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/TestUtils.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/TestUtils.java @@ -13,6 +13,7 @@ public class TestUtils { public static final String VALID_LONG = "flagConfigurations/valid-long.json"; public static final String INVALID_FLAG = "flagConfigurations/invalid-flag.json"; public static final String INVALID_CFG = "flagConfigurations/invalid-configuration.json"; + public static final String UPDATABLE_FILE = "flagConfigurations/updatableFlags.json"; public static String getFlagsFromResource(final String file) throws IOException { diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/file/FileConnectorTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/file/FileConnectorTest.java index bea063de7..0f3b2f379 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/file/FileConnectorTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/file/FileConnectorTest.java @@ -2,17 +2,22 @@ import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayloadType; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; import java.time.Duration; import java.util.concurrent.BlockingQueue; +import static dev.openfeature.contrib.providers.flagd.resolver.process.TestUtils.UPDATABLE_FILE; import static dev.openfeature.contrib.providers.flagd.resolver.process.TestUtils.VALID_LONG; import static dev.openfeature.contrib.providers.flagd.resolver.process.TestUtils.getResourcePath; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; class FileConnectorTest { @@ -39,12 +44,61 @@ void readAndExposeFeatureFlagsFromSource() throws IOException { } @Test - void throwsErrorIfInvalidFile(){ + void emitErrorStateForInvalidPath() throws IOException { // given final FileConnector connector = new FileConnector("INVALID_PATH"); + // when + connector.init(); + + // then + final BlockingQueue stream = connector.getStream(); + + // Must emit an error within considerable time + final StreamPayload[] payload = new StreamPayload[1]; + assertTimeoutPreemptively(Duration.ofMillis(200), () -> { + payload[0] = stream.take(); + }); + + assertNotNull(payload[0].getData()); + assertEquals(StreamPayloadType.ERROR, payload[0].getType()); + } + + @Test + @Disabled("Disabled as unstable on GH Action. Useful for functionality validation") + void watchForFileUpdatesAndEmitThem() throws IOException { + final String initial = "{\"flags\":{\"myBoolFlag\":{\"state\":\"ENABLED\",\"variants\":{\"on\":true,\"off\":false},\"defaultVariant\":\"on\"}}}"; + final String updatedFlags = "{\"flags\":{\"myBoolFlag\":{\"state\":\"ENABLED\",\"variants\":{\"on\":true,\"off\":false},\"defaultVariant\":\"off\"}}}"; + + // given + final Path updPath = Paths.get(getResourcePath(UPDATABLE_FILE)); + Files.write(updPath, initial.getBytes(), StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING); + + final FileConnector connector = new FileConnector(updPath.toString()); + + // when + connector.init(); + // then - assertThrows(IOException.class, connector::init); + final BlockingQueue stream = connector.getStream(); + final StreamPayload[] payload = new StreamPayload[1]; + + // first validate the initial payload + assertTimeoutPreemptively(Duration.ofMillis(200), () -> { + payload[0] = stream.take(); + }); + + assertEquals(initial, payload[0].getData()); + + // then update the flags + Files.write(updPath, updatedFlags.getBytes(), StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING); + + // finally wait for updated payload + assertTimeoutPreemptively(Duration.ofSeconds(10), () -> { + payload[0] = stream.take(); + }); + + assertEquals(updatedFlags, payload[0].getData()); } } diff --git a/providers/flagd/src/test/resources/flagConfigurations/updatableFlags.json b/providers/flagd/src/test/resources/flagConfigurations/updatableFlags.json new file mode 100644 index 000000000..53681dc7d --- /dev/null +++ b/providers/flagd/src/test/resources/flagConfigurations/updatableFlags.json @@ -0,0 +1,12 @@ +{ + "flags": { + "myBoolFlag": { + "state": "ENABLED", + "variants": { + "on": true, + "off": false + }, + "defaultVariant": "on" + } + } +}