diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java index fe07a4efe930e..d1d72da544560 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java @@ -203,27 +203,15 @@ void getFinishedTaskFromIndex(Task thisTask, GetTaskRequest request, ActionListe request.getTaskId().toString()); get.setParentTask(clusterService.localNode().getId(), thisTask.getId()); - client.get(get, new ActionListener() { - @Override - public void onResponse(GetResponse getResponse) { - try { - onGetFinishedTaskFromIndex(getResponse, listener); - } catch (Exception e) { - listener.onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - if (ExceptionsHelper.unwrap(e, IndexNotFoundException.class) != null) { - // We haven't yet created the index for the task results so it can't be found. - listener.onFailure(new ResourceNotFoundException("task [{}] isn't running and hasn't stored its results", e, - request.getTaskId())); - } else { - listener.onFailure(e); - } + client.get(get, ActionListener.wrap(r -> onGetFinishedTaskFromIndex(r, listener), e -> { + if (ExceptionsHelper.unwrap(e, IndexNotFoundException.class) != null) { + // We haven't yet created the index for the task results so it can't be found. + listener.onFailure(new ResourceNotFoundException("task [{}] isn't running and hasn't stored its results", e, + request.getTaskId())); + } else { + listener.onFailure(e); } - }); + })); } /** diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java index 5dfc24d1e280e..c2f0d3dd0c074 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -119,23 +119,11 @@ protected void masterOperation(final SnapshotsStatusRequest request, TransportNodesSnapshotsStatus.Request nodesRequest = new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(new String[nodesIds.size()])) .snapshots(snapshots).timeout(request.masterNodeTimeout()); - transportNodesSnapshotsStatus.execute(nodesRequest, new ActionListener() { - @Override - public void onResponse(TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses) { - try { - List currentSnapshots = - snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots())); - listener.onResponse(buildResponse(request, currentSnapshots, nodeSnapshotStatuses)); - } catch (Exception e) { - listener.onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + transportNodesSnapshotsStatus.execute(nodesRequest, + ActionListener.map( + listener, nodeSnapshotStatuses -> + buildResponse(request, snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots())), + nodeSnapshotStatuses))); } else { // We don't have any in-progress shards, just return current stats listener.onResponse(buildResponse(request, currentSnapshots, null)); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java index f2d046f3321b2..b122350c3e61d 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java @@ -184,26 +184,13 @@ protected ClusterBlockException checkRequestBlock(ClusterState state, UpgradeReq @Override protected void doExecute(Task task, UpgradeRequest request, final ActionListener listener) { - ActionListener settingsUpdateListener = new ActionListener() { - @Override - public void onResponse(UpgradeResponse upgradeResponse) { - try { - if (upgradeResponse.versions().isEmpty()) { - listener.onResponse(upgradeResponse); - } else { - updateSettings(upgradeResponse, listener); - } - } catch (Exception e) { - listener.onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); + super.doExecute(task, request, ActionListener.wrap(upgradeResponse -> { + if (upgradeResponse.versions().isEmpty()) { + listener.onResponse(upgradeResponse); + } else { + updateSettings(upgradeResponse, listener); } - }; - super.doExecute(task, request, settingsUpdateListener); + }, listener::onFailure)); } private void updateSettings(final UpgradeResponse upgradeResponse, final ActionListener listener) { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java index 2f5db520088e9..7890fb4e83fc1 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java @@ -59,27 +59,20 @@ public void execute(BulkRequest bulkRequest, long executionId) { semaphore.acquire(); toRelease = semaphore::release; CountDownLatch latch = new CountDownLatch(1); - retry.withBackoff(consumer, bulkRequest, new ActionListener() { + retry.withBackoff(consumer, bulkRequest, ActionListener.runAfter(new ActionListener() { @Override public void onResponse(BulkResponse response) { - try { - listener.afterBulk(executionId, bulkRequest, response); - } finally { - semaphore.release(); - latch.countDown(); - } + listener.afterBulk(executionId, bulkRequest, response); } @Override public void onFailure(Exception e) { - try { - listener.afterBulk(executionId, bulkRequest, e); - } finally { - semaphore.release(); - latch.countDown(); - } + listener.afterBulk(executionId, bulkRequest, e); } - }); + }, () -> { + semaphore.release(); + latch.countDown(); + })); bulkRequestSetupSuccessful = true; if (concurrentRequests == 0) { latch.await(); diff --git a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java index 97f13bf71d14c..be1528a354bc3 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java @@ -22,7 +22,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; -import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; @@ -74,25 +73,13 @@ protected void masterOperation(PutPipelineRequest request, ClusterState state, A NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); nodesInfoRequest.clear(); nodesInfoRequest.ingest(true); - client.admin().cluster().nodesInfo(nodesInfoRequest, new ActionListener() { - @Override - public void onResponse(NodesInfoResponse nodeInfos) { - try { - Map ingestInfos = new HashMap<>(); - for (NodeInfo nodeInfo : nodeInfos.getNodes()) { - ingestInfos.put(nodeInfo.getNode(), nodeInfo.getIngest()); - } - ingestService.putPipeline(ingestInfos, request, listener); - } catch (Exception e) { - onFailure(e); - } + client.admin().cluster().nodesInfo(nodesInfoRequest, ActionListener.wrap(nodeInfos -> { + Map ingestInfos = new HashMap<>(); + for (NodeInfo nodeInfo : nodeInfos.getNodes()) { + ingestInfos.put(nodeInfo.getNode(), nodeInfo.getIngest()); } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + ingestService.putPipeline(ingestInfos, request, listener); + }, listener::onFailure)); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/ThreadedActionListener.java b/server/src/main/java/org/elasticsearch/action/support/ThreadedActionListener.java index dfcf6445abf7d..ad72ef10139ba 100644 --- a/server/src/main/java/org/elasticsearch/action/support/ThreadedActionListener.java +++ b/server/src/main/java/org/elasticsearch/action/support/ThreadedActionListener.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; @@ -86,21 +87,16 @@ public ThreadedActionListener(Logger logger, ThreadPool threadPool, String execu @Override public void onResponse(final Response response) { - threadPool.executor(executor).execute(new AbstractRunnable() { + threadPool.executor(executor).execute(new ActionRunnable<>(listener) { @Override public boolean isForceExecution() { return forceExecution; } @Override - protected void doRun() throws Exception { + protected void doRun() { listener.onResponse(response); } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } }); } diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java index 87c9e15324152..15daaf786b604 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java @@ -21,6 +21,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; @@ -36,7 +37,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportChannel; @@ -287,45 +287,25 @@ class ShardTransportHandler implements TransportRequestHandler { @Override public void messageReceived(ShardRequest request, TransportChannel channel, Task task) throws Exception { - asyncShardOperation(request, task, new ActionListener() { - @Override - public void onResponse(ShardResponse response) { - try { - channel.sendResponse(response); - } catch (Exception e) { - onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - try { - channel.sendResponse(e); - } catch (Exception e1) { - logger.warn(() -> new ParameterizedMessage( - "Failed to send error response for action [{}] and request [{}]", actionName, request), e1); + asyncShardOperation(request, task, + ActionListener.wrap(channel::sendResponse, e -> { + try { + channel.sendResponse(e); + } catch (Exception e1) { + logger.warn(() -> new ParameterizedMessage( + "Failed to send error response for action [{}] and request [{}]", actionName, request), e1); + } } - } - }); + )); } } protected void asyncShardOperation(ShardRequest request, Task task, ActionListener listener) { - transportService.getThreadPool().executor(getExecutor(request)).execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - + transportService.getThreadPool().executor(shardExecutor).execute(new ActionRunnable(listener) { @Override protected void doRun() throws Exception { listener.onResponse(shardOperation(request, task)); } }); } - - protected String getExecutor(ShardRequest request) { - return shardExecutor; - } - } diff --git a/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java b/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java index c575c3b233872..d1d7b6ffac597 100644 --- a/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java @@ -254,27 +254,16 @@ private class ShardTransportHandler implements TransportRequestHandler @Override public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception { - shardOperation(request, new ActionListener() { - @Override - public void onResponse(Response response) { - try { - channel.sendResponse(response); - } catch (Exception e) { - onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - try { - channel.sendResponse(e); - } catch (Exception inner) { - inner.addSuppressed(e); - logger.warn("failed to send response for get", inner); + shardOperation(request, + ActionListener.wrap(channel::sendResponse, e -> { + try { + channel.sendResponse(e); + } catch (Exception inner) { + inner.addSuppressed(e); + logger.warn("failed to send response for get", inner); + } } - } - }); - + )); } } } diff --git a/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java b/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java index 6d7ad085dcd1e..b4ccd958998dc 100644 --- a/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ChannelActionListener; @@ -39,7 +40,6 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.logging.LoggerMessageFormat; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -106,12 +106,7 @@ protected void doExecute(Task task, Request request, ActionListener li protected abstract Response shardOperation(Request request, ShardId shardId) throws IOException; protected void asyncShardOperation(Request request, ShardId shardId, ActionListener listener) throws IOException { - threadPool.executor(getExecutor(request, shardId)).execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - + threadPool.executor(getExecutor(request, shardId)).execute(new ActionRunnable<>(listener) { @Override protected void doRun() throws Exception { listener.onResponse(shardOperation(request, shardId)); diff --git a/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java b/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java index c2f9872ca5cee..8d80a15beb14b 100644 --- a/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java @@ -329,19 +329,8 @@ class NodeTransportHandler implements TransportRequestHandler { @Override public void messageReceived(final NodeTaskRequest request, final TransportChannel channel, Task task) throws Exception { - nodeOperation(request, new ActionListener() { - @Override - public void onResponse( - TransportTasksAction.NodeTasksResponse response) { - try { - channel.sendResponse(response); - } catch (Exception e) { - onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { + nodeOperation(request, ActionListener.wrap(channel::sendResponse, + e -> { try { channel.sendResponse(e); } catch (IOException e1) { @@ -349,11 +338,10 @@ public void onFailure(Exception e) { logger.warn("Failed to send failure", e1); } } - }); + )); } } - private class NodeTaskRequest extends TransportRequest { private TasksRequest tasksRequest; diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 0b22f5d660655..bf950ac23df28 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -25,6 +25,7 @@ import org.apache.lucene.search.TopDocs; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.action.search.SearchType; @@ -39,7 +40,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; import org.elasticsearch.core.internal.io.IOUtils; @@ -302,21 +302,7 @@ protected void doClose() { } public void executeDfsPhase(ShardSearchRequest request, SearchTask task, ActionListener listener) { - rewriteShardRequest(request, new ActionListener() { - @Override - public void onResponse(ShardSearchRequest request) { - try { - listener.onResponse(executeDfsPhase(request, task)); - } catch (Exception e) { - onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + rewriteShardRequest(request, ActionListener.map(listener, r -> executeDfsPhase(r, task))); } private DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchTask task) throws IOException { @@ -351,30 +337,11 @@ private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final Sea } public void executeQueryPhase(ShardSearchRequest request, SearchTask task, ActionListener listener) { - rewriteShardRequest(request, new ActionListener() { - @Override - public void onResponse(ShardSearchRequest request) { - try { - listener.onResponse(executeQueryPhase(request, task)); - } catch (Exception e) { - onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + rewriteShardRequest(request, ActionListener.map(listener, r -> executeQueryPhase(r, task))); } private void runAsync(long id, Supplier executable, ActionListener listener) { - getExecutor(id).execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - + getExecutor(id).execute(new ActionRunnable(listener) { @Override protected void doRun() { listener.onResponse(executable.get()); @@ -1058,12 +1025,7 @@ private void rewriteShardRequest(ShardSearchRequest request, ActionListener actionListener = ActionListener.wrap(r -> // now we need to check if there is a pending refresh and register shard.awaitShardSearchActive(b -> - executor.execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - + executor.execute(new ActionRunnable(listener) { @Override protected void doRun() { listener.onResponse(request); diff --git a/server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java b/server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java index 571ced1c118f9..b2bbd3c8cbe41 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java @@ -84,10 +84,7 @@ void registerNodeConnection(List nodeChannels, ConnectionProfile con for (TcpChannel channel : nodeChannels) { scheduledPing.addChannel(channel); - - channel.addCloseListener(ActionListener.wrap(() -> { - scheduledPing.removeChannel(channel); - })); + channel.addCloseListener(ActionListener.wrap(() -> scheduledPing.removeChannel(channel))); } } diff --git a/server/src/test/java/org/elasticsearch/action/RejectionActionIT.java b/server/src/test/java/org/elasticsearch/action/RejectionActionIT.java index ad2447cb7b3d0..e0ef29bf7f49e 100644 --- a/server/src/test/java/org/elasticsearch/action/RejectionActionIT.java +++ b/server/src/test/java/org/elasticsearch/action/RejectionActionIT.java @@ -65,19 +65,17 @@ public void testSimulatedSearchRejectionLoad() throws Throwable { client().prepareSearch("test") .setSearchType(SearchType.QUERY_THEN_FETCH) .setQuery(QueryBuilders.matchQuery("field", "1")) - .execute(new ActionListener() { + .execute(new LatchedActionListener<>(new ActionListener() { @Override public void onResponse(SearchResponse searchResponse) { responses.add(searchResponse); - latch.countDown(); } @Override public void onFailure(Exception e) { responses.add(e); - latch.countDown(); } - }); + }, latch)); } latch.await(); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java index ec8af36dbf27e..190f85d635b68 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java @@ -470,17 +470,7 @@ public void testCancellingTasksThatDontSupportCancellation() throws Exception { connectNodes(testNodes); CountDownLatch checkLatch = new CountDownLatch(1); CountDownLatch responseLatch = new CountDownLatch(1); - Task task = startBlockingTestNodesAction(checkLatch, new ActionListener() { - @Override - public void onResponse(NodesResponse nodeResponses) { - responseLatch.countDown(); - } - - @Override - public void onFailure(Exception e) { - responseLatch.countDown(); - } - }); + Task task = startBlockingTestNodesAction(checkLatch, ActionListener.wrap(responseLatch::countDown)); String actionName = "internal:testAction"; // only pick the main action // Try to cancel main task using action name diff --git a/server/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java b/server/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java index 55c39f735ce31..bcb4a1200b7e8 100644 --- a/server/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.util.concurrent.AtomicArray; @@ -42,32 +43,24 @@ public class ClearScrollControllerTests extends ESTestCase { - public void testClearAll() throws IOException, InterruptedException { + public void testClearAll() throws InterruptedException { DiscoveryNode node1 = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); DiscoveryNode node2 = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT); DiscoveryNode node3 = new DiscoveryNode("node_3", buildNewFakeTransportAddress(), Version.CURRENT); DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build(); CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = new ActionListener() { + ActionListener listener = new LatchedActionListener<>(new ActionListener() { @Override public void onResponse(ClearScrollResponse clearScrollResponse) { - try { - assertEquals(3, clearScrollResponse.getNumFreed()); - assertTrue(clearScrollResponse.isSucceeded()); - } finally { - latch.countDown(); - } + assertEquals(3, clearScrollResponse.getNumFreed()); + assertTrue(clearScrollResponse.isSucceeded()); } @Override public void onFailure(Exception e) { - try { - throw new AssertionError(e); - } finally { - latch.countDown(); - } + throw new AssertionError(e); } - }; + }, latch); List nodesInvoked = new CopyOnWriteArrayList<>(); SearchTransportService searchTransportService = new SearchTransportService(null, null) { @Override @@ -112,27 +105,18 @@ public void testClearScrollIds() throws IOException, InterruptedException { String scrollId = TransportSearchHelper.buildScrollId(array); DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build(); CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = new ActionListener() { + ActionListener listener = new LatchedActionListener<>(new ActionListener() { @Override public void onResponse(ClearScrollResponse clearScrollResponse) { - try { - assertEquals(numFreed.get(), clearScrollResponse.getNumFreed()); - assertTrue(clearScrollResponse.isSucceeded()); - } finally { - latch.countDown(); - } - + assertEquals(numFreed.get(), clearScrollResponse.getNumFreed()); + assertTrue(clearScrollResponse.isSucceeded()); } @Override public void onFailure(Exception e) { - try { - throw new AssertionError(e); - } finally { - latch.countDown(); - } + throw new AssertionError(e); } - }; + }, latch); List nodesInvoked = new CopyOnWriteArrayList<>(); SearchTransportService searchTransportService = new SearchTransportService(null, null) { @@ -185,32 +169,22 @@ public void testClearScrollIdsWithFailure() throws IOException, InterruptedExcep DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build(); CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = new ActionListener() { + ActionListener listener = new LatchedActionListener<>(new ActionListener() { @Override public void onResponse(ClearScrollResponse clearScrollResponse) { - try { - assertEquals(numFreed.get(), clearScrollResponse.getNumFreed()); - if (numFailures.get() > 0) { - assertFalse(clearScrollResponse.isSucceeded()); - } else { - assertTrue(clearScrollResponse.isSucceeded()); - } - - } finally { - latch.countDown(); + assertEquals(numFreed.get(), clearScrollResponse.getNumFreed()); + if (numFailures.get() > 0) { + assertFalse(clearScrollResponse.isSucceeded()); + } else { + assertTrue(clearScrollResponse.isSucceeded()); } - } @Override public void onFailure(Exception e) { - try { - throw new AssertionError(e); - } finally { - latch.countDown(); - } + throw new AssertionError(e); } - }; + }, latch); List nodesInvoked = new CopyOnWriteArrayList<>(); SearchTransportService searchTransportService = new SearchTransportService(null, null) { diff --git a/server/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java b/server/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java index f222bcc015c62..96d057f50c4f7 100644 --- a/server/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.node.Node; import org.elasticsearch.tasks.Task; @@ -65,7 +66,7 @@ public void shutdown() throws Exception { terminate(threadPool); } - public void testActionFiltersRequest() throws ExecutionException, InterruptedException { + public void testActionFiltersRequest() throws InterruptedException { int numFilters = randomInt(10); Set orders = new HashSet<>(numFilters); while (orders.size() < numFilters) { @@ -139,7 +140,7 @@ protected void doExecute(Task task, TestRequest request, ActionListener failures = new CopyOnWriteArrayList<>(); - transportAction.execute(new TestRequest(), new ActionListener() { + transportAction.execute(new TestRequest(), new LatchedActionListener<>(new ActionListener() { @Override public void onResponse(TestResponse testResponse) { responses.incrementAndGet(); - latch.countDown(); } @Override public void onFailure(Exception e) { failures.add(e); - latch.countDown(); } - }); + }, latch)); if (!latch.await(10, TimeUnit.SECONDS)) { fail("timeout waiting for the filter to notify the listener as many times as expected"); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 98c4f215fca8a..c3b1412752a93 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -368,17 +368,7 @@ public void testConcurrentWriteReplicaResultCompletion() throws InterruptedExcep CountDownLatch completionLatch = new CountDownLatch(1); threadPool.generic().execute(() -> { waitForBarrier.run(); - replicaResult.respond(new ActionListener() { - @Override - public void onResponse(TransportResponse.Empty empty) { - completionLatch.countDown(); - } - - @Override - public void onFailure(Exception e) { - completionLatch.countDown(); - } - }); + replicaResult.respond(ActionListener.wrap(completionLatch::countDown)); }); if (randomBoolean()) { threadPool.generic().execute(() -> { diff --git a/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java b/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java index aeb4d9b3a9bfb..2ea6567c9f8d0 100644 --- a/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java +++ b/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java @@ -23,9 +23,7 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.persistent.PersistentTaskParams; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.persistent.TestPersistentTasksPlugin; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; @@ -72,17 +70,7 @@ public void testEnableAssignmentAfterRestart() throws Exception { for (int i = 0; i < numberOfTasks; i++) { PersistentTasksService service = internalCluster().getInstance(PersistentTasksService.class); service.sendStartRequest("task_" + i, TestPersistentTasksExecutor.NAME, new TestParams(randomAlphaOfLength(10)), - new ActionListener>() { - @Override - public void onResponse(PersistentTask task) { - latch.countDown(); - } - - @Override - public void onFailure(Exception e) { - latch.countDown(); - } - }); + ActionListener.wrap(latch::countDown)); } latch.await();