From 67eca02ed8e1b3709a3a539bdb46602f89879cdf Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Mon, 23 Oct 2023 11:12:54 +0200 Subject: [PATCH 1/3] Add a latch to sync when all max upload tasks have been submitted Before we check the amount of active tasks on the prewarming executor, we need to verify that all the tasks have been actually submitted. Otherwise, we have a race in and amount of active tasks can be lower then the amount of submitted tasks. Fixes #99124 --- .../cache/full/SearchableSnapshotsPrewarmingIntegTests.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/SearchableSnapshotsPrewarmingIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/SearchableSnapshotsPrewarmingIntegTests.java index c0d413d09fc5b..d92403019e450 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/SearchableSnapshotsPrewarmingIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/SearchableSnapshotsPrewarmingIntegTests.java @@ -251,11 +251,13 @@ public void testConcurrentPrewarming() throws Exception { final CountDownLatch startPrewarmingLatch = new CountDownLatch(1); final var threadPool = getInstanceFromNode(ThreadPool.class); final int maxUploadTasks = threadPool.info(CACHE_PREWARMING_THREAD_POOL_NAME).getMax(); + final CountDownLatch maxUploadTasksCreated = new CountDownLatch(maxUploadTasks); for (int i = 0; i < maxUploadTasks; i++) { threadPool.executor(CACHE_PREWARMING_THREAD_POOL_NAME).execute(new AbstractRunnable() { @Override protected void doRun() throws Exception { + maxUploadTasksCreated.countDown(); startPrewarmingLatch.await(); } @@ -265,7 +267,7 @@ public void onFailure(Exception e) { } }); } - + maxUploadTasksCreated.await(); var prewarmingExecutor = threadPool.executor(CACHE_PREWARMING_THREAD_POOL_NAME); assertThat(prewarmingExecutor, instanceOf(ThreadPoolExecutor.class)); assertThat(((ThreadPoolExecutor) prewarmingExecutor).getActiveCount(), equalTo(maxUploadTasks)); @@ -279,7 +281,7 @@ public void onFailure(Exception e) { logger.debug("--> waiting for background cache to complete"); assertBusy(() -> { - if (threadPool.executor(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME) instanceof ThreadPoolExecutor executor) { + if (threadPool.executor(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME)instanceof ThreadPoolExecutor executor) { assertThat(executor.getQueue().size(), equalTo(0)); assertThat(executor.getActiveCount(), equalTo(0)); } From 3e956a3df0ce92e51387549eab59dc08f0477e06 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Mon, 23 Oct 2023 11:25:47 +0200 Subject: [PATCH 2/3] Update x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/SearchableSnapshotsPrewarmingIntegTests.java Co-authored-by: David Turner --- .../cache/full/SearchableSnapshotsPrewarmingIntegTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/SearchableSnapshotsPrewarmingIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/SearchableSnapshotsPrewarmingIntegTests.java index d92403019e450..03063a4e90d6d 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/SearchableSnapshotsPrewarmingIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/SearchableSnapshotsPrewarmingIntegTests.java @@ -281,7 +281,7 @@ public void onFailure(Exception e) { logger.debug("--> waiting for background cache to complete"); assertBusy(() -> { - if (threadPool.executor(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME)instanceof ThreadPoolExecutor executor) { + if (threadPool.executor(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME) instanceof ThreadPoolExecutor executor) { assertThat(executor.getQueue().size(), equalTo(0)); assertThat(executor.getActiveCount(), equalTo(0)); } From 527b113640026efa77e8c9a72f20125144aa8a11 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Mon, 23 Oct 2023 11:25:53 +0200 Subject: [PATCH 3/3] Update x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/SearchableSnapshotsPrewarmingIntegTests.java Co-authored-by: David Turner --- .../cache/full/SearchableSnapshotsPrewarmingIntegTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/SearchableSnapshotsPrewarmingIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/SearchableSnapshotsPrewarmingIntegTests.java index 03063a4e90d6d..50149cf8ca376 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/SearchableSnapshotsPrewarmingIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/SearchableSnapshotsPrewarmingIntegTests.java @@ -267,7 +267,7 @@ public void onFailure(Exception e) { } }); } - maxUploadTasksCreated.await(); + safeAwait(maxUploadTasksCreated); var prewarmingExecutor = threadPool.executor(CACHE_PREWARMING_THREAD_POOL_NAME); assertThat(prewarmingExecutor, instanceOf(ThreadPoolExecutor.class)); assertThat(((ThreadPoolExecutor) prewarmingExecutor).getActiveCount(), equalTo(maxUploadTasks));