Skip to content

Commit 7ffe15f

Browse files
Add Missing NamedWritable Registration for ExecuteEnrichPolicyStatus (#62364) (#62375)
This was missing and caused nodes to drop out of the cluster on serialization failures when ever one tried to get an enrich policy task by name. The test in here is a little dirty but I figured it would be nice to have an actual reproducer for the issue and I couldn't find any infrastructure to nicely time the tasks so I put this on top of existing test infra.
1 parent db76fdc commit 7ffe15f

File tree

2 files changed

+28
-2
lines changed

2 files changed

+28
-2
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
5252
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
5353
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
54+
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
5455
import org.elasticsearch.xpack.core.eql.EqlFeatureSetUsage;
5556
import org.elasticsearch.xpack.core.flattened.FlattenedFeatureSetUsage;
5657
import org.elasticsearch.xpack.core.frozen.FrozenIndicesFeatureSetUsage;
@@ -634,7 +635,8 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
634635
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.ANALYTICS, AnalyticsFeatureSetUsage::new),
635636
// Enrich
636637
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.ENRICH, EnrichFeatureSet.Usage::new),
637-
// Searchable snapshots
638+
new NamedWriteableRegistry.Entry(Task.Status.class, ExecuteEnrichPolicyStatus.NAME, ExecuteEnrichPolicyStatus::new),
639+
// Searchable snapshots
638640
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.SEARCHABLE_SNAPSHOTS,
639641
SearchableSnapshotFeatureSetUsage::new),
640642
// Data Streams

x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
package org.elasticsearch.xpack.enrich;
77

88
import org.apache.lucene.search.TotalHits;
9+
import org.elasticsearch.ResourceNotFoundException;
10+
import org.elasticsearch.action.ActionFuture;
11+
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
912
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
1013
import org.elasticsearch.action.bulk.BulkItemResponse;
1114
import org.elasticsearch.action.bulk.BulkRequest;
@@ -23,6 +26,7 @@
2326
import org.elasticsearch.index.reindex.ReindexPlugin;
2427
import org.elasticsearch.ingest.common.IngestCommonPlugin;
2528
import org.elasticsearch.plugins.Plugin;
29+
import org.elasticsearch.tasks.TaskInfo;
2630
import org.elasticsearch.test.ESIntegTestCase;
2731
import org.elasticsearch.xpack.core.XPackSettings;
2832
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
@@ -222,7 +226,27 @@ private static void createAndExecutePolicy() {
222226
);
223227
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(POLICY_NAME, enrichPolicy);
224228
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
225-
client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(POLICY_NAME)).actionGet();
229+
final ActionFuture<ExecuteEnrichPolicyAction.Response> policyExecuteFuture = client().execute(
230+
ExecuteEnrichPolicyAction.INSTANCE,
231+
new ExecuteEnrichPolicyAction.Request(POLICY_NAME)
232+
);
233+
// Make sure we can deserialize enrich policy execution task status
234+
final List<TaskInfo> tasks = client().admin()
235+
.cluster()
236+
.prepareListTasks()
237+
.setActions(EnrichPolicyExecutor.TASK_ACTION)
238+
.get()
239+
.getTasks();
240+
// Best effort, sometimes the enrich policy task will not be visible yet or will have already finished
241+
if (tasks.isEmpty() == false) {
242+
try {
243+
final GetTaskResponse getTaskResponse = client().admin().cluster().prepareGetTask(tasks.get(0).getTaskId()).get();
244+
assertEquals(getTaskResponse.getTask().getTask().getAction(), EnrichPolicyExecutor.TASK_ACTION);
245+
} catch (ResourceNotFoundException e) {
246+
// ignored, could be the task has already finished
247+
}
248+
}
249+
policyExecuteFuture.actionGet();
226250
}
227251

228252
private static void createPipeline() {

0 commit comments

Comments
 (0)