Skip to content

Commit 13f1d1a

Browse files
Dry up TransportResponseHandler (#64395)
The majority of implementations of this interface run on `SAME`. Drying this up saves some duplication and also makes it a little easier to understand what callbacks won't fork-off when reading the code.
1 parent 6b7b297 commit 13f1d1a

File tree

35 files changed

+50
-397
lines changed

35 files changed

+50
-397
lines changed

server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksIT.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -484,11 +484,6 @@ public void handleException(TransportException exp) {
484484
latchedListener.onFailure(exp);
485485
}
486486

487-
@Override
488-
public String executor() {
489-
return ThreadPool.Names.SAME;
490-
}
491-
492487
@Override
493488
public TestResponse read(StreamInput in) throws IOException {
494489
return new TestResponse(in);

server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -271,11 +271,6 @@ public FieldCapabilitiesIndexResponse read(StreamInput in) throws IOException {
271271
return new FieldCapabilitiesIndexResponse(in);
272272
}
273273

274-
@Override
275-
public String executor() {
276-
return ThreadPool.Names.SAME;
277-
}
278-
279274
@Override
280275
public void handleResponse(final FieldCapabilitiesIndexResponse response) {
281276
if (response.canMatch()) {

server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
5555
ResyncReplicationRequest, ResyncReplicationResponse> implements PrimaryReplicaSyncer.SyncAction {
5656

57-
private static String ACTION_NAME = "internal:index/seq_no/resync";
57+
private static final String ACTION_NAME = "internal:index/seq_no/resync";
5858
private static final Function<IndexShard, String> EXECUTOR_NAME_FUNCTION = shard -> {
5959
if (shard.indexSettings().getIndexMetadata().isSystem()) {
6060
return Names.SYSTEM_WRITE;
@@ -169,11 +169,6 @@ public ResyncReplicationResponse read(StreamInput in) throws IOException {
169169
return newResponseInstance(in);
170170
}
171171

172-
@Override
173-
public String executor() {
174-
return ThreadPool.Names.SAME;
175-
}
176-
177172
@Override
178173
public void handleResponse(ResyncReplicationResponse response) {
179174
final ReplicationResponse.ShardInfo.Failure[] failures = response.getShardInfo().getFailures();

server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -183,11 +183,6 @@ public ShardResponse read(StreamInput in) throws IOException {
183183
return readShardResponse(in);
184184
}
185185

186-
@Override
187-
public String executor() {
188-
return ThreadPool.Names.SAME;
189-
}
190-
191186
@Override
192187
public void handleResponse(ShardResponse response) {
193188
onOperation(shard, shardIndex, response);

server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import org.elasticsearch.common.io.stream.StreamOutput;
4545
import org.elasticsearch.common.io.stream.Writeable;
4646
import org.elasticsearch.tasks.Task;
47-
import org.elasticsearch.threadpool.ThreadPool;
4847
import org.elasticsearch.transport.NodeShouldNotConnectException;
4948
import org.elasticsearch.transport.TransportChannel;
5049
import org.elasticsearch.transport.TransportException;
@@ -336,11 +335,6 @@ public void handleResponse(NodeResponse response) {
336335
public void handleException(TransportException exp) {
337336
onNodeFailure(node, nodeIndex, exp);
338337
}
339-
340-
@Override
341-
public String executor() {
342-
return ThreadPool.Names.SAME;
343-
}
344338
});
345339
} catch (Exception e) {
346340
onNodeFailure(node, nodeIndex, e);

server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -226,11 +226,6 @@ public void handleResponse(NodeResponse response) {
226226
public void handleException(TransportException exp) {
227227
onFailure(idx, node.getId(), exp);
228228
}
229-
230-
@Override
231-
public String executor() {
232-
return ThreadPool.Names.SAME;
233-
}
234229
});
235230
} catch (Exception e) {
236231
onFailure(idx, nodeId, e);

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -801,11 +801,6 @@ public Response read(StreamInput in) throws IOException {
801801
return newResponseInstance(in);
802802
}
803803

804-
@Override
805-
public String executor() {
806-
return ThreadPool.Names.SAME;
807-
}
808-
809804
@Override
810805
public void handleResponse(Response response) {
811806
finishOnSuccess(response);

server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -198,11 +198,6 @@ public Response read(StreamInput in) throws IOException {
198198
return newResponse(in);
199199
}
200200

201-
@Override
202-
public String executor() {
203-
return ThreadPool.Names.SAME;
204-
}
205-
206201
@Override
207202
public void handleResponse(Response response) {
208203
listener.onResponse(response);

server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -182,11 +182,6 @@ public Response read(StreamInput in) throws IOException {
182182
return reader.read(in);
183183
}
184184

185-
@Override
186-
public String executor() {
187-
return ThreadPool.Names.SAME;
188-
}
189-
190185
@Override
191186
public void handleResponse(final Response response) {
192187
listener.onResponse(response);
@@ -251,11 +246,6 @@ public Response read(StreamInput in) throws IOException {
251246
return reader.read(in);
252247
}
253248

254-
@Override
255-
public String executor() {
256-
return ThreadPool.Names.SAME;
257-
}
258-
259249
@Override
260250
public void handleResponse(final Response response) {
261251
listener.onResponse(response);

server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import org.elasticsearch.common.io.stream.Writeable;
3939
import org.elasticsearch.common.util.concurrent.AtomicArray;
4040
import org.elasticsearch.tasks.Task;
41-
import org.elasticsearch.threadpool.ThreadPool;
4241
import org.elasticsearch.transport.NodeShouldNotConnectException;
4342
import org.elasticsearch.transport.TransportChannel;
4443
import org.elasticsearch.transport.TransportException;
@@ -277,11 +276,6 @@ public void handleResponse(NodeTasksResponse response) {
277276
public void handleException(TransportException exp) {
278277
onFailure(idx, node.getId(), exp);
279278
}
280-
281-
@Override
282-
public String executor() {
283-
return ThreadPool.Names.SAME;
284-
}
285279
});
286280
}
287281
} catch (Exception e) {

server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.elasticsearch.transport.TransportRequest;
4343
import org.elasticsearch.transport.TransportRequestOptions;
4444
import org.elasticsearch.transport.TransportRequestOptions.Type;
45+
import org.elasticsearch.transport.TransportResponse;
4546
import org.elasticsearch.transport.TransportResponse.Empty;
4647
import org.elasticsearch.transport.TransportResponseHandler;
4748
import org.elasticsearch.transport.TransportService;
@@ -299,14 +300,10 @@ private void handleWakeUp() {
299300

300301
transportService.sendRequest(discoveryNode, FOLLOWER_CHECK_ACTION_NAME, request,
301302
TransportRequestOptions.builder().withTimeout(followerCheckTimeout).withType(Type.PING).build(),
302-
new TransportResponseHandler<Empty>() {
303-
@Override
304-
public Empty read(StreamInput in) {
305-
return Empty.INSTANCE;
306-
}
303+
new TransportResponseHandler.Empty() {
307304

308305
@Override
309-
public void handleResponse(Empty response) {
306+
public void handleResponse(TransportResponse.Empty response) {
310307
if (running() == false) {
311308
logger.trace("{} no longer running", FollowerChecker.this);
312309
return;
@@ -345,12 +342,6 @@ public void handleException(TransportException exp) {
345342

346343
failNode(reason);
347344
}
348-
349-
350-
@Override
351-
public String executor() {
352-
return Names.SAME;
353-
}
354345
});
355346
}
356347

server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.elasticsearch.cluster.service.MasterService;
3737
import org.elasticsearch.common.Priority;
3838
import org.elasticsearch.common.collect.Tuple;
39-
import org.elasticsearch.common.io.stream.StreamInput;
4039
import org.elasticsearch.common.unit.TimeValue;
4140
import org.elasticsearch.monitor.NodeHealthService;
4241
import org.elasticsearch.monitor.StatusInfo;
@@ -240,15 +239,9 @@ public void sendJoinRequest(DiscoveryNode destination, long term, Optional<Join>
240239
final Tuple<DiscoveryNode, JoinRequest> dedupKey = Tuple.tuple(destination, joinRequest);
241240
if (pendingOutgoingJoins.add(dedupKey)) {
242241
logger.debug("attempting to join {} with {}", destination, joinRequest);
243-
transportService.sendRequest(destination, JOIN_ACTION_NAME, joinRequest,
244-
new TransportResponseHandler<Empty>() {
242+
transportService.sendRequest(destination, JOIN_ACTION_NAME, joinRequest, new TransportResponseHandler.Empty() {
245243
@Override
246-
public Empty read(StreamInput in) {
247-
return Empty.INSTANCE;
248-
}
249-
250-
@Override
251-
public void handleResponse(Empty response) {
244+
public void handleResponse(TransportResponse.Empty response) {
252245
pendingOutgoingJoins.remove(dedupKey);
253246
logger.debug("successfully joined {} with {}", destination, joinRequest);
254247
lastFailedJoinAttempt.set(null);
@@ -261,11 +254,6 @@ public void handleException(TransportException exp) {
261254
attempt.logNow();
262255
lastFailedJoinAttempt.set(attempt);
263256
}
264-
265-
@Override
266-
public String executor() {
267-
return Names.SAME;
268-
}
269257
});
270258
} else {
271259
logger.debug("already attempting to join {} with request {}, not sending request", destination, joinRequest);
@@ -275,27 +263,16 @@ public String executor() {
275263
void sendStartJoinRequest(final StartJoinRequest startJoinRequest, final DiscoveryNode destination) {
276264
assert startJoinRequest.getSourceNode().isMasterNode()
277265
: "sending start-join request for master-ineligible " + startJoinRequest.getSourceNode();
278-
transportService.sendRequest(destination, START_JOIN_ACTION_NAME,
279-
startJoinRequest, new TransportResponseHandler<Empty>() {
266+
transportService.sendRequest(destination, START_JOIN_ACTION_NAME, startJoinRequest, new TransportResponseHandler.Empty() {
280267
@Override
281-
public Empty read(StreamInput in) {
282-
return Empty.INSTANCE;
283-
}
284-
285-
@Override
286-
public void handleResponse(Empty response) {
268+
public void handleResponse(TransportResponse.Empty response) {
287269
logger.debug("successful response to {} from {}", startJoinRequest, destination);
288270
}
289271

290272
@Override
291273
public void handleException(TransportException exp) {
292274
logger.debug(new ParameterizedMessage("failure in response to {} from {}", startJoinRequest, destination), exp);
293275
}
294-
295-
@Override
296-
public String executor() {
297-
return ThreadPool.Names.SAME;
298-
}
299276
});
300277
}
301278

server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -218,16 +218,10 @@ void handleWakeUp() {
218218

219219
transportService.sendRequest(leader, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(transportService.getLocalNode()),
220220
TransportRequestOptions.builder().withTimeout(leaderCheckTimeout).withType(Type.PING).build(),
221-
222-
new TransportResponseHandler<TransportResponse.Empty>() {
223-
224-
@Override
225-
public Empty read(StreamInput in) {
226-
return Empty.INSTANCE;
227-
}
221+
new TransportResponseHandler.Empty() {
228222

229223
@Override
230-
public void handleResponse(Empty response) {
224+
public void handleResponse(TransportResponse.Empty response) {
231225
if (isClosed.get()) {
232226
logger.debug("closed check scheduler received a response, doing nothing");
233227
return;
@@ -269,11 +263,6 @@ public void handleException(TransportException exp) {
269263
failureCount, LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), leaderCheckRetryCount, leader), exp);
270264
scheduleNextWakeUp();
271265
}
272-
273-
@Override
274-
public String executor() {
275-
return Names.SAME;
276-
}
277266
});
278267
}
279268

server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -118,11 +118,6 @@ public ReplicationResponse read(StreamInput in) throws IOException {
118118
return newResponseInstance(in);
119119
}
120120

121-
@Override
122-
public String executor() {
123-
return ThreadPool.Names.SAME;
124-
}
125-
126121
@Override
127122
public void handleResponse(ReplicationResponse response) {
128123
task.setPhase("finished");

server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,6 @@ public ReplicationResponse read(StreamInput in) throws IOException {
120120
return newResponseInstance(in);
121121
}
122122

123-
@Override
124-
public String executor() {
125-
return ThreadPool.Names.SAME;
126-
}
127-
128123
@Override
129124
public void handleResponse(ReplicationResponse response) {
130125
task.setPhase("finished");

server/src/main/java/org/elasticsearch/indices/store/IndicesStore.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -260,11 +260,6 @@ public void handleException(TransportException exp) {
260260
}
261261
}
262262

263-
@Override
264-
public String executor() {
265-
return ThreadPool.Names.SAME;
266-
}
267-
268263
private void allNodesResponded() {
269264
if (activeCopies.get() != expectedActiveCopies) {
270265
logger.trace("not deleting shard {}, expected {} active copies, but only {} found active copies",

server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -467,11 +467,6 @@ public void handleResponse(UpdateIndexShardSnapshotStatusResponse response) {
467467
public void handleException(TransportException exp) {
468468
reqListener.onFailure(exp);
469469
}
470-
471-
@Override
472-
public String executor() {
473-
return ThreadPool.Names.SAME;
474-
}
475470
})
476471
);
477472
}

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -146,11 +146,6 @@ public void handleResponse(ClusterStateResponse response) {
146146
public void handleException(TransportException exp) {
147147
contextPreservingActionListener.onFailure(exp);
148148
}
149-
150-
@Override
151-
public String executor() {
152-
return ThreadPool.Names.SAME;
153-
}
154149
});
155150
}
156151
};

server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,6 @@ public void handleException(TransportException exp) {
9292
throw new UncheckedIOException(e);
9393
}
9494
}
95-
96-
@Override
97-
public String executor() {
98-
return ThreadPool.Names.SAME;
99-
}
10095
}
10196

10297
static class ProxyRequest<T extends TransportRequest> extends TransportRequest {

server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -151,11 +151,6 @@ void handleLocalException(TransportException e) {
151151
listener.onFailure(e);
152152
}
153153
}
154-
155-
@Override
156-
public String executor() {
157-
return ThreadPool.Names.SAME;
158-
}
159154
}
160155

161156
static final class HandshakeRequest extends TransportRequest {

0 commit comments

Comments
 (0)