diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java index 2e8f15e0886c0..83c1dcc6464b7 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java @@ -1004,7 +1004,6 @@ public void testILMRolloverOnManuallyRolledIndex() throws Exception { assertBusy(() -> assertTrue(indexExists(thirdIndex))); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/50353") public void testHistoryIsWrittenWithSuccess() throws Exception { String index = "index"; @@ -1047,7 +1046,6 @@ public void testHistoryIsWrittenWithSuccess() throws Exception { assertBusy(() -> assertHistoryIsPresent(policy, index + "-000002", true, "check-rollover-ready"), 30, TimeUnit.SECONDS); } - @AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/50353") public void testHistoryIsWrittenWithFailure() throws Exception { String index = "index"; diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java index 4051b291b9801..3e1ad336905eb 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java @@ -179,7 +179,8 @@ public Collection createComponents(Client client, ClusterService cluster @SuppressWarnings("unused") ILMHistoryTemplateRegistry ilmTemplateRegistry = new ILMHistoryTemplateRegistry(settings, clusterService, threadPool, client, xContentRegistry); - ilmHistoryStore.set(new ILMHistoryStore(settings, new OriginSettingClient(client, INDEX_LIFECYCLE_ORIGIN), clusterService)); + ilmHistoryStore.set(new ILMHistoryStore(settings, new OriginSettingClient(client, INDEX_LIFECYCLE_ORIGIN), + clusterService, threadPool)); indexLifecycleInitialisationService.set(new IndexLifecycleService(settings, client, clusterService, threadPool, getClock(), System::currentTimeMillis, xContentRegistry, ilmHistoryStore.get())); components.add(indexLifecycleInitialisationService.get()); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java index ab8168de4b28d..96c54e5adfca3 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.alias.Alias; @@ -31,11 +32,13 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; import java.io.IOException; import java.util.Arrays; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -54,26 +57,52 @@ public class ILMHistoryStore implements Closeable { public static final String ILM_HISTORY_INDEX_PREFIX = "ilm-history-" + INDEX_TEMPLATE_VERSION + "-"; public static final String ILM_HISTORY_ALIAS = "ilm-history-" + INDEX_TEMPLATE_VERSION; - private final Client client; - private final ClusterService clusterService; private final boolean ilmHistoryEnabled; private final BulkProcessor processor; + private final ThreadPool threadPool; - public ILMHistoryStore(Settings nodeSettings, Client client, ClusterService clusterService) { - this.client = client; - this.clusterService = clusterService; - ilmHistoryEnabled = LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.get(nodeSettings); + public ILMHistoryStore(Settings nodeSettings, Client client, ClusterService clusterService, ThreadPool threadPool) { + this.ilmHistoryEnabled = LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.get(nodeSettings); + this.threadPool = threadPool; this.processor = BulkProcessor.builder( new OriginSettingClient(client, INDEX_LIFECYCLE_ORIGIN)::bulk, new BulkProcessor.Listener() { @Override - public void beforeBulk(long executionId, BulkRequest request) { } + public void beforeBulk(long executionId, BulkRequest request) { + // Prior to actually performing the bulk, we should ensure the index exists, and + // if we were unable to create it or it was in a bad state, we should not + // attempt to index documents. + try { + final CompletableFuture indexCreated = new CompletableFuture<>(); + ensureHistoryIndex(client, clusterService.state(), ActionListener.wrap(indexCreated::complete, + ex -> { + logger.warn("failed to create ILM history store index prior to issuing bulk request", ex); + indexCreated.completeExceptionally(ex); + })); + indexCreated.get(2, TimeUnit.MINUTES); + } catch (Exception e) { + logger.warn(new ParameterizedMessage("unable to index the following ILM history items:\n{}", + request.requests().stream() + .filter(dwr -> (dwr instanceof IndexRequest)) + .map(dwr -> ((IndexRequest) dwr)) + .map(IndexRequest::sourceAsMap) + .map(Object::toString) + .collect(Collectors.joining("\n"))), e); + throw new ElasticsearchException(e); + } + } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { long items = request.numberOfActions(); - logger.trace("indexed [{}] items into ILM history index", items); + if (logger.isTraceEnabled()) { + logger.trace("indexed [{}] items into ILM history index [{}]", items, + Arrays.stream(response.getItems()) + .map(BulkItemResponse::getIndex) + .distinct() + .collect(Collectors.joining(","))); + } if (response.hasFailures()) { Map failures = Arrays.stream(response.getItems()) .filter(BulkItemResponse::isFailed) @@ -105,18 +134,25 @@ public void putAsync(ILMHistoryItem item) { LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.getKey(), item); return; } - logger.trace("about to index ILM history item in index [{}]: [{}]", ILM_HISTORY_ALIAS, item); - ensureHistoryIndex(client, clusterService.state(), ActionListener.wrap(createdIndex -> { - try (XContentBuilder builder = XContentFactory.jsonBuilder()) { - item.toXContent(builder, ToXContent.EMPTY_PARAMS); - IndexRequest request = new IndexRequest(ILM_HISTORY_ALIAS).source(builder); - processor.add(request); - } catch (IOException exception) { - logger.error(new ParameterizedMessage("failed to index ILM history item in index [{}]: [{}]", - ILM_HISTORY_ALIAS, item), exception); - } - }, ex -> logger.error(new ParameterizedMessage("failed to ensure ILM history index exists, not indexing history item [{}]", - item), ex))); + logger.trace("queueing ILM history item for indexing [{}]: [{}]", ILM_HISTORY_ALIAS, item); + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + item.toXContent(builder, ToXContent.EMPTY_PARAMS); + IndexRequest request = new IndexRequest(ILM_HISTORY_ALIAS).source(builder); + // TODO: remove the threadpool wrapping when the .add call is non-blocking + // (it can currently execute the bulk request occasionally) + // see: https://github.com/elastic/elasticsearch/issues/50440 + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + try { + processor.add(request); + } catch (Exception e) { + logger.error(new ParameterizedMessage("failed add ILM history item to queue for index [{}]: [{}]", + ILM_HISTORY_ALIAS, item), e); + } + }); + } catch (IOException exception) { + logger.error(new ParameterizedMessage("failed to queue ILM history item in index [{}]: [{}]", + ILM_HISTORY_ALIAS, item), exception); + } } /** @@ -134,6 +170,7 @@ static void ensureHistoryIndex(Client client, ClusterState state, ActionListener if (ilmHistory == null && initialHistoryIndex == null) { // No alias or index exists with the expected names, so create the index with appropriate alias + logger.debug("creating ILM history index [{}]", initialHistoryIndexName); client.admin().indices().prepareCreate(initialHistoryIndexName) .setWaitForActiveShards(1) .addAlias(new Alias(ILM_HISTORY_ALIAS) diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java index 130a77cf853dd..faa1270f479aa 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java @@ -1118,7 +1118,7 @@ private class NoOpHistoryStore extends ILMHistoryStore { private final List items = new ArrayList<>(); NoOpHistoryStore() { - super(Settings.EMPTY, noopClient, null); + super(Settings.EMPTY, noopClient, null, null); } public List getItems() { diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStoreTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStoreTests.java index 3e6b9a638737c..47a1ba406e502 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStoreTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStoreTests.java @@ -68,7 +68,7 @@ public void setup() { threadPool = new TestThreadPool(this.getClass().getName()); client = new VerifyingClient(threadPool); clusterService = ClusterServiceUtils.createClusterService(threadPool); - historyStore = new ILMHistoryStore(Settings.EMPTY, client, clusterService); + historyStore = new ILMHistoryStore(Settings.EMPTY, client, clusterService, threadPool); } @After @@ -81,7 +81,7 @@ public void setdown() { public void testNoActionIfDisabled() throws Exception { Settings settings = Settings.builder().put(LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.getKey(), false).build(); - try (ILMHistoryStore disabledHistoryStore = new ILMHistoryStore(settings, client, null)) { + try (ILMHistoryStore disabledHistoryStore = new ILMHistoryStore(settings, client, null, threadPool)) { String policyId = randomAlphaOfLength(5); final long timestamp = randomNonNegativeLong(); ILMHistoryItem record = ILMHistoryItem.success("index", policyId, timestamp, null, null); diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java index 7c7a555a8d5d9..16f388fa49481 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.snapshots.mockstore.MockRepository; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.core.ilm.LifecycleSettings; import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy; import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicyItem; import org.elasticsearch.xpack.core.slm.SnapshotRetentionConfiguration; @@ -64,6 +65,13 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase { static final String REPO = "my-repo"; List dataNodeNames = null; + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)) + .put(LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED, false) + .build(); + } + @Before public void ensureClusterNodes() { logger.info("--> starting enough nodes to ensure we have enough to safely stop for tests");