Skip to content

Commit 4cf5ffa

Browse files
More Efficient Ordering of Shard Upload Execution (#42791)
* Change the upload order of of snapshots to work file by file in parallel on the snapshot pool instead of merely shard-by-shard * Inspired by #39657
1 parent afdd000 commit 4cf5ffa

File tree

13 files changed

+292
-207
lines changed

13 files changed

+292
-207
lines changed

server/src/main/java/org/elasticsearch/action/ActionListener.java

+32
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.ExceptionsHelper;
2323
import org.elasticsearch.common.CheckedConsumer;
2424
import org.elasticsearch.common.CheckedFunction;
25+
import org.elasticsearch.common.CheckedRunnable;
2526
import org.elasticsearch.common.CheckedSupplier;
2627

2728
import java.util.ArrayList;
@@ -226,6 +227,37 @@ public void onFailure(Exception e) {
226227
};
227228
}
228229

230+
/**
231+
* Wraps a given listener and returns a new listener which executes the provided {@code runBefore}
232+
* callback before the listener is notified via either {@code #onResponse} or {@code #onFailure}.
233+
* If the callback throws an exception then it will be passed to the listener's {@code #onFailure} and its {@code #onResponse} will
234+
* not be executed.
235+
*/
236+
static <Response> ActionListener<Response> runBefore(ActionListener<Response> delegate, CheckedRunnable<?> runBefore) {
237+
return new ActionListener<>() {
238+
@Override
239+
public void onResponse(Response response) {
240+
try {
241+
runBefore.run();
242+
} catch (Exception ex) {
243+
delegate.onFailure(ex);
244+
return;
245+
}
246+
delegate.onResponse(response);
247+
}
248+
249+
@Override
250+
public void onFailure(Exception e) {
251+
try {
252+
runBefore.run();
253+
} catch (Exception ex) {
254+
e.addSuppressed(ex);
255+
}
256+
delegate.onFailure(e);
257+
}
258+
};
259+
}
260+
229261
/**
230262
* Wraps a given listener and returns a new listener which makes sure {@link #onResponse(Object)}
231263
* and {@link #onFailure(Exception)} of the provided listener will be called at most once.

server/src/main/java/org/elasticsearch/repositories/FilterRepository.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -121,13 +121,11 @@ public boolean isReadOnly() {
121121
return in.isReadOnly();
122122
}
123123

124-
125124
@Override
126125
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
127-
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
128-
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus);
126+
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {
127+
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, listener);
129128
}
130-
131129
@Override
132130
public void restoreShard(Store store, SnapshotId snapshotId,
133131
Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {

server/src/main/java/org/elasticsearch/repositories/Repository.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
* <ul>
5151
* <li>Master calls {@link #initializeSnapshot(SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)}
5252
* with list of indices that will be included into the snapshot</li>
53-
* <li>Data nodes call {@link Repository#snapshotShard(Store, MapperService, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)}
53+
* <li>Data nodes call {@link Repository#snapshotShard}
5454
* for each shard</li>
5555
* <li>When all shard calls return master calls {@link #finalizeSnapshot} with possible list of failures</li>
5656
* </ul>
@@ -204,9 +204,10 @@ SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long
204204
* @param indexId id for the index being snapshotted
205205
* @param snapshotIndexCommit commit point
206206
* @param snapshotStatus snapshot status
207+
* @param listener listener invoked on completion
207208
*/
208209
void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
209-
IndexShardSnapshotStatus snapshotStatus);
210+
IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener);
210211

211212
/**
212213
* Restores snapshot of the shard.

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

+127-103
Large diffs are not rendered by default.

server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java

+52-69
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.logging.log4j.LogManager;
2424
import org.apache.logging.log4j.Logger;
2525
import org.apache.logging.log4j.message.ParameterizedMessage;
26-
import org.apache.lucene.util.SetOnce;
2726
import org.elasticsearch.ExceptionsHelper;
2827
import org.elasticsearch.action.ActionListener;
2928
import org.elasticsearch.action.ActionRequestValidationException;
@@ -53,9 +52,8 @@
5352
import org.elasticsearch.common.io.stream.StreamOutput;
5453
import org.elasticsearch.common.settings.Settings;
5554
import org.elasticsearch.common.unit.TimeValue;
56-
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
55+
import org.elasticsearch.core.internal.io.IOUtils;
5756
import org.elasticsearch.index.engine.Engine;
58-
import org.elasticsearch.index.engine.SnapshotFailedEngineException;
5957
import org.elasticsearch.index.shard.IndexEventListener;
6058
import org.elasticsearch.index.shard.IndexShard;
6159
import org.elasticsearch.index.shard.IndexShardState;
@@ -80,7 +78,6 @@
8078
import java.util.Iterator;
8179
import java.util.List;
8280
import java.util.Map;
83-
import java.util.concurrent.Executor;
8481
import java.util.function.Function;
8582
import java.util.stream.Collectors;
8683

@@ -298,46 +295,33 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) {
298295
}
299296

300297
private void startNewShards(SnapshotsInProgress.Entry entry, Map<ShardId, IndexShardSnapshotStatus> startedShards) {
301-
final Snapshot snapshot = entry.snapshot();
302-
final Map<String, IndexId> indicesMap = entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity()));
303-
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
304-
for (final Map.Entry<ShardId, IndexShardSnapshotStatus> shardEntry : startedShards.entrySet()) {
305-
final ShardId shardId = shardEntry.getKey();
306-
final IndexId indexId = indicesMap.get(shardId.getIndexName());
307-
assert indexId != null;
308-
executor.execute(new AbstractRunnable() {
309-
310-
private final SetOnce<Exception> failure = new SetOnce<>();
311-
312-
@Override
313-
public void doRun() {
314-
final IndexShard indexShard =
315-
indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
316-
snapshot(indexShard, snapshot, indexId, shardEntry.getValue());
317-
}
318-
319-
@Override
320-
public void onFailure(Exception e) {
321-
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e);
322-
failure.set(e);
323-
}
324-
325-
@Override
326-
public void onRejection(Exception e) {
327-
failure.set(e);
328-
}
329-
330-
@Override
331-
public void onAfter() {
332-
final Exception exception = failure.get();
333-
if (exception != null) {
334-
notifyFailedSnapshotShard(snapshot, shardId, ExceptionsHelper.detailedMessage(exception));
335-
} else {
298+
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
299+
final Snapshot snapshot = entry.snapshot();
300+
final Map<String, IndexId> indicesMap =
301+
entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity()));
302+
for (final Map.Entry<ShardId, IndexShardSnapshotStatus> shardEntry : startedShards.entrySet()) {
303+
final ShardId shardId = shardEntry.getKey();
304+
final IndexShardSnapshotStatus snapshotStatus = shardEntry.getValue();
305+
final IndexId indexId = indicesMap.get(shardId.getIndexName());
306+
assert indexId != null;
307+
snapshot(shardId, snapshot, indexId, snapshotStatus, new ActionListener<>() {
308+
@Override
309+
public void onResponse(final Void aVoid) {
310+
if (logger.isDebugEnabled()) {
311+
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
312+
logger.debug("snapshot ({}) completed to {} with {}", snapshot, snapshot.getRepository(), lastSnapshotStatus);
313+
}
336314
notifySuccessfulSnapshotShard(snapshot, shardId);
337315
}
338-
}
339-
});
340-
}
316+
317+
@Override
318+
public void onFailure(Exception e) {
319+
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e);
320+
notifyFailedSnapshotShard(snapshot, shardId, ExceptionsHelper.detailedMessage(e));
321+
}
322+
});
323+
}
324+
});
341325
}
342326

343327
/**
@@ -346,38 +330,37 @@ public void onAfter() {
346330
* @param snapshot snapshot
347331
* @param snapshotStatus snapshot status
348332
*/
349-
private void snapshot(final IndexShard indexShard, final Snapshot snapshot, final IndexId indexId,
350-
final IndexShardSnapshotStatus snapshotStatus) {
351-
final ShardId shardId = indexShard.shardId();
352-
if (indexShard.routingEntry().primary() == false) {
353-
throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary");
354-
}
355-
if (indexShard.routingEntry().relocating()) {
356-
// do not snapshot when in the process of relocation of primaries so we won't get conflicts
357-
throw new IndexShardSnapshotFailedException(shardId, "cannot snapshot while relocating");
358-
}
333+
private void snapshot(final ShardId shardId, final Snapshot snapshot, final IndexId indexId,
334+
final IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {
335+
try {
336+
final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
337+
if (indexShard.routingEntry().primary() == false) {
338+
throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary");
339+
}
340+
if (indexShard.routingEntry().relocating()) {
341+
// do not snapshot when in the process of relocation of primaries so we won't get conflicts
342+
throw new IndexShardSnapshotFailedException(shardId, "cannot snapshot while relocating");
343+
}
359344

360-
final IndexShardState indexShardState = indexShard.state();
361-
if (indexShardState == IndexShardState.CREATED || indexShardState == IndexShardState.RECOVERING) {
362-
// shard has just been created, or still recovering
363-
throw new IndexShardSnapshotFailedException(shardId, "shard didn't fully recover yet");
364-
}
345+
final IndexShardState indexShardState = indexShard.state();
346+
if (indexShardState == IndexShardState.CREATED || indexShardState == IndexShardState.RECOVERING) {
347+
// shard has just been created, or still recovering
348+
throw new IndexShardSnapshotFailedException(shardId, "shard didn't fully recover yet");
349+
}
365350

366-
final Repository repository = repositoriesService.repository(snapshot.getRepository());
367-
try {
368-
// we flush first to make sure we get the latest writes snapshotted
369-
try (Engine.IndexCommitRef snapshotRef = indexShard.acquireLastIndexCommit(true)) {
351+
final Repository repository = repositoriesService.repository(snapshot.getRepository());
352+
Engine.IndexCommitRef snapshotRef = null;
353+
try {
354+
// we flush first to make sure we get the latest writes snapshotted
355+
snapshotRef = indexShard.acquireLastIndexCommit(true);
370356
repository.snapshotShard(indexShard.store(), indexShard.mapperService(), snapshot.getSnapshotId(), indexId,
371-
snapshotRef.getIndexCommit(), snapshotStatus);
372-
if (logger.isDebugEnabled()) {
373-
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
374-
logger.debug("snapshot ({}) completed to {} with {}", snapshot, repository, lastSnapshotStatus);
375-
}
357+
snapshotRef.getIndexCommit(), snapshotStatus, ActionListener.runBefore(listener, snapshotRef::close));
358+
} catch (Exception e) {
359+
IOUtils.close(snapshotRef);
360+
throw e;
376361
}
377-
} catch (SnapshotFailedEngineException | IndexShardSnapshotFailedException e) {
378-
throw e;
379362
} catch (Exception e) {
380-
throw new IndexShardSnapshotFailedException(shardId, "Failed to snapshot", e);
363+
listener.onFailure(e);
381364
}
382365
}
383366

server/src/test/java/org/elasticsearch/action/ActionListenerTests.java

+17
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,23 @@ public void testRunAfter() {
171171
}
172172
}
173173

174+
public void testRunBefore() {
175+
{
176+
AtomicBoolean afterSuccess = new AtomicBoolean();
177+
ActionListener<Object> listener =
178+
ActionListener.runBefore(ActionListener.wrap(r -> {}, e -> {}), () -> afterSuccess.set(true));
179+
listener.onResponse(null);
180+
assertThat(afterSuccess.get(), equalTo(true));
181+
}
182+
{
183+
AtomicBoolean afterFailure = new AtomicBoolean();
184+
ActionListener<Object> listener =
185+
ActionListener.runBefore(ActionListener.wrap(r -> {}, e -> {}), () -> afterFailure.set(true));
186+
listener.onFailure(null);
187+
assertThat(afterFailure.get(), equalTo(true));
188+
}
189+
}
190+
174191
public void testNotifyOnce() {
175192
AtomicInteger onResponseTimes = new AtomicInteger();
176193
AtomicInteger onFailureTimes = new AtomicInteger();

server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ public boolean isReadOnly() {
202202

203203
@Override
204204
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit
205-
snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
205+
snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {
206206

207207
}
208208

server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.lucene.util.IOSupplier;
3636
import org.apache.lucene.util.TestUtil;
3737
import org.elasticsearch.Version;
38+
import org.elasticsearch.action.support.PlainActionFuture;
3839
import org.elasticsearch.cluster.metadata.IndexMetaData;
3940
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
4041
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -99,10 +100,12 @@ public void testSnapshotAndRestore() throws IOException, InterruptedException {
99100
IndexId indexId = new IndexId(idxSettings.getIndex().getName(), idxSettings.getUUID());
100101

101102
IndexCommit indexCommit = Lucene.getIndexCommit(Lucene.readSegmentInfos(store.directory()), store.directory());
103+
final PlainActionFuture<Void> future1 = PlainActionFuture.newFuture();
102104
runGeneric(threadPool, () -> {
103105
IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing();
104106
repository.snapshotShard(store, null, snapshotId, indexId, indexCommit,
105-
snapshotStatus);
107+
snapshotStatus, future1);
108+
future1.actionGet();
106109
IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy();
107110
assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());
108111
});
@@ -124,9 +127,11 @@ public void testSnapshotAndRestore() throws IOException, InterruptedException {
124127
SnapshotId incSnapshotId = new SnapshotId("test1", "test1");
125128
IndexCommit incIndexCommit = Lucene.getIndexCommit(Lucene.readSegmentInfos(store.directory()), store.directory());
126129
Collection<String> commitFileNames = incIndexCommit.getFileNames();
130+
final PlainActionFuture<Void> future2 = PlainActionFuture.newFuture();
127131
runGeneric(threadPool, () -> {
128132
IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing();
129-
repository.snapshotShard(store, null, incSnapshotId, indexId, incIndexCommit, snapshotStatus);
133+
repository.snapshotShard(store, null, incSnapshotId, indexId, incIndexCommit, snapshotStatus, future2);
134+
future2.actionGet();
130135
IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy();
131136
assertEquals(2, copy.getIncrementalFileCount());
132137
assertEquals(commitFileNames.size(), copy.getTotalFileCount());
@@ -198,4 +203,5 @@ private int indexDocs(Directory directory) throws IOException {
198203
return docs;
199204
}
200205
}
206+
201207
}

test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -832,12 +832,14 @@ protected void snapshotShard(final IndexShard shard,
832832
final Snapshot snapshot,
833833
final Repository repository) throws IOException {
834834
final IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing();
835+
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
835836
try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(true)) {
836837
Index index = shard.shardId().getIndex();
837838
IndexId indexId = new IndexId(index.getName(), index.getUUID());
838839

839840
repository.snapshotShard(shard.store(), shard.mapperService(), snapshot.getSnapshotId(), indexId,
840-
indexCommitRef.getIndexCommit(), snapshotStatus);
841+
indexCommitRef.getIndexCommit(), snapshotStatus, future);
842+
future.actionGet();
841843
}
842844

843845
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();

test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public boolean isReadOnly() {
135135

136136
@Override
137137
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
138-
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
138+
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {
139139
}
140140

141141
@Override

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ public boolean isReadOnly() {
296296

297297
@Override
298298
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
299-
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
299+
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {
300300
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
301301
}
302302

0 commit comments

Comments
 (0)