Skip to content

Commit a0565f0

Browse files
committed
Removed the operation_threaded option.
This low level option isn't worth the complexity and an operation should never happen on the network thread.
1 parent 3b400aa commit a0565f0

File tree

8 files changed

+34
-92
lines changed

8 files changed

+34
-92
lines changed

core/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
public abstract class TransportBroadcastAction<Request extends BroadcastRequest, Response extends BroadcastResponse, ShardRequest extends BroadcastShardRequest, ShardResponse extends BroadcastShardResponse>
4848
extends HandledTransportAction<Request, Response> {
4949

50-
protected final ThreadPool threadPool;
5150
protected final ClusterService clusterService;
5251
protected final TransportService transportService;
5352

@@ -59,7 +58,6 @@ protected TransportBroadcastAction(Settings settings, String actionName, ThreadP
5958
super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request);
6059
this.clusterService = clusterService;
6160
this.transportService = transportService;
62-
this.threadPool = threadPool;
6361
this.transportShardAction = actionName + "[s]";
6462

6563
transportService.registerRequestHandler(transportShardAction, shardRequest, shardExecutor, new ShardTransportHandler());

core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ public abstract class ReplicationRequest<T extends ReplicationRequest> extends A
4646
protected TimeValue timeout = DEFAULT_TIMEOUT;
4747
protected String index;
4848

49-
private boolean threadedOperation = true;
5049
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
5150
private volatile boolean canHaveDuplicates = false;
5251

@@ -76,7 +75,6 @@ protected ReplicationRequest(T request, ActionRequest originalRequest) {
7675
super(originalRequest);
7776
this.timeout = request.timeout();
7877
this.index = request.index();
79-
this.threadedOperation = request.operationThreaded();
8078
this.consistencyLevel = request.consistencyLevel();
8179
}
8280

@@ -91,23 +89,6 @@ public boolean canHaveDuplicates() {
9189
return canHaveDuplicates;
9290
}
9391

94-
/**
95-
* Controls if the operation will be executed on a separate thread when executed locally.
96-
*/
97-
public final boolean operationThreaded() {
98-
return threadedOperation;
99-
}
100-
101-
/**
102-
* Controls if the operation will be executed on a separate thread when executed locally. Defaults
103-
* to <tt>true</tt> when running in embedded mode.
104-
*/
105-
@SuppressWarnings("unchecked")
106-
public final T operationThreaded(boolean threadedOperation) {
107-
this.threadedOperation = threadedOperation;
108-
return (T) this;
109-
}
110-
11192
/**
11293
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
11394
*/

core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestBuilder.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,6 @@ protected ReplicationRequestBuilder(ElasticsearchClient client, Action<Request,
3535
super(client, action, request);
3636
}
3737

38-
/**
39-
* Controls if the operation will be executed on a separate thread when executed locally. Defaults
40-
* to <tt>true</tt> when running in embedded mode.
41-
*/
42-
@SuppressWarnings("unchecked")
43-
public final RequestBuilder setOperationThreaded(boolean threadedOperation) {
44-
request.operationThreaded(threadedOperation);
45-
return (RequestBuilder) this;
46-
}
47-
4838
/**
4939
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
5040
*/

core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 34 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -211,8 +211,6 @@ public <T extends ActionWriteResponse> T response() {
211211
class OperationTransportHandler implements TransportRequestHandler<Request> {
212212
@Override
213213
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
214-
// if we have a local operation, execute it on a thread since we don't spawn
215-
request.operationThreaded(true);
216214
execute(request, new ActionListener<Response>() {
217215
@Override
218216
public void onResponse(Response result) {
@@ -440,21 +438,17 @@ protected ShardRouting resolvePrimary(ShardIterator shardIt) {
440438
protected void routeRequestOrPerformLocally(final ShardRouting primary, final ShardIterator shardsIt) {
441439
if (primary.currentNodeId().equals(observer.observedState().nodes().localNodeId())) {
442440
try {
443-
if (internalRequest.request().operationThreaded()) {
444-
threadPool.executor(executor).execute(new AbstractRunnable() {
445-
@Override
446-
public void onFailure(Throwable t) {
447-
finishAsFailed(t);
448-
}
441+
threadPool.executor(executor).execute(new AbstractRunnable() {
442+
@Override
443+
public void onFailure(Throwable t) {
444+
finishAsFailed(t);
445+
}
449446

450-
@Override
451-
protected void doRun() throws Exception {
452-
performOnPrimary(primary, shardsIt);
453-
}
454-
});
455-
} else {
456-
performOnPrimary(primary, shardsIt);
457-
}
447+
@Override
448+
protected void doRun() throws Exception {
449+
performOnPrimary(primary, shardsIt);
450+
}
451+
});
458452
} catch (Throwable t) {
459453
finishAsFailed(t);
460454
}
@@ -506,9 +500,6 @@ void retry(Throwable failure) {
506500
finishAsFailed(failure);
507501
return;
508502
}
509-
// make it threaded operation so we fork on the discovery listener thread
510-
internalRequest.request().operationThreaded(true);
511-
512503
observer.waitForNextChange(new ClusterStateObserver.Listener() {
513504
@Override
514505
public void onNewClusterState(ClusterState state) {
@@ -904,43 +895,33 @@ public void handleException(TransportException exp) {
904895

905896
});
906897
} else {
907-
if (replicaRequest.operationThreaded()) {
908-
try {
909-
threadPool.executor(executor).execute(new AbstractRunnable() {
910-
@Override
911-
protected void doRun() {
912-
try {
913-
shardOperationOnReplica(shard.shardId(), replicaRequest);
914-
onReplicaSuccess();
915-
} catch (Throwable e) {
916-
onReplicaFailure(nodeId, e);
917-
failReplicaIfNeeded(shard.index(), shard.id(), e);
918-
}
898+
try {
899+
threadPool.executor(executor).execute(new AbstractRunnable() {
900+
@Override
901+
protected void doRun() {
902+
try {
903+
shardOperationOnReplica(shard.shardId(), replicaRequest);
904+
onReplicaSuccess();
905+
} catch (Throwable e) {
906+
onReplicaFailure(nodeId, e);
907+
failReplicaIfNeeded(shard.index(), shard.id(), e);
919908
}
909+
}
920910

921-
// we must never reject on because of thread pool capacity on replicas
922-
@Override
923-
public boolean isForceExecution() {
924-
return true;
925-
}
911+
// we must never reject on because of thread pool capacity on replicas
912+
@Override
913+
public boolean isForceExecution() {
914+
return true;
915+
}
926916

927-
@Override
928-
public void onFailure(Throwable t) {
929-
onReplicaFailure(nodeId, t);
930-
}
931-
});
932-
} catch (Throwable e) {
933-
failReplicaIfNeeded(shard.index(), shard.id(), e);
934-
onReplicaFailure(nodeId, e);
935-
}
936-
} else {
937-
try {
938-
shardOperationOnReplica(shard.shardId(), replicaRequest);
939-
onReplicaSuccess();
940-
} catch (Throwable e) {
941-
failReplicaIfNeeded(shard.index(), shard.id(), e);
942-
onReplicaFailure(nodeId, e);
943-
}
917+
@Override
918+
public void onFailure(Throwable t) {
919+
onReplicaFailure(nodeId, t);
920+
}
921+
});
922+
} catch (Throwable e) {
923+
failReplicaIfNeeded(shard.index(), shard.id(), e);
924+
onReplicaFailure(nodeId, e);
944925
}
945926
}
946927
}

core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,6 @@ protected Result prepare(UpdateRequest request, final GetResult getResult) {
131131
.routing(request.routing())
132132
.parent(request.parent())
133133
.consistencyLevel(request.consistencyLevel());
134-
indexRequest.operationThreaded(false);
135134
if (request.versionType() != VersionType.INTERNAL) {
136135
// in all but the internal versioning mode, we want to create the new document using the given version.
137136
indexRequest.version(request.version()).versionType(request.versionType());
@@ -227,13 +226,11 @@ protected Result prepare(UpdateRequest request, final GetResult getResult) {
227226
.consistencyLevel(request.consistencyLevel())
228227
.timestamp(timestamp).ttl(ttl)
229228
.refresh(request.refresh());
230-
indexRequest.operationThreaded(false);
231229
return new Result(indexRequest, Operation.INDEX, updatedSourceAsMap, updateSourceContentType);
232230
} else if ("delete".equals(operation)) {
233231
DeleteRequest deleteRequest = Requests.deleteRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
234232
.version(updateVersion).versionType(request.versionType())
235233
.consistencyLevel(request.consistencyLevel());
236-
deleteRequest.operationThreaded(false);
237234
return new Result(deleteRequest, Operation.DELETE, updatedSourceAsMap, updateSourceContentType);
238235
} else if ("none".equals(operation)) {
239236
UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion(), false);

core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,6 @@ public RestDeleteAction(Settings settings, RestController controller, Client cli
5050
@Override
5151
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
5252
DeleteRequest deleteRequest = new DeleteRequest(request.param("index"), request.param("type"), request.param("id"));
53-
54-
deleteRequest.operationThreaded(true);
55-
5653
deleteRequest.routing(request.param("routing"));
5754
deleteRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
5855
deleteRequest.timeout(request.paramAsTime("timeout", DeleteRequest.DEFAULT_TIMEOUT));

core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ public void handleRequest(RestRequest request, RestChannel channel, final Client
7070
@Override
7171
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
7272
IndexRequest indexRequest = new IndexRequest(request.param("index"), request.param("type"), request.param("id"));
73-
indexRequest.operationThreaded(true);
7473
indexRequest.routing(request.param("routing"));
7574
indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
7675
indexRequest.timestamp(request.param("timestamp"));

core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -661,7 +661,6 @@ static class Request extends ReplicationRequest<Request> {
661661
public AtomicInteger processedOnReplicas = new AtomicInteger();
662662

663663
Request() {
664-
this.operationThreaded(randomBoolean());
665664
}
666665

667666
Request(ShardId shardId) {

0 commit comments

Comments
 (0)