Skip to content

Dry up TransportResponseHandler #64395

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.DirectTransportResponseHandler;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;

Expand Down Expand Up @@ -473,7 +473,7 @@ protected void doRun() throws Exception {
}
} else {
transportService.sendRequest(subRequest.node, ACTION.name(), subRequest,
new TransportResponseHandler<TestResponse>() {
new DirectTransportResponseHandler<TestResponse>() {
@Override
public void handleResponse(TestResponse response) {
latchedListener.onResponse(response);
Expand All @@ -484,11 +484,6 @@ public void handleException(TransportException exp) {
latchedListener.onFailure(exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public TestResponse read(StreamInput in) throws IOException {
return new TestResponse(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.DirectTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
Expand Down Expand Up @@ -264,18 +264,13 @@ private void tryNext(@Nullable final Exception lastFailure, boolean canMatchShar
);
}
transportService.sendRequest(node, ACTION_SHARD_NAME, request,
new TransportResponseHandler<FieldCapabilitiesIndexResponse>() {
new DirectTransportResponseHandler<FieldCapabilitiesIndexResponse>() {

@Override
public FieldCapabilitiesIndexResponse read(StreamInput in) throws IOException {
return new FieldCapabilitiesIndexResponse(in);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public void handleResponse(final FieldCapabilitiesIndexResponse response) {
if (response.canMatch()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.DirectTransportResponseHandler;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
Expand All @@ -54,7 +54,7 @@
public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
ResyncReplicationRequest, ResyncReplicationResponse> implements PrimaryReplicaSyncer.SyncAction {

private static String ACTION_NAME = "internal:index/seq_no/resync";
private static final String ACTION_NAME = "internal:index/seq_no/resync";
private static final Function<IndexShard, String> EXECUTOR_NAME_FUNCTION = shard -> {
if (shard.indexSettings().getIndexMetadata().isSystem()) {
return Names.SYSTEM_WRITE;
Expand Down Expand Up @@ -163,17 +163,12 @@ public void sync(ResyncReplicationRequest request, Task parentTask, String prima
new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
parentTask,
transportOptions,
new TransportResponseHandler<ResyncReplicationResponse>() {
new DirectTransportResponseHandler<ResyncReplicationResponse>() {
@Override
public ResyncReplicationResponse read(StreamInput in) throws IOException {
return newResponseInstance(in);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public void handleResponse(ResyncReplicationResponse response) {
final ReplicationResponse.ShardInfo.Failure[] failures = response.getShardInfo().getFailures();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.DirectTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
Expand Down Expand Up @@ -177,17 +177,12 @@ protected void performOperation(final ShardIterator shardIt, final ShardRouting
onOperation(shard, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
} else {
transportService.sendRequest(node, transportShardAction, shardRequest,
new TransportResponseHandler<ShardResponse>() {
new DirectTransportResponseHandler<ShardResponse>() {
@Override
public ShardResponse read(StreamInput in) throws IOException {
return readShardResponse(in);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public void handleResponse(ShardResponse response) {
onOperation(shard, shardIndex, response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,13 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.DirectTransportResponseHandler;
import org.elasticsearch.transport.NodeShouldNotConnectException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
Expand Down Expand Up @@ -321,7 +320,8 @@ private void sendNodeRequest(final DiscoveryNode node, List<ShardRouting> shards
if (task != null) {
nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
}
transportService.sendRequest(node, transportNodeBroadcastAction, nodeRequest, new TransportResponseHandler<NodeResponse>() {
transportService.sendRequest(node, transportNodeBroadcastAction, nodeRequest,
new DirectTransportResponseHandler<NodeResponse>() {
@Override
public NodeResponse read(StreamInput in) throws IOException {
return new NodeResponse(in);
Expand All @@ -336,11 +336,6 @@ public void handleResponse(NodeResponse response) {
public void handleException(TransportException exp) {
onNodeFailure(node, nodeIndex, exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
} catch (Exception e) {
onNodeFailure(node, nodeIndex, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.DirectTransportResponseHandler;
import org.elasticsearch.transport.NodeShouldNotConnectException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
Expand Down Expand Up @@ -211,7 +211,7 @@ void start() {
}

transportService.sendRequest(node, getTransportNodeAction(node), nodeRequest, builder.build(),
new TransportResponseHandler<NodeResponse>() {
new DirectTransportResponseHandler<NodeResponse>() {
@Override
public NodeResponse read(StreamInput in) throws IOException {
return newNodeResponse(in);
Expand All @@ -226,11 +226,6 @@ public void handleResponse(NodeResponse response) {
public void handleException(TransportException exp) {
onFailure(idx, node.getId(), exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
} catch (Exception e) {
onFailure(idx, nodeId, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.DirectTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
Expand Down Expand Up @@ -794,18 +794,13 @@ private void performRemoteAction(ClusterState state, ShardRouting primary, Disco

private void performAction(final DiscoveryNode node, final String action, final boolean isPrimaryAction,
final TransportRequest requestToPerform) {
transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler<Response>() {
transportService.sendRequest(node, action, requestToPerform, transportOptions, new DirectTransportResponseHandler<Response>() {

@Override
public Response read(StreamInput in) throws IOException {
return newResponseInstance(in);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public void handleResponse(Response response) {
finishOnSuccess(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.DirectTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
Expand Down Expand Up @@ -191,18 +191,14 @@ protected void doStart(ClusterState clusterState) {

request.shardId = shardIt.shardId();
DiscoveryNode node = clusterState.nodes().get(shard.currentNodeId());
transportService.sendRequest(node, shardActionName, request, transportOptions(), new TransportResponseHandler<Response>() {
transportService.sendRequest(node, shardActionName, request, transportOptions(),
new DirectTransportResponseHandler<Response>() {

@Override
public Response read(StreamInput in) throws IOException {
return newResponse(in);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public void handleResponse(Response response) {
listener.onResponse(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.DirectTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
Expand Down Expand Up @@ -176,17 +176,12 @@ public void start() {
// just execute it on the local node
final Writeable.Reader<Response> reader = getResponseReader();
transportService.sendRequest(clusterService.localNode(), transportShardAction, internalRequest.request(),
new TransportResponseHandler<Response>() {
new DirectTransportResponseHandler<Response>() {
@Override
public Response read(StreamInput in) throws IOException {
return reader.read(in);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public void handleResponse(final Response response) {
listener.onResponse(response);
Expand Down Expand Up @@ -244,18 +239,13 @@ private void perform(@Nullable final Exception currentFailure) {
}
final Writeable.Reader<Response> reader = getResponseReader();
transportService.sendRequest(node, transportShardAction, internalRequest.request(),
new TransportResponseHandler<Response>() {
new DirectTransportResponseHandler<Response>() {

@Override
public Response read(StreamInput in) throws IOException {
return reader.read(in);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public void handleResponse(final Response response) {
listener.onResponse(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,14 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.DirectTransportResponseHandler;
import org.elasticsearch.transport.NodeShouldNotConnectException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
Expand Down Expand Up @@ -262,7 +261,7 @@ private void start() {
NodeTaskRequest nodeRequest = new NodeTaskRequest(request);
nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(),
new TransportResponseHandler<NodeTasksResponse>() {
new DirectTransportResponseHandler<NodeTasksResponse>() {
@Override
public NodeTasksResponse read(StreamInput in) throws IOException {
return new NodeTasksResponse(in);
Expand All @@ -277,11 +276,6 @@ public void handleResponse(NodeTasksResponse response) {
public void handleException(TransportException exp) {
onFailure(idx, node.getId(), exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
}
} catch (Exception e) {
Expand Down
Loading