Skip to content

Commit e035dab

Browse files
committed
Extend tracking of parent tasks to master node, replication and broadcast actions
Now MasterNodeOperations, ReplicationAllShards, ReplicationSingleShard, BroadcastReplication and BroadcastByNode actions keep track of their parent tasks.
1 parent d4c40fc commit e035dab

34 files changed

+763
-156
lines changed

core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.elasticsearch.common.settings.Settings;
4242
import org.elasticsearch.index.shard.IndexShard;
4343
import org.elasticsearch.indices.IndicesService;
44+
import org.elasticsearch.tasks.Task;
4445
import org.elasticsearch.threadpool.ThreadPool;
4546
import org.elasticsearch.transport.TransportService;
4647

@@ -179,7 +180,7 @@ protected ClusterBlockException checkRequestBlock(ClusterState state, UpgradeReq
179180
}
180181

181182
@Override
182-
protected void doExecute(UpgradeRequest request, final ActionListener<UpgradeResponse> listener) {
183+
protected void doExecute(Task task, UpgradeRequest request, final ActionListener<UpgradeResponse> listener) {
183184
ActionListener<UpgradeResponse> settingsUpdateListener = new ActionListener<UpgradeResponse>() {
184185
@Override
185186
public void onResponse(UpgradeResponse upgradeResponse) {
@@ -199,7 +200,7 @@ public void onFailure(Throwable e) {
199200
listener.onFailure(e);
200201
}
201202
};
202-
super.doExecute(request, settingsUpdateListener);
203+
super.doExecute(task, request, settingsUpdateListener);
203204
}
204205

205206
private void updateSettings(final UpgradeResponse upgradeResponse, final ActionListener<UpgradeResponse> listener) {

core/src/main/java/org/elasticsearch/action/admin/indices/validate/query/TransportValidateQueryAction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.elasticsearch.search.internal.DefaultSearchContext;
5353
import org.elasticsearch.search.internal.SearchContext;
5454
import org.elasticsearch.search.internal.ShardSearchLocalRequest;
55+
import org.elasticsearch.tasks.Task;
5556
import org.elasticsearch.threadpool.ThreadPool;
5657
import org.elasticsearch.transport.TransportService;
5758

@@ -89,9 +90,9 @@ public TransportValidateQueryAction(Settings settings, ThreadPool threadPool, Cl
8990
}
9091

9192
@Override
92-
protected void doExecute(ValidateQueryRequest request, ActionListener<ValidateQueryResponse> listener) {
93+
protected void doExecute(Task task, ValidateQueryRequest request, ActionListener<ValidateQueryResponse> listener) {
9394
request.nowInMillis = System.currentTimeMillis();
94-
super.doExecute(request, listener);
95+
super.doExecute(task, request, listener);
9596
}
9697

9798
@Override

core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.elasticsearch.index.shard.ShardId;
4545
import org.elasticsearch.indices.IndexAlreadyExistsException;
4646
import org.elasticsearch.indices.IndicesService;
47+
import org.elasticsearch.tasks.Task;
4748
import org.elasticsearch.threadpool.ThreadPool;
4849
import org.elasticsearch.transport.TransportService;
4950

@@ -69,27 +70,27 @@ public TransportDeleteAction(Settings settings, TransportService transportServic
6970
}
7071

7172
@Override
72-
protected void doExecute(final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
73+
protected void doExecute(Task task, final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
7374
ClusterState state = clusterService.state();
7475
if (autoCreateIndex.shouldAutoCreate(request.index(), state)) {
75-
createIndexAction.execute(new CreateIndexRequest().index(request.index()).cause("auto(delete api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
76+
createIndexAction.execute(task, new CreateIndexRequest().index(request.index()).cause("auto(delete api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
7677
@Override
7778
public void onResponse(CreateIndexResponse result) {
78-
innerExecute(request, listener);
79+
innerExecute(task, request, listener);
7980
}
8081

8182
@Override
8283
public void onFailure(Throwable e) {
8384
if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
8485
// we have the index, do it
85-
innerExecute(request, listener);
86+
innerExecute(task, request, listener);
8687
} else {
8788
listener.onFailure(e);
8889
}
8990
}
9091
});
9192
} else {
92-
innerExecute(request, listener);
93+
innerExecute(task, request, listener);
9394
}
9495
}
9596

@@ -114,8 +115,8 @@ protected void resolveRequest(final MetaData metaData, String concreteIndex, Del
114115
request.setShardId(shardId);
115116
}
116117

117-
private void innerExecute(final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
118-
super.doExecute(request, listener);
118+
private void innerExecute(Task task, final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
119+
super.doExecute(task, request, listener);
119120
}
120121

121122
@Override

core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.elasticsearch.index.translog.Translog;
4949
import org.elasticsearch.indices.IndexAlreadyExistsException;
5050
import org.elasticsearch.indices.IndicesService;
51+
import org.elasticsearch.tasks.Task;
5152
import org.elasticsearch.threadpool.ThreadPool;
5253
import org.elasticsearch.transport.TransportService;
5354

@@ -84,7 +85,7 @@ public TransportIndexAction(Settings settings, TransportService transportService
8485
}
8586

8687
@Override
87-
protected void doExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
88+
protected void doExecute(Task task, final IndexRequest request, final ActionListener<IndexResponse> listener) {
8889
// if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API
8990
ClusterState state = clusterService.state();
9091
if (autoCreateIndex.shouldAutoCreate(request.index(), state)) {
@@ -93,18 +94,18 @@ protected void doExecute(final IndexRequest request, final ActionListener<IndexR
9394
createIndexRequest.mapping(request.type());
9495
createIndexRequest.cause("auto(index api)");
9596
createIndexRequest.masterNodeTimeout(request.timeout());
96-
createIndexAction.execute(createIndexRequest, new ActionListener<CreateIndexResponse>() {
97+
createIndexAction.execute(task, createIndexRequest, new ActionListener<CreateIndexResponse>() {
9798
@Override
9899
public void onResponse(CreateIndexResponse result) {
99-
innerExecute(request, listener);
100+
innerExecute(task, request, listener);
100101
}
101102

102103
@Override
103104
public void onFailure(Throwable e) {
104105
if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
105106
// we have the index, do it
106107
try {
107-
innerExecute(request, listener);
108+
innerExecute(task, request, listener);
108109
} catch (Throwable e1) {
109110
listener.onFailure(e1);
110111
}
@@ -114,7 +115,7 @@ public void onFailure(Throwable e) {
114115
}
115116
});
116117
} else {
117-
innerExecute(request, listener);
118+
innerExecute(task, request, listener);
118119
}
119120
}
120121

@@ -129,8 +130,8 @@ protected void resolveRequest(MetaData metaData, String concreteIndex, IndexRequ
129130
request.setShardId(shardId);
130131
}
131132

132-
private void innerExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
133-
super.doExecute(request, listener);
133+
private void innerExecute(Task task, final IndexRequest request, final ActionListener<IndexResponse> listener) {
134+
super.doExecute(task, request, listener);
134135
}
135136

136137
@Override

core/src/main/java/org/elasticsearch/action/percolate/TransportPercolateAction.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.elasticsearch.index.engine.DocumentMissingException;
4242
import org.elasticsearch.percolator.PercolateException;
4343
import org.elasticsearch.percolator.PercolatorService;
44+
import org.elasticsearch.tasks.Task;
4445
import org.elasticsearch.threadpool.ThreadPool;
4546
import org.elasticsearch.transport.TransportService;
4647

@@ -70,7 +71,7 @@ public TransportPercolateAction(Settings settings, ThreadPool threadPool, Cluste
7071
}
7172

7273
@Override
73-
protected void doExecute(final PercolateRequest request, final ActionListener<PercolateResponse> listener) {
74+
protected void doExecute(Task task, final PercolateRequest request, final ActionListener<PercolateResponse> listener) {
7475
request.startTime = System.currentTimeMillis();
7576
if (request.getRequest() != null) {
7677
//create a new get request to make sure it has the same headers and context as the original percolate request
@@ -84,7 +85,7 @@ public void onResponse(GetResponse getResponse) {
8485
}
8586

8687
BytesReference docSource = getResponse.getSourceAsBytesRef();
87-
TransportPercolateAction.super.doExecute(new PercolateRequest(request, docSource), listener);
88+
TransportPercolateAction.super.doExecute(task, new PercolateRequest(request, docSource), listener);
8889
}
8990

9091
@Override
@@ -93,7 +94,7 @@ public void onFailure(Throwable e) {
9394
}
9495
});
9596
} else {
96-
super.doExecute(request, listener);
97+
super.doExecute(task, request, listener);
9798
}
9899
}
99100

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action.support;
21+
22+
import org.elasticsearch.action.ActionRequest;
23+
import org.elasticsearch.common.io.stream.StreamInput;
24+
import org.elasticsearch.common.io.stream.StreamOutput;
25+
import org.elasticsearch.tasks.Task;
26+
27+
import java.io.IOException;
28+
29+
/**
30+
* Base class for action requests that can have associated child tasks
31+
*/
32+
public abstract class ChildTaskActionRequest<Request extends ActionRequest<Request>> extends ActionRequest<Request> {
33+
34+
private String parentTaskNode;
35+
36+
private long parentTaskId;
37+
38+
protected ChildTaskActionRequest() {
39+
40+
}
41+
42+
public void setParentTask(String parentTaskNode, long parentTaskId) {
43+
this.parentTaskNode = parentTaskNode;
44+
this.parentTaskId = parentTaskId;
45+
}
46+
47+
@Override
48+
public void readFrom(StreamInput in) throws IOException {
49+
super.readFrom(in);
50+
parentTaskNode = in.readOptionalString();
51+
parentTaskId = in.readLong();
52+
}
53+
54+
@Override
55+
public void writeTo(StreamOutput out) throws IOException {
56+
super.writeTo(out);
57+
out.writeOptionalString(parentTaskNode);
58+
out.writeLong(parentTaskId);
59+
}
60+
61+
@Override
62+
public Task createTask(long id, String type, String action) {
63+
return new Task(id, type, action, this::getDescription, parentTaskNode, parentTaskId);
64+
}
65+
66+
}

core/src/main/java/org/elasticsearch/action/support/ChildTaskRequest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,8 @@
1919

2020
package org.elasticsearch.action.support;
2121

22-
import org.elasticsearch.Version;
2322
import org.elasticsearch.common.io.stream.StreamInput;
2423
import org.elasticsearch.common.io.stream.StreamOutput;
25-
import org.elasticsearch.tasks.ChildTask;
2624
import org.elasticsearch.tasks.Task;
2725
import org.elasticsearch.transport.TransportRequest;
2826

@@ -61,6 +59,6 @@ public void writeTo(StreamOutput out) throws IOException {
6159

6260
@Override
6361
public Task createTask(long id, String type, String action) {
64-
return new ChildTask(id, type, action, this::getDescription, parentTaskNode, parentTaskId);
62+
return new Task(id, type, action, this::getDescription, parentTaskNode, parentTaskId);
6563
}
6664
}

core/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,14 @@ protected HandledTransportAction(Settings settings, String actionName, ThreadPoo
4444
class TransportHandler implements TransportRequestHandler<Request> {
4545

4646
@Override
47-
public final void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
48-
messageReceived(request, channel);
47+
public final void messageReceived(Request request, TransportChannel channel) throws Exception {
48+
throw new UnsupportedOperationException("the task parameter is required for this operation");
4949
}
5050

5151
@Override
52-
public final void messageReceived(Request request, TransportChannel channel) throws Exception {
53-
execute(request, new ActionListener<Response>() {
52+
public final void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
53+
// We already got the task created on the netty layer - no need to create it again on the transport layer
54+
execute(task, request, new ActionListener<Response>() {
5455
@Override
5556
public void onResponse(Response response) {
5657
try {

core/src/main/java/org/elasticsearch/action/support/TransportAction.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ public final ActionFuture<Response> execute(Request request) {
6666
return future;
6767
}
6868

69+
/**
70+
* Use this method when the transport action call should result in creation of a new task associated with the call.
71+
*
72+
* This is a typical behavior.
73+
*/
6974
public final Task execute(Request request, ActionListener<Response> listener) {
7075
Task task = taskManager.register("transport", actionName, request);
7176
if (task == null) {
@@ -88,7 +93,10 @@ public void onFailure(Throwable e) {
8893
return task;
8994
}
9095

91-
private final void execute(Task task, Request request, ActionListener<Response> listener) {
96+
/**
97+
* Use this method when the transport action should continue to run in the context of the current task
98+
*/
99+
public final void execute(Task task, Request request, ActionListener<Response> listener) {
92100

93101
ActionRequestValidationException validationException = request.validate();
94102
if (validationException != null) {

core/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastShardRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,18 @@
2121

2222
import org.elasticsearch.action.IndicesRequest;
2323
import org.elasticsearch.action.OriginalIndices;
24+
import org.elasticsearch.action.support.ChildTaskRequest;
2425
import org.elasticsearch.action.support.IndicesOptions;
2526
import org.elasticsearch.common.io.stream.StreamInput;
2627
import org.elasticsearch.common.io.stream.StreamOutput;
2728
import org.elasticsearch.index.shard.ShardId;
28-
import org.elasticsearch.transport.TransportRequest;
2929

3030
import java.io.IOException;
3131

3232
/**
3333
*
3434
*/
35-
public abstract class BroadcastShardRequest extends TransportRequest implements IndicesRequest {
35+
public abstract class BroadcastShardRequest extends ChildTaskRequest implements IndicesRequest {
3636

3737
private ShardId shardId;
3838

core/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.cluster.routing.ShardRouting;
3636
import org.elasticsearch.common.Nullable;
3737
import org.elasticsearch.common.settings.Settings;
38+
import org.elasticsearch.tasks.Task;
3839
import org.elasticsearch.threadpool.ThreadPool;
3940
import org.elasticsearch.transport.BaseTransportResponseHandler;
4041
import org.elasticsearch.transport.TransportChannel;
@@ -69,8 +70,13 @@ protected TransportBroadcastAction(Settings settings, String actionName, ThreadP
6970
}
7071

7172
@Override
72-
protected void doExecute(Request request, ActionListener<Response> listener) {
73-
new AsyncBroadcastAction(request, listener).start();
73+
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
74+
new AsyncBroadcastAction(task, request, listener).start();
75+
}
76+
77+
@Override
78+
protected final void doExecute(Request request, ActionListener<Response> listener) {
79+
throw new UnsupportedOperationException("the task parameter is required for this operation");
7480
}
7581

7682
protected abstract Response newResponse(Request request, AtomicReferenceArray shardsResponses, ClusterState clusterState);
@@ -93,6 +99,7 @@ protected void doExecute(Request request, ActionListener<Response> listener) {
9399

94100
protected class AsyncBroadcastAction {
95101

102+
private final Task task;
96103
private final Request request;
97104
private final ActionListener<Response> listener;
98105
private final ClusterState clusterState;
@@ -102,7 +109,8 @@ protected class AsyncBroadcastAction {
102109
private final AtomicInteger counterOps = new AtomicInteger();
103110
private final AtomicReferenceArray shardsResponses;
104111

105-
protected AsyncBroadcastAction(Request request, ActionListener<Response> listener) {
112+
protected AsyncBroadcastAction(Task task, Request request, ActionListener<Response> listener) {
113+
this.task = task;
106114
this.request = request;
107115
this.listener = listener;
108116

@@ -158,6 +166,7 @@ protected void performOperation(final ShardIterator shardIt, final ShardRouting
158166
} else {
159167
try {
160168
final ShardRequest shardRequest = newShardRequest(shardIt.size(), shard, request);
169+
shardRequest.setParentTask(clusterService.localNode().getId(), task.getId());
161170
DiscoveryNode node = nodes.get(shard.currentNodeId());
162171
if (node == null) {
163172
// no node connected, act as failure

0 commit comments

Comments
 (0)