Commit c7e36c3

Rework shard snapshot workers (#88209)
Currently, when starting the shard snapshot tasks, we loop through the shards to be snapshotted back-to-back. For each shard this calculates whether there are changes to snapshot, writes some metadata, and only if there are changes and files to upload, forks off the file upload calls. This can be further improved by parallelizing the shard snapshot tasks themselves (currently, only file uploads happen in parallel). This change introduces a new type of task runner which uses a queue to: 1. Parallelize shard snapshotting with a limited number of workers, limiting the number of concurrently running snapshot tasks. 2. Prioritize shard snapshot tasks over file snapshot tasks. Closes #83408
1 parent 6807b69 commit c7e36c3
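The ShardSnapshotTaskRunner introduced by this change is not part of the excerpt below, so here is a minimal sketch of the prioritization idea described above, with hypothetical class names: tasks are Comparable, and shard snapshot tasks order ahead of file snapshot tasks, so the priority queue hands out pending shard snapshots before pending file uploads.

abstract class SnapshotTask implements Comparable<SnapshotTask>, Runnable {
    abstract int priority(); // lower value = dequeued earlier by the priority queue

    @Override
    public int compareTo(SnapshotTask other) {
        return Integer.compare(priority(), other.priority());
    }
}

class ShardSnapshotTask extends SnapshotTask {
    @Override
    int priority() {
        return 1; // pending shard-level work is dequeued before pending file uploads
    }

    @Override
    public void run() {
        // compute shard changes, write shard metadata, enqueue one file snapshot task per changed file
    }
}

class FileSnapshotTask extends SnapshotTask {
    @Override
    int priority() {
        return 2;
    }

    @Override
    public void run() {
        // upload a single file to the blob store repository
    }
}

The snapshotStartTime() accessor added to SnapshotShardContext further below would give such a comparator an additional ordering key (for example, favoring older snapshots), though the actual tie-breaking logic is not shown in this excerpt.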

File tree

15 files changed: +732 -75 lines


docs/changelog/88209.yaml

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
pr: 88209
summary: Prioritize shard snapshot tasks over file snapshot tasks and limit the number of the concurrently running snapshot tasks
area: Snapshot/Restore
type: enhancement
issues:
 - 83408
server/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedThrottledTaskRunner.java

Lines changed: 107 additions & 0 deletions
@@ -0,0 +1,107 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */

package org.elasticsearch.common.util.concurrent;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * {@link PrioritizedThrottledTaskRunner} performs the enqueued tasks in the order dictated by the
 * natural ordering of the tasks, limiting the max number of concurrently running tasks. Each new task
 * that is dequeued to be run, is forked off to the given executor.
 */
public class PrioritizedThrottledTaskRunner<T extends Comparable<T> & Runnable> {
    private static final Logger logger = LogManager.getLogger(PrioritizedThrottledTaskRunner.class);

    private final String taskRunnerName;
    // The max number of tasks that this runner will schedule to concurrently run on the executor.
    private final int maxRunningTasks;
    // As we fork off dequeued tasks to the given executor, technically the following counter represents
    // the number of the concurrent pollAndSpawn calls currently checking the queue for a task to run. This
    // doesn't necessarily correspond to currently running tasks, since a pollAndSpawn could return without
    // actually running a task when the queue is empty.
    private final AtomicInteger runningTasks = new AtomicInteger();
    private final BlockingQueue<T> tasks = new PriorityBlockingQueue<>();
    private final Executor executor;

    public PrioritizedThrottledTaskRunner(final String name, final int maxRunningTasks, final Executor executor) {
        assert maxRunningTasks > 0;
        this.taskRunnerName = name;
        this.maxRunningTasks = maxRunningTasks;
        this.executor = executor;
    }

    public void enqueueTask(final T task) {
        logger.trace("[{}] enqueuing task {}", taskRunnerName, task);
        tasks.add(task);
        // Try to run a task since now there is at least one in the queue. If the maxRunningTasks is
        // reached, the task is just enqueued.
        pollAndSpawn();
    }

    // visible for testing
    protected void pollAndSpawn() {
        // A pollAndSpawn attempts to run a new task. There could be many concurrent pollAndSpawn calls competing
        // to get a "free slot", since we attempt to run a new task on every enqueueTask call and every time an
        // existing task is finished.
        while (incrementRunningTasks()) {
            T task = tasks.poll();
            if (task == null) {
                logger.trace("[{}] task queue is empty", taskRunnerName);
                // We have taken up a "free slot", but there are no tasks in the queue! This could happen each time a worker
                // sees an empty queue after running a task. Decrement to give competing pollAndSpawn calls a chance!
                int decremented = runningTasks.decrementAndGet();
                assert decremented >= 0;
                // We might have blocked all competing pollAndSpawn calls. This could happen for example when
                // maxRunningTasks=1 and a task got enqueued just after checking the queue but before decrementing.
                // To be sure, return only if the queue is still empty. If the queue is not empty, this might be the
                // only pollAndSpawn call in progress, and returning without peeking would risk ending up with a
                // non-empty queue and no workers!
                if (tasks.peek() == null) break;
            } else {
                executor.execute(() -> runTask(task));
            }
        }
    }

    // Each worker thread that runs a task, first needs to get a "free slot" in order to respect maxRunningTasks.
    private boolean incrementRunningTasks() {
        int preUpdateValue = runningTasks.getAndUpdate(v -> v < maxRunningTasks ? v + 1 : v);
        assert preUpdateValue <= maxRunningTasks;
        return preUpdateValue < maxRunningTasks;
    }

    // Only use for testing
    public int runningTasks() {
        return runningTasks.get();
    }

    // Only use for testing
    public int queueSize() {
        return tasks.size();
    }

    private void runTask(final T task) {
        try {
            logger.trace("[{}] running task {}", taskRunnerName, task);
            task.run();
        } finally {
            // To avoid missing to run tasks that are enqueued and waiting, we check the queue again once running
            // a task is finished.
            int decremented = runningTasks.decrementAndGet();
            assert decremented >= 0;
            pollAndSpawn();
        }
    }
}
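A minimal, standalone usage sketch of the class above; the task type, pool size, and executor are illustrative and not part of the commit:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.elasticsearch.common.util.concurrent.PrioritizedThrottledTaskRunner;

public class PrioritizedRunnerExample {

    // Hypothetical task type: natural ordering encodes priority, lower value runs earlier.
    static class PrioritizedTask implements Comparable<PrioritizedTask>, Runnable {
        final int priority;
        final Runnable work;

        PrioritizedTask(int priority, Runnable work) {
            this.priority = priority;
            this.work = work;
        }

        @Override
        public int compareTo(PrioritizedTask other) {
            return Integer.compare(priority, other.priority);
        }

        @Override
        public void run() {
            work.run();
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        // At most two tasks are forked to the executor at any time; the rest wait in the priority queue.
        PrioritizedThrottledTaskRunner<PrioritizedTask> runner = new PrioritizedThrottledTaskRunner<>("example", 2, executor);
        runner.enqueueTask(new PrioritizedTask(2, () -> System.out.println("lower priority task")));
        runner.enqueueTask(new PrioritizedTask(1, () -> System.out.println("higher priority task")));
        executor.shutdown();
    }
}

Note that the throttling comes from the maxRunningTasks argument rather than from the executor: even on a larger thread pool, the runner forks at most that many tasks at once.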

server/src/main/java/org/elasticsearch/repositories/SnapshotShardContext.java

Lines changed: 9 additions & 0 deletions
@@ -37,6 +37,7 @@ public final class SnapshotShardContext extends ActionListener.Delegating<ShardS
     private final IndexShardSnapshotStatus snapshotStatus;
     private final Version repositoryMetaVersion;
     private final Map<String, Object> userMetadata;
+    private final long snapshotStartTime;
 
     /**
      * @param store store to be snapshotted
@@ -51,6 +52,8 @@ public final class SnapshotShardContext extends ActionListener.Delegating<ShardS
      * @param repositoryMetaVersion version of the updated repository metadata to write
      * @param userMetadata user metadata of the snapshot found in
      *                     {@link org.elasticsearch.cluster.SnapshotsInProgress.Entry#userMetadata()}
+     * @param snapshotStartTime start time of the snapshot found in
+     *                          {@link org.elasticsearch.cluster.SnapshotsInProgress.Entry#startTime()}
      * @param listener listener invoked on completion
      */
     public SnapshotShardContext(
@@ -63,6 +66,7 @@ public SnapshotShardContext(
         IndexShardSnapshotStatus snapshotStatus,
         Version repositoryMetaVersion,
         Map<String, Object> userMetadata,
+        final long snapshotStartTime,
         ActionListener<ShardSnapshotResult> listener
     ) {
         super(ActionListener.runBefore(listener, commitRef::close));
@@ -75,6 +79,7 @@ public SnapshotShardContext(
         this.snapshotStatus = snapshotStatus;
         this.repositoryMetaVersion = repositoryMetaVersion;
         this.userMetadata = userMetadata;
+        this.snapshotStartTime = snapshotStartTime;
     }
 
     public Store store() {
@@ -114,6 +119,10 @@ public Map<String, Object> userMetadata() {
         return userMetadata;
     }
 
+    public long snapshotStartTime() {
+        return snapshotStartTime;
+    }
+
     @Override
     public void onResponse(ShardSnapshotResult result) {
         delegate.onResponse(result);

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 30 additions & 42 deletions
@@ -150,6 +150,7 @@
 import java.util.stream.Stream;
 
 import static org.elasticsearch.core.Strings.format;
+import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
 import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;
 
 /**
@@ -376,6 +377,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
      */
     private final int maxSnapshotCount;
 
+    private final ShardSnapshotTaskRunner shardSnapshotTaskRunner;
+
     /**
      * Constructs new BlobStoreRepository
      * @param metadata The metadata for this repository including name and settings
@@ -405,6 +408,12 @@ protected BlobStoreRepository(
         this.basePath = basePath;
         this.maxSnapshotCount = MAX_SNAPSHOTS_SETTING.get(metadata.settings());
         this.repoDataDeduplicator = new ResultDeduplicator<>(threadPool.getThreadContext());
+        shardSnapshotTaskRunner = new ShardSnapshotTaskRunner(
+            threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
+            threadPool.executor(ThreadPool.Names.SNAPSHOT),
+            this::doSnapshotShard,
+            this::snapshotFile
+        );
     }
 
     @Override
@@ -2629,6 +2638,10 @@ private void writeAtomic(
 
     @Override
     public void snapshotShard(SnapshotShardContext context) {
+        shardSnapshotTaskRunner.enqueueShardSnapshot(context);
+    }
+
+    private void doSnapshotShard(SnapshotShardContext context) {
         if (isReadOnly()) {
             context.onFailure(new RepositoryException(metadata.name(), "cannot snapshot shard on a readonly repository"));
             return;
@@ -2889,45 +2902,19 @@ public void snapshotShard(SnapshotShardContext context) {
                 snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), shardSnapshotResult);
                 context.onResponse(shardSnapshotResult);
             }, context::onFailure);
-            if (indexIncrementalFileCount == 0) {
+            if (indexIncrementalFileCount == 0 || filesToSnapshot.isEmpty()) {
                 allFilesUploadedListener.onResponse(Collections.emptyList());
                 return;
             }
-            final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
-            // Start as many workers as fit into the snapshot pool at once at the most
-            final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), indexIncrementalFileCount);
-            final ActionListener<Void> filesListener = fileQueueListener(filesToSnapshot, workers, allFilesUploadedListener);
-            for (int i = 0; i < workers; ++i) {
-                executeOneFileSnapshot(store, snapshotId, context.indexId(), snapshotStatus, filesToSnapshot, executor, filesListener);
+            final ActionListener<Void> filesListener = fileQueueListener(filesToSnapshot, filesToSnapshot.size(), allFilesUploadedListener);
+            for (FileInfo fileInfo : filesToSnapshot) {
+                shardSnapshotTaskRunner.enqueueFileSnapshot(context, fileInfo, filesListener);
             }
         } catch (Exception e) {
            context.onFailure(e);
        }
    }
 
-    private void executeOneFileSnapshot(
-        Store store,
-        SnapshotId snapshotId,
-        IndexId indexId,
-        IndexShardSnapshotStatus snapshotStatus,
-        BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot,
-        Executor executor,
-        ActionListener<Void> listener
-    ) throws InterruptedException {
-        final ShardId shardId = store.shardId();
-        final BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS);
-        if (snapshotFileInfo == null) {
-            listener.onResponse(null);
-        } else {
-            executor.execute(ActionRunnable.wrap(listener, l -> {
-                try (Releasable ignored = incrementStoreRef(store, snapshotStatus, shardId)) {
-                    snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store);
-                    executeOneFileSnapshot(store, snapshotId, indexId, snapshotStatus, filesToSnapshot, executor, l);
-                }
-            }));
-        }
-    }
-
     private static Releasable incrementStoreRef(Store store, IndexShardSnapshotStatus snapshotStatus, ShardId shardId) {
         if (store.tryIncRef() == false) {
             if (snapshotStatus.isAborted()) {
@@ -3116,10 +3103,10 @@ void ensureNotClosing(final Store store) throws AlreadyClosedException {
 
     private static ActionListener<Void> fileQueueListener(
         BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> files,
-        int workers,
+        int numberOfFiles,
         ActionListener<Collection<Void>> listener
     ) {
-        return new GroupedActionListener<>(listener, workers).delegateResponse((l, e) -> {
+        return new GroupedActionListener<>(listener, numberOfFiles).delegateResponse((l, e) -> {
             files.clear(); // Stop uploading the remaining files if we run into any exception
             l.onFailure(e);
         });
@@ -3426,19 +3413,20 @@ private Tuple<BlobStoreIndexShardSnapshots, Long> buildBlobStoreIndexShardSnapsh
 
     /**
      * Snapshot individual file
-     * @param fileInfo file to be snapshotted
+     * @param fileInfo file to snapshot
      */
-    private void snapshotFile(
-        BlobStoreIndexShardSnapshot.FileInfo fileInfo,
-        IndexId indexId,
-        ShardId shardId,
-        SnapshotId snapshotId,
-        IndexShardSnapshotStatus snapshotStatus,
-        Store store
-    ) throws IOException {
+    private void snapshotFile(SnapshotShardContext context, FileInfo fileInfo) throws IOException {
+        final IndexId indexId = context.indexId();
+        final Store store = context.store();
+        final ShardId shardId = store.shardId();
+        final IndexShardSnapshotStatus snapshotStatus = context.status();
+        final SnapshotId snapshotId = context.snapshotId();
         final BlobContainer shardContainer = shardContainer(indexId, shardId);
         final String file = fileInfo.physicalName();
-        try (IndexInput indexInput = store.openVerifyingInput(file, IOContext.READONCE, fileInfo.metadata())) {
+        try (
+            Releasable ignored = BlobStoreRepository.incrementStoreRef(store, snapshotStatus, store.shardId());
+            IndexInput indexInput = store.openVerifyingInput(file, IOContext.READONCE, fileInfo.metadata())
+        ) {
             for (int i = 0; i < fileInfo.numberOfParts(); i++) {
                 final long partBytes = fileInfo.partBytes(i);

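One consequence of the BlobStoreRepository changes above: fileQueueListener now sizes its GroupedActionListener by the number of files rather than the number of workers, so the shard-level listener fires only after every per-file snapshot task has responded. A small standalone illustration of that grouping behavior (the counts and messages here are made up):

import java.util.Collection;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.GroupedActionListener;

public class GroupedListenerExample {
    public static void main(String[] args) {
        // Fires once, after all per-file listeners have completed, or fails on the first error.
        ActionListener<Collection<Void>> allFilesUploaded = ActionListener.wrap(
            ignored -> System.out.println("all files uploaded"),
            e -> System.out.println("upload failed: " + e)
        );
        int numberOfFiles = 3;
        ActionListener<Void> perFileListener = new GroupedActionListener<>(allFilesUploaded, numberOfFiles);
        for (int i = 0; i < numberOfFiles; i++) {
            // In the repository, each enqueued file snapshot task reports here when its upload finishes.
            perFileListener.onResponse(null);
        }
    }
}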