Skip to content

Commit c74527e

Browse files
Simplify some Common ActionRunnable Uses (#47799)
Especially in the snapshot code there's a lot of logic chaining `ActionRunnables` in tricky ways now and the code is getting hard to follow. This change introduces two convinience methods that make it clear that a wrapped listener is invoked with certainty in some trickier spots and shortens the code a bit.
1 parent de6c674 commit c74527e

File tree

14 files changed

+134
-188
lines changed

14 files changed

+134
-188
lines changed

plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -147,13 +147,7 @@ public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOExce
147147
// Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint
148148
// TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way.
149149
for (String blobName : blobNames) {
150-
executor.execute(new ActionRunnable<>(listener) {
151-
@Override
152-
protected void doRun() throws IOException {
153-
deleteBlobIgnoringIfNotExists(blobName);
154-
listener.onResponse(null);
155-
}
156-
});
150+
executor.execute(ActionRunnable.run(listener, () -> deleteBlobIgnoringIfNotExists(blobName)));
157151
}
158152
}
159153
try {

server/src/main/java/org/elasticsearch/action/ActionRunnable.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
package org.elasticsearch.action;
2121

2222
import org.elasticsearch.common.CheckedConsumer;
23+
import org.elasticsearch.common.CheckedRunnable;
24+
import org.elasticsearch.common.CheckedSupplier;
2325
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2426

2527
/**
@@ -30,6 +32,32 @@ public abstract class ActionRunnable<Response> extends AbstractRunnable {
3032

3133
protected final ActionListener<Response> listener;
3234

35+
/**
36+
* Creates a {@link Runnable} that invokes the given listener with {@code null} after the given runnable has executed.
37+
* @param listener Listener to invoke
38+
* @param runnable Runnable to execute
39+
* @return Wrapped {@code Runnable}
40+
*/
41+
public static <T> ActionRunnable<T> run(ActionListener<T> listener, CheckedRunnable<Exception> runnable) {
42+
return new ActionRunnable<>(listener) {
43+
@Override
44+
protected void doRun() throws Exception {
45+
runnable.run();
46+
listener.onResponse(null);
47+
}
48+
};
49+
}
50+
51+
/**
52+
* Creates a {@link Runnable} that invokes the given listener with the return of the given supplier.
53+
* @param listener Listener to invoke
54+
* @param supplier Supplier that provides value to pass to listener
55+
* @return Wrapped {@code Runnable}
56+
*/
57+
public static <T> ActionRunnable<T> supply(ActionListener<T> listener, CheckedSupplier<T, Exception> supplier) {
58+
return ActionRunnable.wrap(listener, l -> l.onResponse(supplier.get()));
59+
}
60+
3361
/**
3462
* Creates a {@link Runnable} that wraps the given listener and a consumer of it that is executed when the {@link Runnable} is run.
3563
* Invokes {@link ActionListener#onFailure(Exception)} on it if an exception is thrown on executing the consumer.

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -123,17 +123,14 @@ private void getMultipleReposSnapshotInfo(List<RepositoryMetaData> repos, String
123123

124124
// run concurrently for all repos on GENERIC thread pool
125125
for (final RepositoryMetaData repo : repos) {
126-
threadPool.executor(ThreadPool.Names.GENERIC).execute(new ActionRunnable<>(groupedActionListener) {
127-
@Override
128-
protected void doRun() {
129-
try {
130-
groupedActionListener.onResponse(GetSnapshotsResponse.Response.snapshots(
131-
repo.name(), getSingleRepoSnapshotInfo(repo.name(), snapshots, ignoreUnavailable, verbose)));
132-
} catch (ElasticsearchException e) {
133-
groupedActionListener.onResponse(GetSnapshotsResponse.Response.error(repo.name(), e));
134-
}
126+
threadPool.executor(ThreadPool.Names.GENERIC).execute(ActionRunnable.supply(groupedActionListener, () -> {
127+
try {
128+
return GetSnapshotsResponse.Response.snapshots(
129+
repo.name(), getSingleRepoSnapshotInfo(repo.name(), snapshots, ignoreUnavailable, verbose));
130+
} catch (ElasticsearchException e) {
131+
return GetSnapshotsResponse.Response.error(repo.name(), e);
135132
}
136-
});
133+
}));
137134
}
138135
}
139136

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,8 @@ protected void masterOperation(Task task, final SnapshotsStatusRequest request,
126126
.snapshots(snapshots).timeout(request.masterNodeTimeout()),
127127
ActionListener.wrap(
128128
nodeSnapshotStatuses -> threadPool.executor(ThreadPool.Names.GENERIC).execute(
129-
ActionRunnable.wrap(listener, l -> l.onResponse(buildResponse(request, snapshotsService.currentSnapshots(
130-
request.repository(), Arrays.asList(request.snapshots())), nodeSnapshotStatuses)))), listener::onFailure));
129+
ActionRunnable.supply(listener, () -> buildResponse(request, snapshotsService.currentSnapshots(
130+
request.repository(), Arrays.asList(request.snapshots())), nodeSnapshotStatuses))), listener::onFailure));
131131
} else {
132132
// We don't have any in-progress shards, just return current stats
133133
listener.onResponse(buildResponse(request, currentSnapshots, null));

server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,6 @@ public void messageReceived(ShardRequest request, TransportChannel channel, Task
300300

301301
private void asyncShardOperation(ShardRequest request, Task task, ActionListener<ShardResponse> listener) {
302302
transportService.getThreadPool().executor(shardExecutor)
303-
.execute(ActionRunnable.wrap(listener, l -> l.onResponse(shardOperation(request, task))));
303+
.execute(ActionRunnable.supply(listener, () -> shardOperation(request, task)));
304304
}
305305
}

server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
107107

108108
protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
109109
threadPool.executor(getExecutor(request, shardId))
110-
.execute(ActionRunnable.wrap(listener, l -> l.onResponse((shardOperation(request, shardId)))));
110+
.execute(ActionRunnable.supply(listener, () -> shardOperation(request, shardId)));
111111
}
112112

113113
protected abstract Writeable.Reader<Response> getResponseReader();

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -406,10 +406,7 @@ private Releasable acquireStore(Store store) {
406406
// TODO: We shouldn't use the generic thread pool here as we already execute this from the generic pool.
407407
// While practically unlikely at a min pool size of 128 we could technically block the whole pool by waiting on futures
408408
// below and thus make it impossible for the store release to execute which in turn would block the futures forever
409-
threadPool.generic().execute(ActionRunnable.wrap(future, l -> {
410-
store.decRef();
411-
l.onResponse(null);
412-
}));
409+
threadPool.generic().execute(ActionRunnable.run(future, store::decRef));
413410
FutureUtils.get(future);
414411
});
415412
}

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -448,14 +448,13 @@ private void cleanupStaleBlobs(Map<String, BlobContainer> foundIndices, Map<Stri
448448
}, listener::onFailure), 2);
449449

450450
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
451-
executor.execute(ActionRunnable.wrap(groupedListener, l -> {
451+
executor.execute(ActionRunnable.supply(groupedListener, () -> {
452452
List<String> deletedBlobs = cleanupStaleRootFiles(staleRootBlobs(newRepoData, rootBlobs.keySet()));
453-
l.onResponse(
454-
new DeleteResult(deletedBlobs.size(), deletedBlobs.stream().mapToLong(name -> rootBlobs.get(name).length()).sum()));
453+
return new DeleteResult(deletedBlobs.size(), deletedBlobs.stream().mapToLong(name -> rootBlobs.get(name).length()).sum());
455454
}));
456455

457456
final Set<String> survivingIndexIds = newRepoData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
458-
executor.execute(ActionRunnable.wrap(groupedListener, l -> l.onResponse(cleanupStaleIndices(foundIndices, survivingIndexIds))));
457+
executor.execute(ActionRunnable.supply(groupedListener, () -> cleanupStaleIndices(foundIndices, survivingIndexIds)));
459458
}
460459

461460
/**
@@ -670,26 +669,22 @@ public void finalizeSnapshot(final SnapshotId snapshotId,
670669
// that decrements the generation it points at
671670

672671
// Write Global MetaData
673-
executor.execute(ActionRunnable.wrap(allMetaListener, l -> {
674-
globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID(), false);
675-
l.onResponse(null);
676-
}));
672+
executor.execute(ActionRunnable.run(allMetaListener,
673+
() -> globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID(), false)));
677674

678675
// write the index metadata for each index in the snapshot
679676
for (IndexId index : indices) {
680-
executor.execute(ActionRunnable.wrap(allMetaListener, l -> {
681-
indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false);
682-
l.onResponse(null);
683-
}));
677+
executor.execute(ActionRunnable.run(allMetaListener, () ->
678+
indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false)));
684679
}
685680

686-
executor.execute(ActionRunnable.wrap(allMetaListener, afterMetaListener -> {
681+
executor.execute(ActionRunnable.supply(allMetaListener, () -> {
687682
final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshotId,
688683
indices.stream().map(IndexId::getName).collect(Collectors.toList()),
689684
startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures,
690685
includeGlobalState, userMetadata);
691686
snapshotFormat.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), false);
692-
afterMetaListener.onResponse(snapshotInfo);
687+
return snapshotInfo;
693688
}));
694689
}
695690

server/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
134134

135135
/**
136136
* Enables low-level, frequent search cancellation checks. Enabling low-level checks will make long running searches to react
137-
* to the cancellation request faster. It will produce more cancellation checks but benchmarking has shown these did not
137+
* to the cancellation request faster. It will produce more cancellation checks but benchmarking has shown these did not
138138
* noticeably slow down searches.
139139
*/
140140
public static final Setting<Boolean> LOW_LEVEL_CANCELLATION_SETTING =
@@ -341,7 +341,7 @@ public void executeQueryPhase(ShardSearchRequest request, SearchTask task, Actio
341341
}
342342

343343
private <T> void runAsync(long id, Supplier<T> executable, ActionListener<T> listener) {
344-
getExecutor(id).execute(ActionRunnable.wrap(listener, l -> l.onResponse(executable.get())));
344+
getExecutor(id).execute(ActionRunnable.supply(listener, executable::get));
345345
}
346346

347347
private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws Exception {
@@ -1053,7 +1053,7 @@ private void rewriteShardRequest(ShardSearchRequest request, ActionListener<Shar
10531053
Executor executor = getExecutor(shard);
10541054
ActionListener<Rewriteable> actionListener = ActionListener.wrap(r ->
10551055
// now we need to check if there is a pending refresh and register
1056-
shard.awaitShardSearchActive(b -> executor.execute(ActionRunnable.wrap(listener, l -> l.onResponse(request)))),
1056+
shard.awaitShardSearchActive(b -> executor.execute(ActionRunnable.supply(listener, () -> request))),
10571057
listener::onFailure);
10581058
// we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as
10591059
// AliasFilters that might need to be rewritten. These are edge-cases but we are every efficient doing the rewrite here so it's not

test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java

Lines changed: 26 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -82,13 +82,7 @@ public void tearDown() throws Exception {
8282
private void deleteAndAssertEmpty(BlobPath path) throws Exception {
8383
final BlobStoreRepository repo = getRepository();
8484
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
85-
repo.threadPool().generic().execute(new ActionRunnable<>(future) {
86-
@Override
87-
protected void doRun() throws Exception {
88-
repo.blobStore().blobContainer(path).delete();
89-
future.onResponse(null);
90-
}
91-
});
85+
repo.threadPool().generic().execute(ActionRunnable.run(future, () -> repo.blobStore().blobContainer(path).delete()));
9286
future.actionGet();
9387
final BlobPath parent = path.parent();
9488
if (parent == null) {
@@ -147,19 +141,15 @@ public void testListChildren() throws Exception {
147141
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
148142
final Executor genericExec = repo.threadPool().generic();
149143
final int testBlobLen = randomIntBetween(1, 100);
150-
genericExec.execute(new ActionRunnable<>(future) {
151-
@Override
152-
protected void doRun() throws Exception {
153-
final BlobStore blobStore = repo.blobStore();
154-
blobStore.blobContainer(repo.basePath().add("foo"))
155-
.writeBlob("nested-blob", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false);
156-
blobStore.blobContainer(repo.basePath().add("foo").add("nested"))
157-
.writeBlob("bar", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false);
158-
blobStore.blobContainer(repo.basePath().add("foo").add("nested2"))
159-
.writeBlob("blub", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false);
160-
future.onResponse(null);
161-
}
162-
});
144+
genericExec.execute(ActionRunnable.run(future, () -> {
145+
final BlobStore blobStore = repo.blobStore();
146+
blobStore.blobContainer(repo.basePath().add("foo"))
147+
.writeBlob("nested-blob", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false);
148+
blobStore.blobContainer(repo.basePath().add("foo").add("nested"))
149+
.writeBlob("bar", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false);
150+
blobStore.blobContainer(repo.basePath().add("foo").add("nested2"))
151+
.writeBlob("blub", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false);
152+
}));
163153
future.actionGet();
164154
assertChildren(repo.basePath(), Collections.singleton("foo"));
165155
assertBlobsByPrefix(repo.basePath(), "fo", Collections.emptyMap());
@@ -245,37 +235,27 @@ protected void assertCleanupResponse(CleanupRepositoryResponse response, long by
245235

246236
private void createDanglingIndex(final BlobStoreRepository repo, final Executor genericExec) throws Exception {
247237
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
248-
genericExec.execute(new ActionRunnable<>(future) {
249-
@Override
250-
protected void doRun() throws Exception {
251-
final BlobStore blobStore = repo.blobStore();
252-
blobStore.blobContainer(repo.basePath().add("indices").add("foo"))
253-
.writeBlob("bar", new ByteArrayInputStream(new byte[3]), 3, false);
254-
for (String prefix : Arrays.asList("snap-", "meta-")) {
255-
blobStore.blobContainer(repo.basePath())
256-
.writeBlob(prefix + "foo.dat", new ByteArrayInputStream(new byte[3]), 3, false);
257-
}
258-
future.onResponse(null);
238+
genericExec.execute(ActionRunnable.run(future, () -> {
239+
final BlobStore blobStore = repo.blobStore();
240+
blobStore.blobContainer(repo.basePath().add("indices").add("foo"))
241+
.writeBlob("bar", new ByteArrayInputStream(new byte[3]), 3, false);
242+
for (String prefix : Arrays.asList("snap-", "meta-")) {
243+
blobStore.blobContainer(repo.basePath()).writeBlob(prefix + "foo.dat", new ByteArrayInputStream(new byte[3]), 3, false);
259244
}
260-
});
245+
}));
261246
future.actionGet();
262247
assertTrue(assertCorruptionVisible(repo, genericExec));
263248
}
264249

265250
protected boolean assertCorruptionVisible(BlobStoreRepository repo, Executor executor) throws Exception {
266251
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
267-
executor.execute(new ActionRunnable<>(future) {
268-
@Override
269-
protected void doRun() throws Exception {
270-
final BlobStore blobStore = repo.blobStore();
271-
future.onResponse(
272-
blobStore.blobContainer(repo.basePath().add("indices")).children().containsKey("foo")
273-
&& BlobStoreTestUtil.blobExists(blobStore.blobContainer(repo.basePath().add("indices").add("foo")), "bar")
274-
&& BlobStoreTestUtil.blobExists(blobStore.blobContainer(repo.basePath()), "meta-foo.dat")
275-
&& BlobStoreTestUtil.blobExists(blobStore.blobContainer(repo.basePath()), "snap-foo.dat")
276-
);
277-
}
278-
});
252+
executor.execute(ActionRunnable.supply(future, () -> {
253+
final BlobStore blobStore = repo.blobStore();
254+
return blobStore.blobContainer(repo.basePath().add("indices")).children().containsKey("foo")
255+
&& BlobStoreTestUtil.blobExists(blobStore.blobContainer(repo.basePath().add("indices").add("foo")), "bar")
256+
&& BlobStoreTestUtil.blobExists(blobStore.blobContainer(repo.basePath()), "meta-foo.dat")
257+
&& BlobStoreTestUtil.blobExists(blobStore.blobContainer(repo.basePath()), "snap-foo.dat");
258+
}));
279259
return future.actionGet();
280260
}
281261

@@ -300,13 +280,8 @@ protected void assertChildren(BlobPath path, Collection<String> children) throws
300280
private Set<String> listChildren(BlobPath path) {
301281
final PlainActionFuture<Set<String>> future = PlainActionFuture.newFuture();
302282
final BlobStoreRepository repository = getRepository();
303-
repository.threadPool().generic().execute(new ActionRunnable<>(future) {
304-
@Override
305-
protected void doRun() throws Exception {
306-
final BlobStore blobStore = repository.blobStore();
307-
future.onResponse(blobStore.blobContainer(path).children().keySet());
308-
}
309-
});
283+
repository.threadPool().generic().execute(
284+
ActionRunnable.supply(future, () -> repository.blobStore().blobContainer(path).children().keySet()));
310285
return future.actionGet();
311286
}
312287

0 commit comments

Comments
 (0)