Skip to content

Commit 7461c79

Browse files
Dry up TransportResponseHandler (#64395) (#64618)
The majority of implementations of this interface run on `SAME`. Drying this up saves some duplication and also makes it a little easier to understand what callbacks won't fork-off when reading the code.
1 parent 977c779 commit 7461c79

File tree

35 files changed

+47
-394
lines changed

35 files changed

+47
-394
lines changed

server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksIT.java

-5
Original file line numberDiff line numberDiff line change
@@ -484,11 +484,6 @@ public void handleException(TransportException exp) {
484484
latchedListener.onFailure(exp);
485485
}
486486

487-
@Override
488-
public String executor() {
489-
return ThreadPool.Names.SAME;
490-
}
491-
492487
@Override
493488
public TestResponse read(StreamInput in) throws IOException {
494489
return new TestResponse(in);

server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java

-5
Original file line numberDiff line numberDiff line change
@@ -271,11 +271,6 @@ public FieldCapabilitiesIndexResponse read(StreamInput in) throws IOException {
271271
return new FieldCapabilitiesIndexResponse(in);
272272
}
273273

274-
@Override
275-
public String executor() {
276-
return ThreadPool.Names.SAME;
277-
}
278-
279274
@Override
280275
public void handleResponse(final FieldCapabilitiesIndexResponse response) {
281276
if (response.canMatch()) {

server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
5555
ResyncReplicationRequest, ResyncReplicationResponse> implements PrimaryReplicaSyncer.SyncAction {
5656

57-
private static String ACTION_NAME = "internal:index/seq_no/resync";
57+
private static final String ACTION_NAME = "internal:index/seq_no/resync";
5858
private static final Function<IndexShard, String> EXECUTOR_NAME_FUNCTION = shard -> {
5959
if (shard.indexSettings().getIndexMetadata().isSystem()) {
6060
return Names.SYSTEM_WRITE;
@@ -169,11 +169,6 @@ public ResyncReplicationResponse read(StreamInput in) throws IOException {
169169
return newResponseInstance(in);
170170
}
171171

172-
@Override
173-
public String executor() {
174-
return ThreadPool.Names.SAME;
175-
}
176-
177172
@Override
178173
public void handleResponse(ResyncReplicationResponse response) {
179174
final ReplicationResponse.ShardInfo.Failure[] failures = response.getShardInfo().getFailures();

server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java

-5
Original file line numberDiff line numberDiff line change
@@ -183,11 +183,6 @@ public ShardResponse read(StreamInput in) throws IOException {
183183
return readShardResponse(in);
184184
}
185185

186-
@Override
187-
public String executor() {
188-
return ThreadPool.Names.SAME;
189-
}
190-
191186
@Override
192187
public void handleResponse(ShardResponse response) {
193188
onOperation(shard, shardIndex, response);

server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java

-6
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import org.elasticsearch.common.io.stream.StreamOutput;
4545
import org.elasticsearch.common.io.stream.Writeable;
4646
import org.elasticsearch.tasks.Task;
47-
import org.elasticsearch.threadpool.ThreadPool;
4847
import org.elasticsearch.transport.NodeShouldNotConnectException;
4948
import org.elasticsearch.transport.TransportChannel;
5049
import org.elasticsearch.transport.TransportException;
@@ -336,11 +335,6 @@ public void handleResponse(NodeResponse response) {
336335
public void handleException(TransportException exp) {
337336
onNodeFailure(node, nodeIndex, exp);
338337
}
339-
340-
@Override
341-
public String executor() {
342-
return ThreadPool.Names.SAME;
343-
}
344338
});
345339
} catch (Exception e) {
346340
onNodeFailure(node, nodeIndex, e);

server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java

-5
Original file line numberDiff line numberDiff line change
@@ -230,11 +230,6 @@ public void handleResponse(NodeResponse response) {
230230
public void handleException(TransportException exp) {
231231
onFailure(idx, node.getId(), exp);
232232
}
233-
234-
@Override
235-
public String executor() {
236-
return ThreadPool.Names.SAME;
237-
}
238233
});
239234
} catch (Exception e) {
240235
onFailure(idx, nodeId, e);

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

-5
Original file line numberDiff line numberDiff line change
@@ -802,11 +802,6 @@ public Response read(StreamInput in) throws IOException {
802802
return newResponseInstance(in);
803803
}
804804

805-
@Override
806-
public String executor() {
807-
return ThreadPool.Names.SAME;
808-
}
809-
810805
@Override
811806
public void handleResponse(Response response) {
812807
finishOnSuccess(response);

server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java

-5
Original file line numberDiff line numberDiff line change
@@ -198,11 +198,6 @@ public Response read(StreamInput in) throws IOException {
198198
return newResponse(in);
199199
}
200200

201-
@Override
202-
public String executor() {
203-
return ThreadPool.Names.SAME;
204-
}
205-
206201
@Override
207202
public void handleResponse(Response response) {
208203
listener.onResponse(response);

server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java

-10
Original file line numberDiff line numberDiff line change
@@ -182,11 +182,6 @@ public Response read(StreamInput in) throws IOException {
182182
return reader.read(in);
183183
}
184184

185-
@Override
186-
public String executor() {
187-
return ThreadPool.Names.SAME;
188-
}
189-
190185
@Override
191186
public void handleResponse(final Response response) {
192187
listener.onResponse(response);
@@ -251,11 +246,6 @@ public Response read(StreamInput in) throws IOException {
251246
return reader.read(in);
252247
}
253248

254-
@Override
255-
public String executor() {
256-
return ThreadPool.Names.SAME;
257-
}
258-
259249
@Override
260250
public void handleResponse(final Response response) {
261251
listener.onResponse(response);

server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java

-6
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import org.elasticsearch.common.io.stream.Writeable;
3939
import org.elasticsearch.common.util.concurrent.AtomicArray;
4040
import org.elasticsearch.tasks.Task;
41-
import org.elasticsearch.threadpool.ThreadPool;
4241
import org.elasticsearch.transport.NodeShouldNotConnectException;
4342
import org.elasticsearch.transport.TransportChannel;
4443
import org.elasticsearch.transport.TransportException;
@@ -277,11 +276,6 @@ public void handleResponse(NodeTasksResponse response) {
277276
public void handleException(TransportException exp) {
278277
onFailure(idx, node.getId(), exp);
279278
}
280-
281-
@Override
282-
public String executor() {
283-
return ThreadPool.Names.SAME;
284-
}
285279
});
286280
}
287281
} catch (Exception e) {

server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java

+3-12
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.elasticsearch.transport.TransportRequest;
4646
import org.elasticsearch.transport.TransportRequestOptions;
4747
import org.elasticsearch.transport.TransportRequestOptions.Type;
48+
import org.elasticsearch.transport.TransportResponse;
4849
import org.elasticsearch.transport.TransportResponse.Empty;
4950
import org.elasticsearch.transport.TransportResponseHandler;
5051
import org.elasticsearch.transport.TransportService;
@@ -316,14 +317,10 @@ private void handleWakeUp() {
316317
}
317318
transportService.sendRequest(discoveryNode, actionName, transportRequest,
318319
TransportRequestOptions.builder().withTimeout(followerCheckTimeout).withType(Type.PING).build(),
319-
new TransportResponseHandler<Empty>() {
320-
@Override
321-
public Empty read(StreamInput in) {
322-
return Empty.INSTANCE;
323-
}
320+
new TransportResponseHandler.Empty() {
324321

325322
@Override
326-
public void handleResponse(Empty response) {
323+
public void handleResponse(TransportResponse.Empty response) {
327324
if (running() == false) {
328325
logger.trace("{} no longer running", FollowerChecker.this);
329326
return;
@@ -362,12 +359,6 @@ public void handleException(TransportException exp) {
362359

363360
failNode(reason);
364361
}
365-
366-
367-
@Override
368-
public String executor() {
369-
return Names.SAME;
370-
}
371362
});
372363
}
373364

server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

+4-26
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.elasticsearch.cluster.service.MasterService;
3737
import org.elasticsearch.common.Priority;
3838
import org.elasticsearch.common.collect.Tuple;
39-
import org.elasticsearch.common.io.stream.StreamInput;
4039
import org.elasticsearch.common.settings.Setting;
4140
import org.elasticsearch.common.settings.Settings;
4241
import org.elasticsearch.common.unit.TimeValue;
@@ -301,14 +300,9 @@ public void sendJoinRequest(DiscoveryNode destination, long term, Optional<Join>
301300
transportRequestOptions = TransportRequestOptions.EMPTY;
302301
}
303302
transportService.sendRequest(destination, actionName, transportRequest, transportRequestOptions,
304-
new TransportResponseHandler<Empty>() {
303+
new TransportResponseHandler.Empty() {
305304
@Override
306-
public Empty read(StreamInput in) {
307-
return Empty.INSTANCE;
308-
}
309-
310-
@Override
311-
public void handleResponse(Empty response) {
305+
public void handleResponse(TransportResponse.Empty response) {
312306
pendingOutgoingJoins.remove(dedupKey);
313307
logger.debug("successfully joined {} with {}", destination, joinRequest);
314308
lastFailedJoinAttempt.set(null);
@@ -324,11 +318,6 @@ public void handleException(TransportException exp) {
324318
lastFailedJoinAttempt.set(attempt);
325319
onCompletion.run();
326320
}
327-
328-
@Override
329-
public String executor() {
330-
return Names.SAME;
331-
}
332321
});
333322
} else {
334323
logger.debug("already attempting to join {} with request {}, not sending request", destination, joinRequest);
@@ -338,27 +327,16 @@ public String executor() {
338327
public void sendStartJoinRequest(final StartJoinRequest startJoinRequest, final DiscoveryNode destination) {
339328
assert startJoinRequest.getSourceNode().isMasterNode()
340329
: "sending start-join request for master-ineligible " + startJoinRequest.getSourceNode();
341-
transportService.sendRequest(destination, START_JOIN_ACTION_NAME,
342-
startJoinRequest, new TransportResponseHandler<Empty>() {
330+
transportService.sendRequest(destination, START_JOIN_ACTION_NAME, startJoinRequest, new TransportResponseHandler.Empty() {
343331
@Override
344-
public Empty read(StreamInput in) {
345-
return Empty.INSTANCE;
346-
}
347-
348-
@Override
349-
public void handleResponse(Empty response) {
332+
public void handleResponse(TransportResponse.Empty response) {
350333
logger.debug("successful response to {} from {}", startJoinRequest, destination);
351334
}
352335

353336
@Override
354337
public void handleException(TransportException exp) {
355338
logger.debug(new ParameterizedMessage("failure in response to {} from {}", startJoinRequest, destination), exp);
356339
}
357-
358-
@Override
359-
public String executor() {
360-
return ThreadPool.Names.SAME;
361-
}
362340
});
363341
}
364342

server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java

+2-13
Original file line numberDiff line numberDiff line change
@@ -247,16 +247,10 @@ void handleWakeUp() {
247247
// TransportResponse.Empty here.
248248
transportService.sendRequest(leader, actionName, transportRequest,
249249
TransportRequestOptions.builder().withTimeout(leaderCheckTimeout).withType(Type.PING).build(),
250-
251-
new TransportResponseHandler<TransportResponse.Empty>() {
252-
253-
@Override
254-
public Empty read(StreamInput in) {
255-
return Empty.INSTANCE;
256-
}
250+
new TransportResponseHandler.Empty() {
257251

258252
@Override
259-
public void handleResponse(Empty response) {
253+
public void handleResponse(TransportResponse.Empty response) {
260254
if (isClosed.get()) {
261255
logger.debug("closed check scheduler received a response, doing nothing");
262256
return;
@@ -298,11 +292,6 @@ public void handleException(TransportException exp) {
298292
failureCount, LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), leaderCheckRetryCount, leader), exp);
299293
scheduleNextWakeUp();
300294
}
301-
302-
@Override
303-
public String executor() {
304-
return Names.SAME;
305-
}
306295
});
307296
}
308297

server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java

-5
Original file line numberDiff line numberDiff line change
@@ -118,11 +118,6 @@ public ReplicationResponse read(StreamInput in) throws IOException {
118118
return newResponseInstance(in);
119119
}
120120

121-
@Override
122-
public String executor() {
123-
return ThreadPool.Names.SAME;
124-
}
125-
126121
@Override
127122
public void handleResponse(ReplicationResponse response) {
128123
task.setPhase("finished");

server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java

-5
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,6 @@ public ReplicationResponse read(StreamInput in) throws IOException {
120120
return newResponseInstance(in);
121121
}
122122

123-
@Override
124-
public String executor() {
125-
return ThreadPool.Names.SAME;
126-
}
127-
128123
@Override
129124
public void handleResponse(ReplicationResponse response) {
130125
task.setPhase("finished");

server/src/main/java/org/elasticsearch/indices/store/IndicesStore.java

-5
Original file line numberDiff line numberDiff line change
@@ -264,11 +264,6 @@ public void handleException(TransportException exp) {
264264
}
265265
}
266266

267-
@Override
268-
public String executor() {
269-
return ThreadPool.Names.SAME;
270-
}
271-
272267
private void allNodesResponded() {
273268
if (activeCopies.get() != expectedActiveCopies) {
274269
logger.trace("not deleting shard {}, expected {} active copies, but only {} found active copies",

server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java

-5
Original file line numberDiff line numberDiff line change
@@ -467,11 +467,6 @@ public void handleResponse(UpdateIndexShardSnapshotStatusResponse response) {
467467
public void handleException(TransportException exp) {
468468
reqListener.onFailure(exp);
469469
}
470-
471-
@Override
472-
public String executor() {
473-
return ThreadPool.Names.SAME;
474-
}
475470
})
476471
);
477472
}

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

-5
Original file line numberDiff line numberDiff line change
@@ -146,11 +146,6 @@ public void handleResponse(ClusterStateResponse response) {
146146
public void handleException(TransportException exp) {
147147
contextPreservingActionListener.onFailure(exp);
148148
}
149-
150-
@Override
151-
public String executor() {
152-
return ThreadPool.Names.SAME;
153-
}
154149
});
155150
}
156151
};

server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java

-5
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,6 @@ public void handleException(TransportException exp) {
9292
throw new UncheckedIOException(e);
9393
}
9494
}
95-
96-
@Override
97-
public String executor() {
98-
return ThreadPool.Names.SAME;
99-
}
10095
}
10196

10297
static class ProxyRequest<T extends TransportRequest> extends TransportRequest {

server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java

-5
Original file line numberDiff line numberDiff line change
@@ -150,11 +150,6 @@ void handleLocalException(TransportException e) {
150150
listener.onFailure(e);
151151
}
152152
}
153-
154-
@Override
155-
public String executor() {
156-
return ThreadPool.Names.SAME;
157-
}
158153
}
159154

160155
static final class HandshakeRequest extends TransportRequest {

0 commit comments

Comments
 (0)