Skip to content

Removed the operation_threaded option. #13119

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 26, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
public abstract class TransportBroadcastAction<Request extends BroadcastRequest, Response extends BroadcastResponse, ShardRequest extends BroadcastShardRequest, ShardResponse extends BroadcastShardResponse>
extends HandledTransportAction<Request, Response> {

protected final ThreadPool threadPool;
protected final ClusterService clusterService;
protected final TransportService transportService;

Expand All @@ -59,7 +58,6 @@ protected TransportBroadcastAction(Settings settings, String actionName, ThreadP
super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request);
this.clusterService = clusterService;
this.transportService = transportService;
this.threadPool = threadPool;
this.transportShardAction = actionName + "[s]";

transportService.registerRequestHandler(transportShardAction, shardRequest, shardExecutor, new ShardTransportHandler());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ public abstract class ReplicationRequest<T extends ReplicationRequest> extends A
protected TimeValue timeout = DEFAULT_TIMEOUT;
protected String index;

private boolean threadedOperation = true;
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
private volatile boolean canHaveDuplicates = false;

Expand Down Expand Up @@ -76,7 +75,6 @@ protected ReplicationRequest(T request, ActionRequest originalRequest) {
super(originalRequest);
this.timeout = request.timeout();
this.index = request.index();
this.threadedOperation = request.operationThreaded();
this.consistencyLevel = request.consistencyLevel();
}

Expand All @@ -91,23 +89,6 @@ public boolean canHaveDuplicates() {
return canHaveDuplicates;
}

/**
* Controls if the operation will be executed on a separate thread when executed locally.
*/
public final boolean operationThreaded() {
return threadedOperation;
}

/**
* Controls if the operation will be executed on a separate thread when executed locally. Defaults
* to <tt>true</tt> when running in embedded mode.
*/
@SuppressWarnings("unchecked")
public final T operationThreaded(boolean threadedOperation) {
this.threadedOperation = threadedOperation;
return (T) this;
}

/**
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,6 @@ protected ReplicationRequestBuilder(ElasticsearchClient client, Action<Request,
super(client, action, request);
}

/**
* Controls if the operation will be executed on a separate thread when executed locally. Defaults
* to <tt>true</tt> when running in embedded mode.
*/
@SuppressWarnings("unchecked")
public final RequestBuilder setOperationThreaded(boolean threadedOperation) {
request.operationThreaded(threadedOperation);
return (RequestBuilder) this;
}

/**
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,6 @@ public <T extends ActionWriteResponse> T response() {
class OperationTransportHandler implements TransportRequestHandler<Request> {
@Override
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
// if we have a local operation, execute it on a thread since we don't spawn
request.operationThreaded(true);
execute(request, new ActionListener<Response>() {
@Override
public void onResponse(Response result) {
Expand Down Expand Up @@ -440,21 +438,17 @@ protected ShardRouting resolvePrimary(ShardIterator shardIt) {
protected void routeRequestOrPerformLocally(final ShardRouting primary, final ShardIterator shardsIt) {
if (primary.currentNodeId().equals(observer.observedState().nodes().localNodeId())) {
try {
if (internalRequest.request().operationThreaded()) {
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
finishAsFailed(t);
}
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
finishAsFailed(t);
}

@Override
protected void doRun() throws Exception {
performOnPrimary(primary, shardsIt);
}
});
} else {
performOnPrimary(primary, shardsIt);
}
@Override
protected void doRun() throws Exception {
performOnPrimary(primary, shardsIt);
}
});
} catch (Throwable t) {
finishAsFailed(t);
}
Expand Down Expand Up @@ -506,9 +500,6 @@ void retry(Throwable failure) {
finishAsFailed(failure);
return;
}
// make it threaded operation so we fork on the discovery listener thread
internalRequest.request().operationThreaded(true);

observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
Expand Down Expand Up @@ -904,43 +895,33 @@ public void handleException(TransportException exp) {

});
} else {
if (replicaRequest.operationThreaded()) {
try {
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
protected void doRun() {
try {
shardOperationOnReplica(shard.shardId(), replicaRequest);
onReplicaSuccess();
} catch (Throwable e) {
onReplicaFailure(nodeId, e);
failReplicaIfNeeded(shard.index(), shard.id(), e);
}
try {
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
protected void doRun() {
try {
shardOperationOnReplica(shard.shardId(), replicaRequest);
onReplicaSuccess();
} catch (Throwable e) {
onReplicaFailure(nodeId, e);
failReplicaIfNeeded(shard.index(), shard.id(), e);
}
}

// we must never reject on because of thread pool capacity on replicas
@Override
public boolean isForceExecution() {
return true;
}
// we must never reject on because of thread pool capacity on replicas
@Override
public boolean isForceExecution() {
return true;
}

@Override
public void onFailure(Throwable t) {
onReplicaFailure(nodeId, t);
}
});
} catch (Throwable e) {
failReplicaIfNeeded(shard.index(), shard.id(), e);
onReplicaFailure(nodeId, e);
}
} else {
try {
shardOperationOnReplica(shard.shardId(), replicaRequest);
onReplicaSuccess();
} catch (Throwable e) {
failReplicaIfNeeded(shard.index(), shard.id(), e);
onReplicaFailure(nodeId, e);
}
@Override
public void onFailure(Throwable t) {
onReplicaFailure(nodeId, t);
}
});
} catch (Throwable e) {
failReplicaIfNeeded(shard.index(), shard.id(), e);
onReplicaFailure(nodeId, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ protected Result prepare(UpdateRequest request, final GetResult getResult) {
.routing(request.routing())
.parent(request.parent())
.consistencyLevel(request.consistencyLevel());
indexRequest.operationThreaded(false);
if (request.versionType() != VersionType.INTERNAL) {
// in all but the internal versioning mode, we want to create the new document using the given version.
indexRequest.version(request.version()).versionType(request.versionType());
Expand Down Expand Up @@ -227,13 +226,11 @@ protected Result prepare(UpdateRequest request, final GetResult getResult) {
.consistencyLevel(request.consistencyLevel())
.timestamp(timestamp).ttl(ttl)
.refresh(request.refresh());
indexRequest.operationThreaded(false);
return new Result(indexRequest, Operation.INDEX, updatedSourceAsMap, updateSourceContentType);
} else if ("delete".equals(operation)) {
DeleteRequest deleteRequest = Requests.deleteRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
.version(updateVersion).versionType(request.versionType())
.consistencyLevel(request.consistencyLevel());
deleteRequest.operationThreaded(false);
return new Result(deleteRequest, Operation.DELETE, updatedSourceAsMap, updateSourceContentType);
} else if ("none".equals(operation)) {
UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ public RestDeleteAction(Settings settings, RestController controller, Client cli
@Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
DeleteRequest deleteRequest = new DeleteRequest(request.param("index"), request.param("type"), request.param("id"));

deleteRequest.operationThreaded(true);

deleteRequest.routing(request.param("routing"));
deleteRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
deleteRequest.timeout(request.paramAsTime("timeout", DeleteRequest.DEFAULT_TIMEOUT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public void handleRequest(RestRequest request, RestChannel channel, final Client
@Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
IndexRequest indexRequest = new IndexRequest(request.param("index"), request.param("type"), request.param("id"));
indexRequest.operationThreaded(true);
indexRequest.routing(request.param("routing"));
indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
indexRequest.timestamp(request.param("timestamp"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,6 @@ static class Request extends ReplicationRequest<Request> {
public AtomicInteger processedOnReplicas = new AtomicInteger();

Request() {
this.operationThreaded(randomBoolean());
}

Request(ShardId shardId) {
Expand Down