Skip to content

Commit b6a3698

Browse files
Cleanup more ActionListener Delegation Spots (#69662)
Cleaning up the remaining spots where the short-wrapper methods could be used that I could find.
1 parent b6eb6e3 commit b6a3698

File tree

37 files changed

+289
-784
lines changed

37 files changed

+289
-784
lines changed

server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,10 @@ public RestResponse buildResponse(final Table table) throws Exception {
9696
}
9797
});
9898

99-
sendGetSettingsRequest(indices, indicesOptions, masterNodeTimeout, client, new ActionListener<>() {
99+
sendGetSettingsRequest(indices, indicesOptions, masterNodeTimeout, client, new ActionListener.Delegating<>(listener) {
100100
@Override
101101
public void onResponse(final GetSettingsResponse getSettingsResponse) {
102-
final GroupedActionListener<ActionResponse> groupedListener = createGroupedListener(request, 4, listener);
102+
final GroupedActionListener<ActionResponse> groupedListener = createGroupedListener(request, 4, delegate);
103103
groupedListener.onResponse(getSettingsResponse);
104104

105105
// The list of indices that will be returned is determined by the indices returned from the Get Settings call.
@@ -123,11 +123,6 @@ public void onResponse(final GetSettingsResponse getSettingsResponse) {
123123
sendClusterHealthRequest(indices, subRequestIndicesOptions, masterNodeTimeout, client,
124124
ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure));
125125
}
126-
127-
@Override
128-
public void onFailure(final Exception e) {
129-
listener.onFailure(e);
130-
}
131126
});
132127
};
133128
}
@@ -199,7 +194,7 @@ private void sendIndicesStatsRequest(final String[] indices,
199194

200195
private GroupedActionListener<ActionResponse> createGroupedListener(final RestRequest request, final int size,
201196
final ActionListener<Table> listener) {
202-
return new GroupedActionListener<>(new ActionListener<>() {
197+
return new GroupedActionListener<>(new ActionListener.Delegating<>(listener) {
203198
@Override
204199
public void onResponse(final Collection<ActionResponse> responses) {
205200
try {
@@ -219,16 +214,11 @@ public void onResponse(final Collection<ActionResponse> responses) {
219214
Map<String, IndexStats> indicesStats = statsResponse.getIndices();
220215

221216
Table responseTable = buildTable(request, indicesSettings, indicesHealths, indicesStats, indicesStates);
222-
listener.onResponse(responseTable);
217+
delegate.onResponse(responseTable);
223218
} catch (Exception e) {
224219
onFailure(e);
225220
}
226221
}
227-
228-
@Override
229-
public void onFailure(final Exception e) {
230-
listener.onFailure(e);
231-
}
232222
}, size);
233223
}
234224

server/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 22 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -338,18 +338,10 @@ protected void doClose() {
338338
public void executeDfsPhase(ShardSearchRequest request, boolean keepStatesInContext,
339339
SearchShardTask task, ActionListener<SearchPhaseResult> listener) {
340340
final IndexShard shard = getShard(request);
341-
rewriteAndFetchShardRequest(shard, request, new ActionListener<ShardSearchRequest>() {
342-
@Override
343-
public void onResponse(ShardSearchRequest rewritten) {
344-
// fork the execution in the search thread pool
345-
runAsync(getExecutor(shard), () -> executeDfsPhase(request, task, keepStatesInContext), listener);
346-
}
347-
348-
@Override
349-
public void onFailure(Exception exc) {
350-
listener.onFailure(exc);
351-
}
352-
});
341+
rewriteAndFetchShardRequest(shard, request, listener.delegateFailure((l, rewritten) -> {
342+
// fork the execution in the search thread pool
343+
runAsync(getExecutor(shard), () -> executeDfsPhase(request, task, keepStatesInContext), l);
344+
}));
353345
}
354346

355347
private DfsSearchResult executeDfsPhase(ShardSearchRequest request,
@@ -385,34 +377,26 @@ public void executeQueryPhase(ShardSearchRequest request, boolean keepStatesInCo
385377
assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1
386378
: "empty responses require more than one shard";
387379
final IndexShard shard = getShard(request);
388-
rewriteAndFetchShardRequest(shard, request, new ActionListener<ShardSearchRequest>() {
389-
@Override
390-
public void onResponse(ShardSearchRequest orig) {
391-
// check if we can shortcut the query phase entirely.
392-
if (orig.canReturnNullResponseIfMatchNoDocs()) {
393-
assert orig.scroll() == null;
394-
final CanMatchResponse canMatchResp;
395-
try {
396-
ShardSearchRequest clone = new ShardSearchRequest(orig);
397-
canMatchResp = canMatch(clone, false);
398-
} catch (Exception exc) {
399-
listener.onFailure(exc);
400-
return;
401-
}
402-
if (canMatchResp.canMatch == false) {
403-
listener.onResponse(QuerySearchResult.nullInstance());
404-
return;
405-
}
380+
rewriteAndFetchShardRequest(shard, request, listener.delegateFailure((l, orig) -> {
381+
// check if we can shortcut the query phase entirely.
382+
if (orig.canReturnNullResponseIfMatchNoDocs()) {
383+
assert orig.scroll() == null;
384+
final CanMatchResponse canMatchResp;
385+
try {
386+
ShardSearchRequest clone = new ShardSearchRequest(orig);
387+
canMatchResp = canMatch(clone, false);
388+
} catch (Exception exc) {
389+
l.onFailure(exc);
390+
return;
391+
}
392+
if (canMatchResp.canMatch == false) {
393+
l.onResponse(QuerySearchResult.nullInstance());
394+
return;
406395
}
407-
// fork the execution in the search thread pool
408-
runAsync(getExecutor(shard), () -> executeQueryPhase(orig, task, keepStatesInContext), listener);
409-
}
410-
411-
@Override
412-
public void onFailure(Exception exc) {
413-
listener.onFailure(exc);
414396
}
415-
});
397+
// fork the execution in the search thread pool
398+
runAsync(getExecutor(shard), () -> executeQueryPhase(orig, task, keepStatesInContext), l);
399+
}));
416400
}
417401

418402
private IndexShard getShard(ShardSearchRequest request) {

server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -209,19 +209,11 @@ public void onFailure(Exception e) {
209209
DiscoveryNode node = new DiscoveryNode(id, resolved, attributes, DiscoveryNodeRole.BUILT_IN_ROLES,
210210
Version.CURRENT.minimumCompatibilityVersion());
211211

212-
connectionManager.connectToNode(node, null, clusterNameValidator, new ActionListener<>() {
213-
@Override
214-
public void onResponse(Void v) {
215-
compositeListener.onResponse(v);
216-
}
217-
218-
@Override
219-
public void onFailure(Exception e) {
220-
logger.debug(new ParameterizedMessage("failed to open remote connection [remote cluster: {}, address: {}]",
212+
connectionManager.connectToNode(node, null, clusterNameValidator, compositeListener.delegateResponse((l, e) -> {
213+
logger.debug(new ParameterizedMessage("failed to open remote connection [remote cluster: {}, address: {}]",
221214
clusterAlias, resolved), e);
222-
compositeListener.onFailure(e);
223-
}
224-
});
215+
l.onFailure(e);
216+
}));
225217
}
226218
} else {
227219
int openConnections = connectionManager.size();

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -102,33 +102,28 @@ protected void shardOperation(final ForgetFollowerAction.Request request, final
102102

103103
final IndexShard indexShard = indicesService.indexServiceSafe(leaderIndex).getShard(shardRouting.shardId().id());
104104

105-
indexShard.acquirePrimaryOperationPermit(new ActionListener<Releasable>() {
105+
indexShard.acquirePrimaryOperationPermit(new ActionListener.Delegating<>(listener) {
106106
@Override
107107
public void onResponse(Releasable releasable) {
108108
try {
109109
indexShard.removeRetentionLease(id, new ActionListener<ReplicationResponse>() {
110110
@Override
111111
public void onResponse(ReplicationResponse replicationResponse) {
112112
releasable.close();
113-
listener.onResponse(EmptyResult.INSTANCE);
113+
delegate.onResponse(EmptyResult.INSTANCE);
114114
}
115115

116116
@Override
117117
public void onFailure(Exception e) {
118118
releasable.close();
119-
listener.onFailure(e);
119+
delegate.onFailure(e);
120120
}
121121
});
122122
} catch (Exception e) {
123123
releasable.close();
124-
listener.onFailure(e);
124+
onFailure(e);
125125
}
126126
}
127-
128-
@Override
129-
public void onFailure(Exception e) {
130-
listener.onFailure(e);
131-
}
132127
}, ThreadPool.Names.SAME, request);
133128
}
134129

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportXPackUsageAction.java

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -52,34 +52,16 @@ protected List<XPackUsageFeatureAction> usageActions() {
5252

5353
@Override
5454
protected void masterOperation(Task task, XPackUsageRequest request, ClusterState state, ActionListener<XPackUsageResponse> listener) {
55-
final ActionListener<List<XPackFeatureSet.Usage>> usageActionListener = new ActionListener<>() {
56-
@Override
57-
public void onResponse(List<Usage> usages) {
58-
listener.onResponse(new XPackUsageResponse(usages));
59-
}
60-
61-
@Override
62-
public void onFailure(Exception e) {
63-
listener.onFailure(e);
64-
}
65-
};
55+
final ActionListener<List<XPackFeatureSet.Usage>> usageActionListener =
56+
listener.delegateFailure((l, usages) -> l.onResponse(new XPackUsageResponse(usages)));
6657
final AtomicReferenceArray<Usage> featureSetUsages = new AtomicReferenceArray<>(usageActions.size());
6758
final AtomicInteger position = new AtomicInteger(0);
68-
final BiConsumer<XPackUsageFeatureAction, ActionListener<List<Usage>>> consumer = (featureUsageAction, iteratingListener) -> {
69-
client.executeLocally(featureUsageAction, request, new ActionListener<>() {
70-
@Override
71-
public void onResponse(XPackUsageFeatureResponse usageResponse) {
59+
final BiConsumer<XPackUsageFeatureAction, ActionListener<List<Usage>>> consumer = (featureUsageAction, iteratingListener) ->
60+
client.executeLocally(featureUsageAction, request, iteratingListener.delegateFailure((l, usageResponse) -> {
7261
featureSetUsages.set(position.getAndIncrement(), usageResponse.getUsage());
7362
// the value sent back doesn't matter since our predicate keeps iterating
74-
iteratingListener.onResponse(Collections.emptyList());
75-
}
76-
77-
@Override
78-
public void onFailure(Exception e) {
79-
iteratingListener.onFailure(e);
80-
}
81-
});
82-
};
63+
l.onResponse(Collections.emptyList());
64+
}));
8365
IteratingActionListener<List<XPackFeatureSet.Usage>, XPackUsageFeatureAction> iteratingActionListener =
8466
new IteratingActionListener<>(usageActionListener, consumer, usageActions,
8567
threadPool.getThreadContext(), (ignore) -> {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -126,17 +126,8 @@ private void getSearchResponseFromTask(AsyncExecutionId searchId,
126126
if (expirationTimeMillis != -1) {
127127
task.setExpirationTime(expirationTimeMillis);
128128
}
129-
addCompletionListener.apply(task, new ActionListener<>() {
130-
@Override
131-
public void onResponse(Response response) {
132-
sendFinalResponse(request, response, nowInMillis, listener);
133-
}
134-
135-
@Override
136-
public void onFailure(Exception exc) {
137-
listener.onFailure(exc);
138-
}
139-
}, request.getWaitForCompletionTimeout());
129+
addCompletionListener.apply(task, listener.delegateFailure((l, response) ->
130+
sendFinalResponse(request, response, nowInMillis, l)), request.getWaitForCompletionTimeout());
140131
} catch (Exception exc) {
141132
listener.onFailure(exc);
142133
}
@@ -146,18 +137,7 @@ private void getSearchResponseFromIndex(AsyncExecutionId searchId,
146137
GetAsyncResultRequest request,
147138
long nowInMillis,
148139
ActionListener<Response> listener) {
149-
store.getResponse(searchId, true,
150-
new ActionListener<>() {
151-
@Override
152-
public void onResponse(Response response) {
153-
sendFinalResponse(request, response, nowInMillis, listener);
154-
}
155-
156-
@Override
157-
public void onFailure(Exception e) {
158-
listener.onFailure(e);
159-
}
160-
});
140+
store.getResponse(searchId, true, listener.delegateFailure((l, response) -> sendFinalResponse(request, response, nowInMillis, l)));
161141
}
162142

163143
private void sendFinalResponse(GetAsyncResultRequest request,

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -355,18 +355,8 @@ public <T extends AsyncTask, SR extends SearchStatusResponse> void retrieveStatu
355355
SR response = statusProducerFromTask.apply(asyncTask);
356356
sendFinalStatusResponse(request, response, listener);
357357
} else { // get status response from index
358-
getStatusResponseFromIndex(asyncExecutionId, statusProducerFromIndex,
359-
new ActionListener<>() {
360-
@Override
361-
public void onResponse(SR searchStatusResponse) {
362-
sendFinalStatusResponse(request, searchStatusResponse, listener);
363-
}
364-
@Override
365-
public void onFailure(Exception e) {
366-
listener.onFailure(e);
367-
}
368-
}
369-
);
358+
getStatusResponseFromIndex(asyncExecutionId, statusProducerFromIndex, listener.delegateFailure(
359+
(l, searchStatusResponse) -> sendFinalStatusResponse(request, searchStatusResponse, l)));
370360
}
371361
} catch (Exception exc) {
372362
listener.onFailure(exc);

0 commit comments

Comments
 (0)