Skip to content

Commit fa6c5b4

Browse files
Add Missing NamedWritable Registration for ExecuteEnrichPolicyStatus (elastic#62364)
This was missing and caused nodes to drop out of the cluster on serialization failures when ever one tried to get an enrich policy task by name. The test in here is a little dirty but I figured it would be nice to have an actual reproducer for the issue and I couldn't find any infrastructure to nicely time the tasks so I put this on top of existing test infra.
1 parent 4eea602 commit fa6c5b4

File tree

2 files changed

+28
-2
lines changed

2 files changed

+28
-2
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
5252
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
5353
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
54+
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
5455
import org.elasticsearch.xpack.core.eql.EqlFeatureSetUsage;
5556
import org.elasticsearch.xpack.core.flattened.FlattenedFeatureSetUsage;
5657
import org.elasticsearch.xpack.core.frozen.FrozenIndicesFeatureSetUsage;
@@ -641,7 +642,8 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
641642
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.ANALYTICS, AnalyticsFeatureSetUsage::new),
642643
// Enrich
643644
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.ENRICH, EnrichFeatureSet.Usage::new),
644-
// Searchable snapshots
645+
new NamedWriteableRegistry.Entry(Task.Status.class, ExecuteEnrichPolicyStatus.NAME, ExecuteEnrichPolicyStatus::new),
646+
// Searchable snapshots
645647
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.SEARCHABLE_SNAPSHOTS,
646648
SearchableSnapshotFeatureSetUsage::new),
647649
// Data Streams

x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
package org.elasticsearch.xpack.enrich;
77

88
import org.apache.lucene.search.TotalHits;
9+
import org.elasticsearch.ResourceNotFoundException;
10+
import org.elasticsearch.action.ActionFuture;
11+
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
912
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
1013
import org.elasticsearch.action.bulk.BulkItemResponse;
1114
import org.elasticsearch.action.bulk.BulkRequest;
@@ -23,6 +26,7 @@
2326
import org.elasticsearch.index.reindex.ReindexPlugin;
2427
import org.elasticsearch.ingest.common.IngestCommonPlugin;
2528
import org.elasticsearch.plugins.Plugin;
29+
import org.elasticsearch.tasks.TaskInfo;
2630
import org.elasticsearch.test.ESIntegTestCase;
2731
import org.elasticsearch.xpack.core.XPackSettings;
2832
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
@@ -222,7 +226,27 @@ private static void createAndExecutePolicy() {
222226
);
223227
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(POLICY_NAME, enrichPolicy);
224228
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
225-
client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(POLICY_NAME)).actionGet();
229+
final ActionFuture<ExecuteEnrichPolicyAction.Response> policyExecuteFuture = client().execute(
230+
ExecuteEnrichPolicyAction.INSTANCE,
231+
new ExecuteEnrichPolicyAction.Request(POLICY_NAME)
232+
);
233+
// Make sure we can deserialize enrich policy execution task status
234+
final List<TaskInfo> tasks = client().admin()
235+
.cluster()
236+
.prepareListTasks()
237+
.setActions(EnrichPolicyExecutor.TASK_ACTION)
238+
.get()
239+
.getTasks();
240+
// Best effort, sometimes the enrich policy task will not be visible yet or will have already finished
241+
if (tasks.isEmpty() == false) {
242+
try {
243+
final GetTaskResponse getTaskResponse = client().admin().cluster().prepareGetTask(tasks.get(0).getTaskId()).get();
244+
assertEquals(getTaskResponse.getTask().getTask().getAction(), EnrichPolicyExecutor.TASK_ACTION);
245+
} catch (ResourceNotFoundException e) {
246+
// ignored, could be the task has already finished
247+
}
248+
}
249+
policyExecuteFuture.actionGet();
226250
}
227251

228252
private static void createPipeline() {

0 commit comments

Comments
 (0)