|
6 | 6 | package org.elasticsearch.xpack.enrich;
|
7 | 7 |
|
8 | 8 | import org.apache.lucene.search.TotalHits;
|
| 9 | +import org.elasticsearch.ResourceNotFoundException; |
| 10 | +import org.elasticsearch.action.ActionFuture; |
| 11 | +import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse; |
9 | 12 | import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
|
10 | 13 | import org.elasticsearch.action.bulk.BulkItemResponse;
|
11 | 14 | import org.elasticsearch.action.bulk.BulkRequest;
|
|
23 | 26 | import org.elasticsearch.index.reindex.ReindexPlugin;
|
24 | 27 | import org.elasticsearch.ingest.common.IngestCommonPlugin;
|
25 | 28 | import org.elasticsearch.plugins.Plugin;
|
| 29 | +import org.elasticsearch.tasks.TaskInfo; |
26 | 30 | import org.elasticsearch.test.ESIntegTestCase;
|
27 | 31 | import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
|
28 | 32 | import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
|
@@ -207,7 +211,27 @@ private static void createAndExecutePolicy() {
|
207 | 211 | );
|
208 | 212 | PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(POLICY_NAME, enrichPolicy);
|
209 | 213 | client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
|
210 |
| - client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(POLICY_NAME)).actionGet(); |
| 214 | + final ActionFuture<ExecuteEnrichPolicyAction.Response> policyExecuteFuture = client().execute( |
| 215 | + ExecuteEnrichPolicyAction.INSTANCE, |
| 216 | + new ExecuteEnrichPolicyAction.Request(POLICY_NAME) |
| 217 | + ); |
| 218 | + // Make sure we can deserialize enrich policy execution task status |
| 219 | + final List<TaskInfo> tasks = client().admin() |
| 220 | + .cluster() |
| 221 | + .prepareListTasks() |
| 222 | + .setActions(EnrichPolicyExecutor.TASK_ACTION) |
| 223 | + .get() |
| 224 | + .getTasks(); |
| 225 | + // Best effort, sometimes the enrich policy task will not be visible yet or will have already finished |
| 226 | + if (tasks.isEmpty() == false) { |
| 227 | + try { |
| 228 | + final GetTaskResponse getTaskResponse = client().admin().cluster().prepareGetTask(tasks.get(0).getTaskId()).get(); |
| 229 | + assertEquals(getTaskResponse.getTask().getTask().getAction(), EnrichPolicyExecutor.TASK_ACTION); |
| 230 | + } catch (ResourceNotFoundException e) { |
| 231 | + // ignored, could be the task has already finished |
| 232 | + } |
| 233 | + } |
| 234 | + policyExecuteFuture.actionGet(); |
211 | 235 | }
|
212 | 236 |
|
213 | 237 | private static void createPipeline() {
|
|
0 commit comments