Skip to content

Dry up TransportResponseHandler #64395

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -484,11 +484,6 @@ public void handleException(TransportException exp) {
latchedListener.onFailure(exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public TestResponse read(StreamInput in) throws IOException {
return new TestResponse(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,6 @@ public FieldCapabilitiesIndexResponse read(StreamInput in) throws IOException {
return new FieldCapabilitiesIndexResponse(in);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public void handleResponse(final FieldCapabilitiesIndexResponse response) {
if (response.canMatch()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
ResyncReplicationRequest, ResyncReplicationResponse> implements PrimaryReplicaSyncer.SyncAction {

private static String ACTION_NAME = "internal:index/seq_no/resync";
private static final String ACTION_NAME = "internal:index/seq_no/resync";
private static final Function<IndexShard, String> EXECUTOR_NAME_FUNCTION = shard -> {
if (shard.indexSettings().getIndexMetadata().isSystem()) {
return Names.SYSTEM_WRITE;
Expand Down Expand Up @@ -169,11 +169,6 @@ public ResyncReplicationResponse read(StreamInput in) throws IOException {
return newResponseInstance(in);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public void handleResponse(ResyncReplicationResponse response) {
final ReplicationResponse.ShardInfo.Failure[] failures = response.getShardInfo().getFailures();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,6 @@ public ShardResponse read(StreamInput in) throws IOException {
return readShardResponse(in);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public void handleResponse(ShardResponse response) {
onOperation(shard, shardIndex, response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.NodeShouldNotConnectException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
Expand Down Expand Up @@ -336,11 +335,6 @@ public void handleResponse(NodeResponse response) {
public void handleException(TransportException exp) {
onNodeFailure(node, nodeIndex, exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
} catch (Exception e) {
onNodeFailure(node, nodeIndex, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,6 @@ public void handleResponse(NodeResponse response) {
public void handleException(TransportException exp) {
onFailure(idx, node.getId(), exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
} catch (Exception e) {
onFailure(idx, nodeId, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -801,11 +801,6 @@ public Response read(StreamInput in) throws IOException {
return newResponseInstance(in);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public void handleResponse(Response response) {
finishOnSuccess(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,6 @@ public Response read(StreamInput in) throws IOException {
return newResponse(in);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public void handleResponse(Response response) {
listener.onResponse(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,6 @@ public Response read(StreamInput in) throws IOException {
return reader.read(in);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public void handleResponse(final Response response) {
listener.onResponse(response);
Expand Down Expand Up @@ -251,11 +246,6 @@ public Response read(StreamInput in) throws IOException {
return reader.read(in);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public void handleResponse(final Response response) {
listener.onResponse(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.NodeShouldNotConnectException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
Expand Down Expand Up @@ -277,11 +276,6 @@ public void handleResponse(NodeTasksResponse response) {
public void handleException(TransportException exp) {
onFailure(idx, node.getId(), exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportRequestOptions.Type;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -299,14 +300,10 @@ private void handleWakeUp() {

transportService.sendRequest(discoveryNode, FOLLOWER_CHECK_ACTION_NAME, request,
TransportRequestOptions.builder().withTimeout(followerCheckTimeout).withType(Type.PING).build(),
new TransportResponseHandler<Empty>() {
@Override
public Empty read(StreamInput in) {
return Empty.INSTANCE;
}
new TransportResponseHandler.Empty() {

@Override
public void handleResponse(Empty response) {
public void handleResponse(TransportResponse.Empty response) {
if (running() == false) {
logger.trace("{} no longer running", FollowerChecker.this);
return;
Expand Down Expand Up @@ -345,12 +342,6 @@ public void handleException(TransportException exp) {

failNode(reason);
}


@Override
public String executor() {
return Names.SAME;
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.monitor.NodeHealthService;
import org.elasticsearch.monitor.StatusInfo;
Expand Down Expand Up @@ -240,15 +239,9 @@ public void sendJoinRequest(DiscoveryNode destination, long term, Optional<Join>
final Tuple<DiscoveryNode, JoinRequest> dedupKey = Tuple.tuple(destination, joinRequest);
if (pendingOutgoingJoins.add(dedupKey)) {
logger.debug("attempting to join {} with {}", destination, joinRequest);
transportService.sendRequest(destination, JOIN_ACTION_NAME, joinRequest,
new TransportResponseHandler<Empty>() {
transportService.sendRequest(destination, JOIN_ACTION_NAME, joinRequest, new TransportResponseHandler.Empty() {
@Override
public Empty read(StreamInput in) {
return Empty.INSTANCE;
}

@Override
public void handleResponse(Empty response) {
public void handleResponse(TransportResponse.Empty response) {
pendingOutgoingJoins.remove(dedupKey);
logger.debug("successfully joined {} with {}", destination, joinRequest);
lastFailedJoinAttempt.set(null);
Expand All @@ -261,11 +254,6 @@ public void handleException(TransportException exp) {
attempt.logNow();
lastFailedJoinAttempt.set(attempt);
}

@Override
public String executor() {
return Names.SAME;
}
});
} else {
logger.debug("already attempting to join {} with request {}, not sending request", destination, joinRequest);
Expand All @@ -275,27 +263,16 @@ public String executor() {
void sendStartJoinRequest(final StartJoinRequest startJoinRequest, final DiscoveryNode destination) {
assert startJoinRequest.getSourceNode().isMasterNode()
: "sending start-join request for master-ineligible " + startJoinRequest.getSourceNode();
transportService.sendRequest(destination, START_JOIN_ACTION_NAME,
startJoinRequest, new TransportResponseHandler<Empty>() {
transportService.sendRequest(destination, START_JOIN_ACTION_NAME, startJoinRequest, new TransportResponseHandler.Empty() {
@Override
public Empty read(StreamInput in) {
return Empty.INSTANCE;
}

@Override
public void handleResponse(Empty response) {
public void handleResponse(TransportResponse.Empty response) {
logger.debug("successful response to {} from {}", startJoinRequest, destination);
}

@Override
public void handleException(TransportException exp) {
logger.debug(new ParameterizedMessage("failure in response to {} from {}", startJoinRequest, destination), exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,16 +218,10 @@ void handleWakeUp() {

transportService.sendRequest(leader, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(transportService.getLocalNode()),
TransportRequestOptions.builder().withTimeout(leaderCheckTimeout).withType(Type.PING).build(),

new TransportResponseHandler<TransportResponse.Empty>() {

@Override
public Empty read(StreamInput in) {
return Empty.INSTANCE;
}
new TransportResponseHandler.Empty() {

@Override
public void handleResponse(Empty response) {
public void handleResponse(TransportResponse.Empty response) {
if (isClosed.get()) {
logger.debug("closed check scheduler received a response, doing nothing");
return;
Expand Down Expand Up @@ -269,11 +263,6 @@ public void handleException(TransportException exp) {
failureCount, LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), leaderCheckRetryCount, leader), exp);
scheduleNextWakeUp();
}

@Override
public String executor() {
return Names.SAME;
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,6 @@ public ReplicationResponse read(StreamInput in) throws IOException {
return newResponseInstance(in);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public void handleResponse(ReplicationResponse response) {
task.setPhase("finished");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,6 @@ public ReplicationResponse read(StreamInput in) throws IOException {
return newResponseInstance(in);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public void handleResponse(ReplicationResponse response) {
task.setPhase("finished");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,6 @@ public void handleException(TransportException exp) {
}
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

private void allNodesResponded() {
if (activeCopies.get() != expectedActiveCopies) {
logger.trace("not deleting shard {}, expected {} active copies, but only {} found active copies",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,11 +467,6 @@ public void handleResponse(UpdateIndexShardSnapshotStatusResponse response) {
public void handleException(TransportException exp) {
reqListener.onFailure(exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
})
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,6 @@ public void handleResponse(ClusterStateResponse response) {
public void handleException(TransportException exp) {
contextPreservingActionListener.onFailure(exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,6 @@ public void handleException(TransportException exp) {
throw new UncheckedIOException(e);
}
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}

static class ProxyRequest<T extends TransportRequest> extends TransportRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,6 @@ void handleLocalException(TransportException e) {
listener.onFailure(e);
}
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}

static final class HandshakeRequest extends TransportRequest {
Expand Down
Loading