Skip to content

Commit ecd033b

Browse files
Cleanup Various Uses of ActionListener (#40126) (#42274)
* Cleanup Various Uses of ActionListener * Use shorter `map`, `runAfter` or `wrap` where functionally equivalent to anonymous class * Use ActionRunnable where functionally equivalent
1 parent a4e6fb4 commit ecd033b

18 files changed

+96
-307
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java

+8-20
Original file line numberDiff line numberDiff line change
@@ -203,27 +203,15 @@ void getFinishedTaskFromIndex(Task thisTask, GetTaskRequest request, ActionListe
203203
request.getTaskId().toString());
204204
get.setParentTask(clusterService.localNode().getId(), thisTask.getId());
205205

206-
client.get(get, new ActionListener<GetResponse>() {
207-
@Override
208-
public void onResponse(GetResponse getResponse) {
209-
try {
210-
onGetFinishedTaskFromIndex(getResponse, listener);
211-
} catch (Exception e) {
212-
listener.onFailure(e);
213-
}
214-
}
215-
216-
@Override
217-
public void onFailure(Exception e) {
218-
if (ExceptionsHelper.unwrap(e, IndexNotFoundException.class) != null) {
219-
// We haven't yet created the index for the task results so it can't be found.
220-
listener.onFailure(new ResourceNotFoundException("task [{}] isn't running and hasn't stored its results", e,
221-
request.getTaskId()));
222-
} else {
223-
listener.onFailure(e);
224-
}
206+
client.get(get, ActionListener.wrap(r -> onGetFinishedTaskFromIndex(r, listener), e -> {
207+
if (ExceptionsHelper.unwrap(e, IndexNotFoundException.class) != null) {
208+
// We haven't yet created the index for the task results so it can't be found.
209+
listener.onFailure(new ResourceNotFoundException("task [{}] isn't running and hasn't stored its results", e,
210+
request.getTaskId()));
211+
} else {
212+
listener.onFailure(e);
225213
}
226-
});
214+
}));
227215
}
228216

229217
/**

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java

+5-17
Original file line numberDiff line numberDiff line change
@@ -119,23 +119,11 @@ protected void masterOperation(final SnapshotsStatusRequest request,
119119
TransportNodesSnapshotsStatus.Request nodesRequest =
120120
new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(new String[nodesIds.size()]))
121121
.snapshots(snapshots).timeout(request.masterNodeTimeout());
122-
transportNodesSnapshotsStatus.execute(nodesRequest, new ActionListener<TransportNodesSnapshotsStatus.NodesSnapshotStatus>() {
123-
@Override
124-
public void onResponse(TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses) {
125-
try {
126-
List<SnapshotsInProgress.Entry> currentSnapshots =
127-
snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots()));
128-
listener.onResponse(buildResponse(request, currentSnapshots, nodeSnapshotStatuses));
129-
} catch (Exception e) {
130-
listener.onFailure(e);
131-
}
132-
}
133-
134-
@Override
135-
public void onFailure(Exception e) {
136-
listener.onFailure(e);
137-
}
138-
});
122+
transportNodesSnapshotsStatus.execute(nodesRequest,
123+
ActionListener.map(
124+
listener, nodeSnapshotStatuses ->
125+
buildResponse(request, snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots())),
126+
nodeSnapshotStatuses)));
139127
} else {
140128
// We don't have any in-progress shards, just return current stats
141129
listener.onResponse(buildResponse(request, currentSnapshots, null));

server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java

+6-19
Original file line numberDiff line numberDiff line change
@@ -184,26 +184,13 @@ protected ClusterBlockException checkRequestBlock(ClusterState state, UpgradeReq
184184

185185
@Override
186186
protected void doExecute(Task task, UpgradeRequest request, final ActionListener<UpgradeResponse> listener) {
187-
ActionListener<UpgradeResponse> settingsUpdateListener = new ActionListener<UpgradeResponse>() {
188-
@Override
189-
public void onResponse(UpgradeResponse upgradeResponse) {
190-
try {
191-
if (upgradeResponse.versions().isEmpty()) {
192-
listener.onResponse(upgradeResponse);
193-
} else {
194-
updateSettings(upgradeResponse, listener);
195-
}
196-
} catch (Exception e) {
197-
listener.onFailure(e);
198-
}
199-
}
200-
201-
@Override
202-
public void onFailure(Exception e) {
203-
listener.onFailure(e);
187+
super.doExecute(task, request, ActionListener.wrap(upgradeResponse -> {
188+
if (upgradeResponse.versions().isEmpty()) {
189+
listener.onResponse(upgradeResponse);
190+
} else {
191+
updateSettings(upgradeResponse, listener);
204192
}
205-
};
206-
super.doExecute(task, request, settingsUpdateListener);
193+
}, listener::onFailure));
207194
}
208195

209196
private void updateSettings(final UpgradeResponse upgradeResponse, final ActionListener<UpgradeResponse> listener) {

server/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java

+7-14
Original file line numberDiff line numberDiff line change
@@ -59,27 +59,20 @@ public void execute(BulkRequest bulkRequest, long executionId) {
5959
semaphore.acquire();
6060
toRelease = semaphore::release;
6161
CountDownLatch latch = new CountDownLatch(1);
62-
retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
62+
retry.withBackoff(consumer, bulkRequest, ActionListener.runAfter(new ActionListener<BulkResponse>() {
6363
@Override
6464
public void onResponse(BulkResponse response) {
65-
try {
66-
listener.afterBulk(executionId, bulkRequest, response);
67-
} finally {
68-
semaphore.release();
69-
latch.countDown();
70-
}
65+
listener.afterBulk(executionId, bulkRequest, response);
7166
}
7267

7368
@Override
7469
public void onFailure(Exception e) {
75-
try {
76-
listener.afterBulk(executionId, bulkRequest, e);
77-
} finally {
78-
semaphore.release();
79-
latch.countDown();
80-
}
70+
listener.afterBulk(executionId, bulkRequest, e);
8171
}
82-
});
72+
}, () -> {
73+
semaphore.release();
74+
latch.countDown();
75+
}));
8376
bulkRequestSetupSuccessful = true;
8477
if (concurrentRequests == 0) {
8578
latch.await();

server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java

+6-19
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.elasticsearch.action.ActionListener;
2323
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
2424
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
25-
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
2625
import org.elasticsearch.action.support.ActionFilters;
2726
import org.elasticsearch.action.support.master.AcknowledgedResponse;
2827
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
@@ -74,25 +73,13 @@ protected void masterOperation(PutPipelineRequest request, ClusterState state, A
7473
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
7574
nodesInfoRequest.clear();
7675
nodesInfoRequest.ingest(true);
77-
client.admin().cluster().nodesInfo(nodesInfoRequest, new ActionListener<NodesInfoResponse>() {
78-
@Override
79-
public void onResponse(NodesInfoResponse nodeInfos) {
80-
try {
81-
Map<DiscoveryNode, IngestInfo> ingestInfos = new HashMap<>();
82-
for (NodeInfo nodeInfo : nodeInfos.getNodes()) {
83-
ingestInfos.put(nodeInfo.getNode(), nodeInfo.getIngest());
84-
}
85-
ingestService.putPipeline(ingestInfos, request, listener);
86-
} catch (Exception e) {
87-
onFailure(e);
88-
}
76+
client.admin().cluster().nodesInfo(nodesInfoRequest, ActionListener.wrap(nodeInfos -> {
77+
Map<DiscoveryNode, IngestInfo> ingestInfos = new HashMap<>();
78+
for (NodeInfo nodeInfo : nodeInfos.getNodes()) {
79+
ingestInfos.put(nodeInfo.getNode(), nodeInfo.getIngest());
8980
}
90-
91-
@Override
92-
public void onFailure(Exception e) {
93-
listener.onFailure(e);
94-
}
95-
});
81+
ingestService.putPipeline(ingestInfos, request, listener);
82+
}, listener::onFailure));
9683
}
9784

9885
@Override

server/src/main/java/org/elasticsearch/action/support/ThreadedActionListener.java

+3-7
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.logging.log4j.Logger;
2323
import org.apache.logging.log4j.message.ParameterizedMessage;
2424
import org.elasticsearch.action.ActionListener;
25+
import org.elasticsearch.action.ActionRunnable;
2526
import org.elasticsearch.client.Client;
2627
import org.elasticsearch.client.transport.TransportClient;
2728
import org.elasticsearch.common.settings.Settings;
@@ -86,21 +87,16 @@ public ThreadedActionListener(Logger logger, ThreadPool threadPool, String execu
8687

8788
@Override
8889
public void onResponse(final Response response) {
89-
threadPool.executor(executor).execute(new AbstractRunnable() {
90+
threadPool.executor(executor).execute(new ActionRunnable<Response>(listener) {
9091
@Override
9192
public boolean isForceExecution() {
9293
return forceExecution;
9394
}
9495

9596
@Override
96-
protected void doRun() throws Exception {
97+
protected void doRun() {
9798
listener.onResponse(response);
9899
}
99-
100-
@Override
101-
public void onFailure(Exception e) {
102-
listener.onFailure(e);
103-
}
104100
});
105101
}
106102

server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java

+11-31
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.logging.log4j.message.ParameterizedMessage;
2323
import org.elasticsearch.action.ActionListener;
24+
import org.elasticsearch.action.ActionRunnable;
2425
import org.elasticsearch.action.NoShardAvailableActionException;
2526
import org.elasticsearch.action.support.ActionFilters;
2627
import org.elasticsearch.action.support.HandledTransportAction;
@@ -36,7 +37,6 @@
3637
import org.elasticsearch.cluster.service.ClusterService;
3738
import org.elasticsearch.common.Nullable;
3839
import org.elasticsearch.common.io.stream.StreamInput;
39-
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
4040
import org.elasticsearch.tasks.Task;
4141
import org.elasticsearch.threadpool.ThreadPool;
4242
import org.elasticsearch.transport.TransportChannel;
@@ -287,45 +287,25 @@ class ShardTransportHandler implements TransportRequestHandler<ShardRequest> {
287287

288288
@Override
289289
public void messageReceived(ShardRequest request, TransportChannel channel, Task task) throws Exception {
290-
asyncShardOperation(request, task, new ActionListener<ShardResponse>() {
291-
@Override
292-
public void onResponse(ShardResponse response) {
293-
try {
294-
channel.sendResponse(response);
295-
} catch (Exception e) {
296-
onFailure(e);
297-
}
298-
}
299-
300-
@Override
301-
public void onFailure(Exception e) {
302-
try {
303-
channel.sendResponse(e);
304-
} catch (Exception e1) {
305-
logger.warn(() -> new ParameterizedMessage(
306-
"Failed to send error response for action [{}] and request [{}]", actionName, request), e1);
290+
asyncShardOperation(request, task,
291+
ActionListener.wrap(channel::sendResponse, e -> {
292+
try {
293+
channel.sendResponse(e);
294+
} catch (Exception e1) {
295+
logger.warn(() -> new ParameterizedMessage(
296+
"Failed to send error response for action [{}] and request [{}]", actionName, request), e1);
297+
}
307298
}
308-
}
309-
});
299+
));
310300
}
311301
}
312302

313303
protected void asyncShardOperation(ShardRequest request, Task task, ActionListener<ShardResponse> listener) {
314-
transportService.getThreadPool().executor(getExecutor(request)).execute(new AbstractRunnable() {
315-
@Override
316-
public void onFailure(Exception e) {
317-
listener.onFailure(e);
318-
}
319-
304+
transportService.getThreadPool().executor(shardExecutor).execute(new ActionRunnable<ShardResponse>(listener) {
320305
@Override
321306
protected void doRun() throws Exception {
322307
listener.onResponse(shardOperation(request, task));
323308
}
324309
});
325310
}
326-
327-
protected String getExecutor(ShardRequest request) {
328-
return shardExecutor;
329-
}
330-
331311
}

server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java

+9-20
Original file line numberDiff line numberDiff line change
@@ -254,27 +254,16 @@ private class ShardTransportHandler implements TransportRequestHandler<Request>
254254

255255
@Override
256256
public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
257-
shardOperation(request, new ActionListener<Response>() {
258-
@Override
259-
public void onResponse(Response response) {
260-
try {
261-
channel.sendResponse(response);
262-
} catch (Exception e) {
263-
onFailure(e);
264-
}
265-
}
266-
267-
@Override
268-
public void onFailure(Exception e) {
269-
try {
270-
channel.sendResponse(e);
271-
} catch (Exception inner) {
272-
inner.addSuppressed(e);
273-
logger.warn("failed to send response for get", inner);
257+
shardOperation(request,
258+
ActionListener.wrap(channel::sendResponse, e -> {
259+
try {
260+
channel.sendResponse(e);
261+
} catch (Exception inner) {
262+
inner.addSuppressed(e);
263+
logger.warn("failed to send response for get", inner);
264+
}
274265
}
275-
}
276-
});
277-
266+
));
278267
}
279268
}
280269
}

server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java

+2-7
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.logging.log4j.message.ParameterizedMessage;
2323
import org.elasticsearch.action.ActionListener;
2424
import org.elasticsearch.action.ActionResponse;
25+
import org.elasticsearch.action.ActionRunnable;
2526
import org.elasticsearch.action.NoShardAvailableActionException;
2627
import org.elasticsearch.action.support.ActionFilters;
2728
import org.elasticsearch.action.support.ChannelActionListener;
@@ -40,7 +41,6 @@
4041
import org.elasticsearch.common.io.stream.StreamInput;
4142
import org.elasticsearch.common.io.stream.Writeable;
4243
import org.elasticsearch.common.logging.LoggerMessageFormat;
43-
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
4444
import org.elasticsearch.index.shard.ShardId;
4545
import org.elasticsearch.tasks.Task;
4646
import org.elasticsearch.threadpool.ThreadPool;
@@ -107,12 +107,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
107107
protected abstract Response shardOperation(Request request, ShardId shardId) throws IOException;
108108

109109
protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
110-
threadPool.executor(getExecutor(request, shardId)).execute(new AbstractRunnable() {
111-
@Override
112-
public void onFailure(Exception e) {
113-
listener.onFailure(e);
114-
}
115-
110+
threadPool.executor(getExecutor(request, shardId)).execute(new ActionRunnable<Response>(listener) {
116111
@Override
117112
protected void doRun() throws Exception {
118113
listener.onResponse(shardOperation(request, shardId));

server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java

+3-15
Original file line numberDiff line numberDiff line change
@@ -329,31 +329,19 @@ class NodeTransportHandler implements TransportRequestHandler<NodeTaskRequest> {
329329

330330
@Override
331331
public void messageReceived(final NodeTaskRequest request, final TransportChannel channel, Task task) throws Exception {
332-
nodeOperation(request, new ActionListener<NodeTasksResponse>() {
333-
@Override
334-
public void onResponse(
335-
TransportTasksAction<OperationTask, TasksRequest, TasksResponse, TaskResponse>.NodeTasksResponse response) {
336-
try {
337-
channel.sendResponse(response);
338-
} catch (Exception e) {
339-
onFailure(e);
340-
}
341-
}
342-
343-
@Override
344-
public void onFailure(Exception e) {
332+
nodeOperation(request, ActionListener.wrap(channel::sendResponse,
333+
e -> {
345334
try {
346335
channel.sendResponse(e);
347336
} catch (IOException e1) {
348337
e1.addSuppressed(e);
349338
logger.warn("Failed to send failure", e1);
350339
}
351340
}
352-
});
341+
));
353342
}
354343
}
355344

356-
357345
private class NodeTaskRequest extends TransportRequest {
358346
private TasksRequest tasksRequest;
359347

0 commit comments

Comments
 (0)