diff --git a/pom.xml b/pom.xml index 8b3ce85af..6fe155ef1 100644 --- a/pom.xml +++ b/pom.xml @@ -265,6 +265,8 @@ maven-surefire-plugin 3.5.2 + 1 + false ${surefireArgLine} diff --git a/src/main/java/dev/openfeature/sdk/EventProvider.java b/src/main/java/dev/openfeature/sdk/EventProvider.java index e9cdae55b..659c6ad46 100644 --- a/src/main/java/dev/openfeature/sdk/EventProvider.java +++ b/src/main/java/dev/openfeature/sdk/EventProvider.java @@ -1,6 +1,10 @@ package dev.openfeature.sdk; import dev.openfeature.sdk.internal.TriConsumer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; /** * Abstract EventProvider. Providers must extend this class to support events. @@ -14,8 +18,10 @@ * * @see FeatureProvider */ +@Slf4j public abstract class EventProvider implements FeatureProvider { private EventProviderListener eventProviderListener; + private final ExecutorService emitterExecutor = Executors.newCachedThreadPool(); void setEventProviderListener(EventProviderListener eventProviderListener) { this.eventProviderListener = eventProviderListener; @@ -46,6 +52,24 @@ void detach() { this.onEmit = null; } + /** + * Stop the event emitter executor and block until either termination has completed + * or timeout period has elapsed. + */ + @Override + public void shutdown() { + emitterExecutor.shutdown(); + try { + if (!emitterExecutor.awaitTermination(EventSupport.SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + log.warn("Emitter executor did not terminate before the timeout period had elapsed"); + emitterExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + emitterExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + /** * Emit the specified {@link ProviderEvent}. * @@ -56,8 +80,10 @@ public void emit(ProviderEvent event, ProviderEventDetails details) { if (eventProviderListener != null) { eventProviderListener.onEmit(event, details); } - if (this.onEmit != null) { - this.onEmit.accept(this, event, details); + + final TriConsumer localOnEmit = this.onEmit; + if (localOnEmit != null) { + emitterExecutor.submit(() -> localOnEmit.accept(this, event, details)); } } diff --git a/src/main/java/dev/openfeature/sdk/EventSupport.java b/src/main/java/dev/openfeature/sdk/EventSupport.java index d3af45991..5ebe90a4c 100644 --- a/src/main/java/dev/openfeature/sdk/EventSupport.java +++ b/src/main/java/dev/openfeature/sdk/EventSupport.java @@ -19,15 +19,15 @@ @Slf4j class EventSupport { + public static final int SHUTDOWN_TIMEOUT_SECONDS = 3; + // we use a v4 uuid as a "placeholder" for anonymous clients, since // ConcurrentHashMap doesn't support nulls private static final String defaultClientUuid = UUID.randomUUID().toString(); - private static final int SHUTDOWN_TIMEOUT_SECONDS = 3; private final Map handlerStores = new ConcurrentHashMap<>(); private final HandlerStore globalHandlerStore = new HandlerStore(); private final ExecutorService taskExecutor = Executors.newCachedThreadPool(runnable -> { final Thread thread = new Thread(runnable); - thread.setDaemon(true); return thread; }); diff --git a/src/test/java/dev/openfeature/sdk/EventProviderTest.java b/src/test/java/dev/openfeature/sdk/EventProviderTest.java index d8af6e8d3..a159877f0 100644 --- a/src/test/java/dev/openfeature/sdk/EventProviderTest.java +++ b/src/test/java/dev/openfeature/sdk/EventProviderTest.java @@ -5,13 +5,18 @@ import static org.mockito.Mockito.*; import dev.openfeature.sdk.internal.TriConsumer; +import dev.openfeature.sdk.testutils.TestStackedEmitCallsProvider; +import io.cucumber.java.AfterAll; import lombok.SneakyThrows; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; class EventProviderTest { + private static final int TIMEOUT = 300; + private TestEventProvider eventProvider; @BeforeEach @@ -21,6 +26,11 @@ void setup() { eventProvider.initialize(null); } + @AfterAll + public static void resetDefaultProvider() { + OpenFeatureAPI.getInstance().setProviderAndWait(new NoOpProvider()); + } + @Test @DisplayName("should run attached onEmit with emitters") void emitsEventsWhenAttached() { @@ -34,10 +44,10 @@ void emitsEventsWhenAttached() { eventProvider.emitProviderStale(details); eventProvider.emitProviderError(details); - verify(onEmit, times(2)).accept(eventProvider, ProviderEvent.PROVIDER_READY, details); - verify(onEmit, times(1)).accept(eventProvider, ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, details); - verify(onEmit, times(1)).accept(eventProvider, ProviderEvent.PROVIDER_STALE, details); - verify(onEmit, times(1)).accept(eventProvider, ProviderEvent.PROVIDER_ERROR, details); + verify(onEmit, timeout(TIMEOUT).times(2)).accept(eventProvider, ProviderEvent.PROVIDER_READY, details); + verify(onEmit, timeout(TIMEOUT)).accept(eventProvider, ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, details); + verify(onEmit, timeout(TIMEOUT)).accept(eventProvider, ProviderEvent.PROVIDER_STALE, details); + verify(onEmit, timeout(TIMEOUT)).accept(eventProvider, ProviderEvent.PROVIDER_ERROR, details); } @Test @@ -75,6 +85,15 @@ void doesNotThrowWhenOnEmitSame() { eventProvider.attach(onEmit2); // should not throw, same instance. noop } + @Test + @SneakyThrows + @Timeout(value = 2, threadMode = Timeout.ThreadMode.SEPARATE_THREAD) + @DisplayName("should not deadlock on emit called during emit") + void doesNotDeadlockOnEmitStackedCalls() { + TestStackedEmitCallsProvider provider = new TestStackedEmitCallsProvider(); + OpenFeatureAPI.getInstance().setProviderAndWait(provider); + } + static class TestEventProvider extends EventProvider { private static final String NAME = "TestEventProvider"; diff --git a/src/test/java/dev/openfeature/sdk/EventsTest.java b/src/test/java/dev/openfeature/sdk/EventsTest.java index 02a5953b9..e59024652 100644 --- a/src/test/java/dev/openfeature/sdk/EventsTest.java +++ b/src/test/java/dev/openfeature/sdk/EventsTest.java @@ -19,7 +19,7 @@ class EventsTest { - private static final int TIMEOUT = 300; + private static final int TIMEOUT = 500; private static final int INIT_DELAY = TIMEOUT / 2; @AfterAll @@ -601,13 +601,13 @@ void matchingStaleEventsMustRunImmediately() { OpenFeatureAPI api = OpenFeatureAPI.getInstance(); // provider which is already stale - TestEventsProvider provider = TestEventsProvider.newInitializedTestEventsProvider(); + TestEventsProvider provider = new TestEventsProvider(INIT_DELAY); Client client = api.getClient(name); api.setProviderAndWait(name, provider); provider.emitProviderStale(ProviderEventDetails.builder().build()); assertThat(client.getProviderState()).isEqualTo(ProviderState.STALE); - // should run even thought handler was added after stale + // should run even though handler was added after stale client.onProviderStale(handler); verify(handler, timeout(TIMEOUT)).accept(any()); } @@ -623,13 +623,13 @@ void matchingErrorEventsMustRunImmediately() { OpenFeatureAPI api = OpenFeatureAPI.getInstance(); // provider which is already in error - TestEventsProvider provider = new TestEventsProvider(); + TestEventsProvider provider = new TestEventsProvider(INIT_DELAY); Client client = api.getClient(name); api.setProviderAndWait(name, provider); provider.emitProviderError(ProviderEventDetails.builder().build()); assertThat(client.getProviderState()).isEqualTo(ProviderState.ERROR); - // should run even thought handler was added after error + // should run even though handler was added after error client.onProviderError(handler); verify(handler, timeout(TIMEOUT)).accept(any()); } diff --git a/src/test/java/dev/openfeature/sdk/testutils/TestStackedEmitCallsProvider.java b/src/test/java/dev/openfeature/sdk/testutils/TestStackedEmitCallsProvider.java new file mode 100644 index 000000000..d1bf65c57 --- /dev/null +++ b/src/test/java/dev/openfeature/sdk/testutils/TestStackedEmitCallsProvider.java @@ -0,0 +1,103 @@ +package dev.openfeature.sdk.testutils; + +import dev.openfeature.sdk.EvaluationContext; +import dev.openfeature.sdk.EventProvider; +import dev.openfeature.sdk.Metadata; +import dev.openfeature.sdk.ProviderEvaluation; +import dev.openfeature.sdk.ProviderEvent; +import dev.openfeature.sdk.ProviderEventDetails; +import dev.openfeature.sdk.Value; +import java.util.function.Consumer; + +public class TestStackedEmitCallsProvider extends EventProvider { + private final NestedBlockingEmitter nestedBlockingEmitter = new NestedBlockingEmitter(this::onProviderEvent); + + @Override + public Metadata getMetadata() { + return () -> getClass().getSimpleName(); + } + + @Override + public void initialize(EvaluationContext evaluationContext) throws Exception { + synchronized (nestedBlockingEmitter) { + nestedBlockingEmitter.init(); + while (!nestedBlockingEmitter.isReady()) { + try { + nestedBlockingEmitter.wait(); + } catch (InterruptedException e) { + } + } + } + } + + private void onProviderEvent(ProviderEvent providerEvent) { + synchronized (nestedBlockingEmitter) { + if (providerEvent == ProviderEvent.PROVIDER_READY) { + nestedBlockingEmitter.setReady(); + /* + * This line deadlocked in the original implementation without the emitterExecutor see + * https://github.com/open-feature/java-sdk/issues/1299 + */ + emitProviderReady(ProviderEventDetails.builder().build()); + } + } + } + + @Override + public ProviderEvaluation getBooleanEvaluation(String key, Boolean defaultValue, EvaluationContext ctx) { + throw new UnsupportedOperationException("Unimplemented method 'getBooleanEvaluation'"); + } + + @Override + public ProviderEvaluation getStringEvaluation(String key, String defaultValue, EvaluationContext ctx) { + throw new UnsupportedOperationException("Unimplemented method 'getStringEvaluation'"); + } + + @Override + public ProviderEvaluation getIntegerEvaluation(String key, Integer defaultValue, EvaluationContext ctx) { + throw new UnsupportedOperationException("Unimplemented method 'getIntegerEvaluation'"); + } + + @Override + public ProviderEvaluation getDoubleEvaluation(String key, Double defaultValue, EvaluationContext ctx) { + throw new UnsupportedOperationException("Unimplemented method 'getDoubleEvaluation'"); + } + + @Override + public ProviderEvaluation getObjectEvaluation(String key, Value defaultValue, EvaluationContext ctx) { + throw new UnsupportedOperationException("Unimplemented method 'getObjectEvaluation'"); + } + + static class NestedBlockingEmitter { + + private final Consumer emitProviderEvent; + private volatile boolean isReady; + + public NestedBlockingEmitter(Consumer emitProviderEvent) { + this.emitProviderEvent = emitProviderEvent; + } + + public void init() { + // run init outside monitored thread + new Thread(() -> { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + emitProviderEvent.accept(ProviderEvent.PROVIDER_READY); + }) + .start(); + } + + public boolean isReady() { + return isReady; + } + + public synchronized void setReady() { + isReady = true; + this.notifyAll(); + } + } +}