diff --git a/docs/changelog/89934.yaml b/docs/changelog/89934.yaml new file mode 100644 index 0000000000000..4084c38616cfa --- /dev/null +++ b/docs/changelog/89934.yaml @@ -0,0 +1,5 @@ +pr: 89934 +summary: Fix deadlock bug exposed by the test +area: Infra/Core +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/FileSettingsService.java b/server/src/main/java/org/elasticsearch/reservedstate/service/FileSettingsService.java index 66ec590a1b773..96842c3e13553 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/FileSettingsService.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/FileSettingsService.java @@ -59,6 +59,7 @@ public class FileSettingsService extends AbstractLifecycleComponent implements C private WatchService watchService; // null; private CountDownLatch watcherThreadLatch; + private volatile CountDownLatch processingLatch; private volatile FileUpdateState fileUpdateState = null; private volatile WatchKey settingsDirWatchKey = null; @@ -311,7 +312,15 @@ synchronized void startWatcher(ClusterState clusterState, boolean onStartup) { settingsDirWatchKey = enableSettingsWatcher(settingsDirWatchKey, settingsDir); if (watchedFileChanged(path)) { - processFileSettings(path, (e) -> logger.error("Error processing operator settings json file", e)).await(); + processingLatch = processFileSettings( + path, + (e) -> logger.error("Error processing operator settings json file", e) + ); + // After we get and set the processing latch, we need to check if stop wasn't + // invoked in the meantime. Stop will invalidate all watch keys. 
+ if (configDirWatchKey != null) { + processingLatch.await(); + } } } catch (IOException e) { logger.warn("encountered I/O error while watching file settings", e); @@ -338,6 +347,9 @@ synchronized void stopWatcher() { cleanupWatchKeys(); fileUpdateState = null; watchService.close(); + if (processingLatch != null) { + processingLatch.countDown(); + } if (watcherThreadLatch != null) { watcherThreadLatch.await(); } diff --git a/server/src/test/java/org/elasticsearch/reservedstate/service/FileSettingsServiceTests.java b/server/src/test/java/org/elasticsearch/reservedstate/service/FileSettingsServiceTests.java index 86d3bf76228a7..ff043b216e149 100644 --- a/server/src/test/java/org/elasticsearch/reservedstate/service/FileSettingsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/reservedstate/service/FileSettingsServiceTests.java @@ -56,6 +56,7 @@ public class FileSettingsServiceTests extends ESTestCase { private Environment env; private ClusterService clusterService; private FileSettingsService fileSettingsService; + private ReservedClusterStateService controller; private ThreadPool threadpool; @Before @@ -86,10 +87,7 @@ public void setUp() throws Exception { ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - ReservedClusterStateService controller = new ReservedClusterStateService( - clusterService, - List.of(new ReservedClusterSettingsAction(clusterSettings)) - ); + controller = new ReservedClusterStateService(clusterService, List.of(new ReservedClusterSettingsAction(clusterSettings))); fileSettingsService = new FileSettingsService(clusterService, controller, env); } @@ -217,4 +215,93 @@ public void testInitialFile() throws Exception { service.stop(); service.close(); } + + @SuppressWarnings("unchecked") + public void testStopWorksInMiddleOfProcessing() throws Exception { + var spiedController = spy(controller); + var fsService = new FileSettingsService(clusterService, spiedController, env); 
+ + FileSettingsService service = spy(fsService); + CountDownLatch processFileLatch = new CountDownLatch(1); + CountDownLatch deadThreadLatch = new CountDownLatch(1); + + doAnswer((Answer) invocation -> { + processFileLatch.countDown(); + new Thread(() -> { + // Simulate a thread that never comes back and decrements the + // countdown latch in FileSettingsService.processFileSettings + try { + deadThreadLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }).start(); + return null; + }).when(spiedController).process(any(String.class), any(XContentParser.class), any(Consumer.class)); + + service.start(); + assertTrue(service.watching()); + + Files.createDirectories(service.operatorSettingsDir()); + + // Make some fake settings file to cause the file settings service to process it + Files.write(service.operatorSettingsFile(), "{}".getBytes(StandardCharsets.UTF_8)); + + // we need to wait a bit, on macOS it may take up to 10 seconds for the Java watcher service to notice the file, + // on Linux it is instantaneous. Windows is instantaneous too. 
+ assertTrue(processFileLatch.await(30, TimeUnit.SECONDS)); + + // Stopping the service should interrupt the watcher thread, we should be able to stop + service.stop(); + assertFalse(service.watching()); + service.close(); + // let the deadlocked thread end, so we can cleanly exit the test + deadThreadLatch.countDown(); + } + + @SuppressWarnings("unchecked") + public void testStopWorksIfProcessingDidntReturnYet() throws Exception { + var spiedController = spy(controller); + var fsService = new FileSettingsService(clusterService, spiedController, env); + + FileSettingsService service = spy(fsService); + CountDownLatch processFileLatch = new CountDownLatch(1); + CountDownLatch deadThreadLatch = new CountDownLatch(1); + + doAnswer((Answer) invocation -> { + processFileLatch.countDown(); + // allow the other thread to continue, but hold on a bit to avoid + // setting the count-down latch in the main watcher loop. + Thread.sleep(1_000); + new Thread(() -> { + // Simulate a thread that never comes back and decrements the + // countdown latch in FileSettingsService.processFileSettings + try { + deadThreadLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }).start(); + return null; + }).when(spiedController).process(any(String.class), any(XContentParser.class), any(Consumer.class)); + + service.start(); + assertTrue(service.watching()); + + Files.createDirectories(service.operatorSettingsDir()); + + // Make some fake settings file to cause the file settings service to process it + Files.write(service.operatorSettingsFile(), "{}".getBytes(StandardCharsets.UTF_8)); + + // we need to wait a bit, on macOS it may take up to 10 seconds for the Java watcher service to notice the file, + // on Linux it is instantaneous. Windows is instantaneous too. 
+ assertTrue(processFileLatch.await(30, TimeUnit.SECONDS)); + + // Stopping the service should interrupt the watcher thread, we should be able to stop + service.stop(); + assertFalse(service.watching()); + service.close(); + // let the deadlocked thread end, so we can cleanly exit the test + deadThreadLatch.countDown(); + } }