Skip to content

Cleanup Various Uses of ActionListener (#40126) #42274

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -203,27 +203,15 @@ void getFinishedTaskFromIndex(Task thisTask, GetTaskRequest request, ActionListe
request.getTaskId().toString());
get.setParentTask(clusterService.localNode().getId(), thisTask.getId());

client.get(get, new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse getResponse) {
try {
onGetFinishedTaskFromIndex(getResponse, listener);
} catch (Exception e) {
listener.onFailure(e);
}
}

@Override
public void onFailure(Exception e) {
if (ExceptionsHelper.unwrap(e, IndexNotFoundException.class) != null) {
// We haven't yet created the index for the task results so it can't be found.
listener.onFailure(new ResourceNotFoundException("task [{}] isn't running and hasn't stored its results", e,
request.getTaskId()));
} else {
listener.onFailure(e);
}
client.get(get, ActionListener.wrap(r -> onGetFinishedTaskFromIndex(r, listener), e -> {
if (ExceptionsHelper.unwrap(e, IndexNotFoundException.class) != null) {
// We haven't yet created the index for the task results so it can't be found.
listener.onFailure(new ResourceNotFoundException("task [{}] isn't running and hasn't stored its results", e,
request.getTaskId()));
} else {
listener.onFailure(e);
}
});
}));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,23 +119,11 @@ protected void masterOperation(final SnapshotsStatusRequest request,
TransportNodesSnapshotsStatus.Request nodesRequest =
new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(new String[nodesIds.size()]))
.snapshots(snapshots).timeout(request.masterNodeTimeout());
transportNodesSnapshotsStatus.execute(nodesRequest, new ActionListener<TransportNodesSnapshotsStatus.NodesSnapshotStatus>() {
@Override
public void onResponse(TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses) {
try {
List<SnapshotsInProgress.Entry> currentSnapshots =
snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots()));
listener.onResponse(buildResponse(request, currentSnapshots, nodeSnapshotStatuses));
} catch (Exception e) {
listener.onFailure(e);
}
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
transportNodesSnapshotsStatus.execute(nodesRequest,
ActionListener.map(
listener, nodeSnapshotStatuses ->
buildResponse(request, snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots())),
nodeSnapshotStatuses)));
} else {
// We don't have any in-progress shards, just return current stats
listener.onResponse(buildResponse(request, currentSnapshots, null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,26 +184,13 @@ protected ClusterBlockException checkRequestBlock(ClusterState state, UpgradeReq

@Override
protected void doExecute(Task task, UpgradeRequest request, final ActionListener<UpgradeResponse> listener) {
ActionListener<UpgradeResponse> settingsUpdateListener = new ActionListener<UpgradeResponse>() {
@Override
public void onResponse(UpgradeResponse upgradeResponse) {
try {
if (upgradeResponse.versions().isEmpty()) {
listener.onResponse(upgradeResponse);
} else {
updateSettings(upgradeResponse, listener);
}
} catch (Exception e) {
listener.onFailure(e);
}
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
super.doExecute(task, request, ActionListener.wrap(upgradeResponse -> {
if (upgradeResponse.versions().isEmpty()) {
listener.onResponse(upgradeResponse);
} else {
updateSettings(upgradeResponse, listener);
}
};
super.doExecute(task, request, settingsUpdateListener);
}, listener::onFailure));
}

private void updateSettings(final UpgradeResponse upgradeResponse, final ActionListener<UpgradeResponse> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,27 +59,20 @@ public void execute(BulkRequest bulkRequest, long executionId) {
semaphore.acquire();
toRelease = semaphore::release;
CountDownLatch latch = new CountDownLatch(1);
retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
retry.withBackoff(consumer, bulkRequest, ActionListener.runAfter(new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
try {
listener.afterBulk(executionId, bulkRequest, response);
} finally {
semaphore.release();
latch.countDown();
}
listener.afterBulk(executionId, bulkRequest, response);
}

@Override
public void onFailure(Exception e) {
try {
listener.afterBulk(executionId, bulkRequest, e);
} finally {
semaphore.release();
latch.countDown();
}
listener.afterBulk(executionId, bulkRequest, e);
}
});
}, () -> {
semaphore.release();
latch.countDown();
}));
bulkRequestSetupSuccessful = true;
if (concurrentRequests == 0) {
latch.await();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
Expand Down Expand Up @@ -74,25 +73,13 @@ protected void masterOperation(PutPipelineRequest request, ClusterState state, A
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.clear();
nodesInfoRequest.ingest(true);
client.admin().cluster().nodesInfo(nodesInfoRequest, new ActionListener<NodesInfoResponse>() {
@Override
public void onResponse(NodesInfoResponse nodeInfos) {
try {
Map<DiscoveryNode, IngestInfo> ingestInfos = new HashMap<>();
for (NodeInfo nodeInfo : nodeInfos.getNodes()) {
ingestInfos.put(nodeInfo.getNode(), nodeInfo.getIngest());
}
ingestService.putPipeline(ingestInfos, request, listener);
} catch (Exception e) {
onFailure(e);
}
client.admin().cluster().nodesInfo(nodesInfoRequest, ActionListener.wrap(nodeInfos -> {
Map<DiscoveryNode, IngestInfo> ingestInfos = new HashMap<>();
for (NodeInfo nodeInfo : nodeInfos.getNodes()) {
ingestInfos.put(nodeInfo.getNode(), nodeInfo.getIngest());
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
ingestService.putPipeline(ingestInfos, request, listener);
}, listener::onFailure));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -86,21 +87,16 @@ public ThreadedActionListener(Logger logger, ThreadPool threadPool, String execu

@Override
public void onResponse(final Response response) {
threadPool.executor(executor).execute(new AbstractRunnable() {
threadPool.executor(executor).execute(new ActionRunnable<Response>(listener) {
@Override
public boolean isForceExecution() {
return forceExecution;
}

@Override
protected void doRun() throws Exception {
protected void doRun() {
listener.onResponse(response);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
Expand All @@ -36,7 +37,6 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
Expand Down Expand Up @@ -287,45 +287,25 @@ class ShardTransportHandler implements TransportRequestHandler<ShardRequest> {

@Override
public void messageReceived(ShardRequest request, TransportChannel channel, Task task) throws Exception {
asyncShardOperation(request, task, new ActionListener<ShardResponse>() {
@Override
public void onResponse(ShardResponse response) {
try {
channel.sendResponse(response);
} catch (Exception e) {
onFailure(e);
}
}

@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn(() -> new ParameterizedMessage(
"Failed to send error response for action [{}] and request [{}]", actionName, request), e1);
asyncShardOperation(request, task,
ActionListener.wrap(channel::sendResponse, e -> {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn(() -> new ParameterizedMessage(
"Failed to send error response for action [{}] and request [{}]", actionName, request), e1);
}
}
}
});
));
}
}

protected void asyncShardOperation(ShardRequest request, Task task, ActionListener<ShardResponse> listener) {
transportService.getThreadPool().executor(getExecutor(request)).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

transportService.getThreadPool().executor(shardExecutor).execute(new ActionRunnable<ShardResponse>(listener) {
@Override
protected void doRun() throws Exception {
listener.onResponse(shardOperation(request, task));
}
});
}

protected String getExecutor(ShardRequest request) {
return shardExecutor;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -254,27 +254,16 @@ private class ShardTransportHandler implements TransportRequestHandler<Request>

@Override
public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
shardOperation(request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
channel.sendResponse(response);
} catch (Exception e) {
onFailure(e);
}
}

@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn("failed to send response for get", inner);
shardOperation(request,
ActionListener.wrap(channel::sendResponse, e -> {
try {
channel.sendResponse(e);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn("failed to send response for get", inner);
}
}
}
});

));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ChannelActionListener;
Expand All @@ -40,7 +41,6 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -107,12 +107,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
protected abstract Response shardOperation(Request request, ShardId shardId) throws IOException;

protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
threadPool.executor(getExecutor(request, shardId)).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

threadPool.executor(getExecutor(request, shardId)).execute(new ActionRunnable<Response>(listener) {
@Override
protected void doRun() throws Exception {
listener.onResponse(shardOperation(request, shardId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,31 +329,19 @@ class NodeTransportHandler implements TransportRequestHandler<NodeTaskRequest> {

@Override
public void messageReceived(final NodeTaskRequest request, final TransportChannel channel, Task task) throws Exception {
nodeOperation(request, new ActionListener<NodeTasksResponse>() {
@Override
public void onResponse(
TransportTasksAction<OperationTask, TasksRequest, TasksResponse, TaskResponse>.NodeTasksResponse response) {
try {
channel.sendResponse(response);
} catch (Exception e) {
onFailure(e);
}
}

@Override
public void onFailure(Exception e) {
nodeOperation(request, ActionListener.wrap(channel::sendResponse,
e -> {
try {
channel.sendResponse(e);
} catch (IOException e1) {
e1.addSuppressed(e);
logger.warn("Failed to send failure", e1);
}
}
});
));
}
}


private class NodeTaskRequest extends TransportRequest {
private TasksRequest tasksRequest;

Expand Down
Loading