Rework shard snapshot workers #88209
Changes from all commits
@@ -0,0 +1,6 @@
pr: 88209
summary: Prioritize shard snapshot tasks over file snapshot tasks and limit the number of concurrently running snapshot tasks
area: Snapshot/Restore
type: enhancement
issues:
 - 83408
@@ -0,0 +1,107 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */

package org.elasticsearch.common.util.concurrent;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * {@link PrioritizedThrottledTaskRunner} performs the enqueued tasks in the order dictated by the
 * natural ordering of the tasks, limiting the max number of concurrently running tasks. Each task
 * that is dequeued to be run is forked off to the given executor.
 */
public class PrioritizedThrottledTaskRunner<T extends Comparable<T> & Runnable> {
    private static final Logger logger = LogManager.getLogger(PrioritizedThrottledTaskRunner.class);

    private final String taskRunnerName;
    // The max number of tasks that this runner will schedule to concurrently run on the executor.
    private final int maxRunningTasks;
    // As we fork off dequeued tasks to the given executor, technically the following counter represents
    // the number of the concurrent pollAndSpawn calls currently checking the queue for a task to run. This
    // doesn't necessarily correspond to currently running tasks, since a pollAndSpawn could return without
    // actually running a task when the queue is empty.
    private final AtomicInteger runningTasks = new AtomicInteger();
    private final BlockingQueue<T> tasks = new PriorityBlockingQueue<>();
    private final Executor executor;

    public PrioritizedThrottledTaskRunner(final String name, final int maxRunningTasks, final Executor executor) {
        assert maxRunningTasks > 0;
        this.taskRunnerName = name;
        this.maxRunningTasks = maxRunningTasks;
        this.executor = executor;
    }

    public void enqueueTask(final T task) {
        logger.trace("[{}] enqueuing task {}", taskRunnerName, task);
        tasks.add(task);
        // Try to run a task since now there is at least one in the queue. If maxRunningTasks is
        // already reached, the task simply stays enqueued.
        pollAndSpawn();
    }

    // visible for testing
    protected void pollAndSpawn() {
        // A pollAndSpawn attempts to run a new task. There could be many concurrent pollAndSpawn calls competing
        // to get a "free slot", since we attempt to run a new task on every enqueueTask call and every time an
        // existing task is finished.
        while (incrementRunningTasks()) {
            T task = tasks.poll();
            if (task == null) {
                logger.trace("[{}] task queue is empty", taskRunnerName);
                // We have taken up a "free slot", but there are no tasks in the queue! This could happen each time a worker
                // sees an empty queue after running a task. Decrement to give competing pollAndSpawn calls a chance!
                int decremented = runningTasks.decrementAndGet();
                assert decremented >= 0;
                // We might have blocked all competing pollAndSpawn calls. This could happen for example when
                // maxRunningTasks=1 and a task got enqueued just after checking the queue but before decrementing.
                // To be sure, return only if the queue is still empty. If the queue is not empty, this might be the
                // only pollAndSpawn call in progress, and returning without peeking would risk ending up with a
                // non-empty queue and no workers!
                if (tasks.peek() == null) break;
            } else {
                executor.execute(() -> runTask(task));
            }
        }
    }

    // Each worker thread that runs a task first needs to get a "free slot" in order to respect maxRunningTasks.
    private boolean incrementRunningTasks() {
        int preUpdateValue = runningTasks.getAndUpdate(v -> v < maxRunningTasks ? v + 1 : v);
        assert preUpdateValue <= maxRunningTasks;
        return preUpdateValue < maxRunningTasks;
    }

    // Only use for testing
    public int runningTasks() {
        return runningTasks.get();
    }

    // Only use for testing
    public int queueSize() {
        return tasks.size();
    }

    private void runTask(final T task) {
        try {
            logger.trace("[{}] running task {}", taskRunnerName, task);
            task.run();
        } finally {
            // To avoid missing tasks that were enqueued and are still waiting, we check the queue again once
            // running a task is finished.
            int decremented = runningTasks.decrementAndGet();
            assert decremented >= 0;
            pollAndSpawn();
        }
    }
}
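To make the contract above concrete, here is a small usage sketch that is not part of the PR: a task type that is both Comparable and Runnable, enqueued on a runner that caps concurrency at two regardless of the executor's size. The ExampleTask class and its "smaller priority runs first" convention are illustrative assumptions, not code from this change.

// Hypothetical usage sketch, assuming PrioritizedThrottledTaskRunner is on the classpath.
import org.elasticsearch.common.util.concurrent.PrioritizedThrottledTaskRunner;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ExampleTask implements Comparable<ExampleTask>, Runnable {
    private final int priority; // smaller value runs earlier (illustrative convention)
    private final String name;

    ExampleTask(int priority, String name) {
        this.priority = priority;
        this.name = name;
    }

    @Override
    public int compareTo(ExampleTask other) {
        return Integer.compare(priority, other.priority);
    }

    @Override
    public void run() {
        System.out.println("running " + name);
    }

    @Override
    public String toString() {
        return name;
    }
}

class PrioritizedThrottledTaskRunnerExample {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        // At most 2 tasks run concurrently, even though the executor has 4 threads.
        PrioritizedThrottledTaskRunner<ExampleTask> runner =
            new PrioritizedThrottledTaskRunner<>("example", 2, executor);
        runner.enqueueTask(new ExampleTask(2, "low-priority"));
        runner.enqueueTask(new ExampleTask(1, "high-priority"));
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}

Because the backing queue is a PriorityBlockingQueue, the queued high-priority task is dequeued before the low-priority one whenever a slot frees up.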
@@ -150,6 +150,7 @@
import java.util.stream.Stream;

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;

/**
@@ -376,6 +377,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     */
    private final int maxSnapshotCount;

    private final ShardSnapshotTaskRunner shardSnapshotTaskRunner;

    /**
     * Constructs new BlobStoreRepository
     * @param metadata The metadata for this repository including name and settings
@@ -405,6 +408,12 @@ protected BlobStoreRepository(
        this.basePath = basePath;
        this.maxSnapshotCount = MAX_SNAPSHOTS_SETTING.get(metadata.settings());
        this.repoDataDeduplicator = new ResultDeduplicator<>(threadPool.getThreadContext());
        shardSnapshotTaskRunner = new ShardSnapshotTaskRunner(
            threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
            threadPool.executor(ThreadPool.Names.SNAPSHOT),
            this::doSnapshotShard,
            this::snapshotFile
        );
    }

    @Override
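The ShardSnapshotTaskRunner wired up in the constructor above is not included in this excerpt. Purely as an illustration, the sketch below shows how such a wrapper could sit on top of the new PrioritizedThrottledTaskRunner so that shard-level snapshot tasks are always dequeued before file-level ones. The class shape, the String placeholders standing in for SnapshotShardContext and FileInfo, and the fixed priorities are assumptions for the sketch, not the PR's actual code.

// Hypothetical sketch only; the real ShardSnapshotTaskRunner may order and break ties differently.
import org.elasticsearch.common.util.concurrent.PrioritizedThrottledTaskRunner;

import java.util.concurrent.Executor;
import java.util.function.Consumer;

class SketchShardSnapshotTaskRunner {
    private abstract static class SnapshotTask implements Comparable<SnapshotTask>, Runnable {
        abstract int priority(); // lower priority value is dequeued first

        @Override
        public int compareTo(SnapshotTask other) {
            return Integer.compare(priority(), other.priority());
        }
    }

    private final PrioritizedThrottledTaskRunner<SnapshotTask> runner;
    private final Consumer<String> shardSnapshotter; // stand-in for the doSnapshotShard callback
    private final Consumer<String> fileSnapshotter;  // stand-in for the snapshotFile callback

    SketchShardSnapshotTaskRunner(int maxRunningTasks, Executor executor,
                                  Consumer<String> shardSnapshotter, Consumer<String> fileSnapshotter) {
        this.runner = new PrioritizedThrottledTaskRunner<>("shard_snapshot", maxRunningTasks, executor);
        this.shardSnapshotter = shardSnapshotter;
        this.fileSnapshotter = fileSnapshotter;
    }

    void enqueueShardSnapshot(String shard) {
        runner.enqueueTask(new SnapshotTask() {
            @Override
            int priority() {
                return 1; // shard-level work wins over any queued file-level work
            }

            @Override
            public void run() {
                shardSnapshotter.accept(shard);
            }
        });
    }

    void enqueueFileSnapshot(String file) {
        runner.enqueueTask(new SnapshotTask() {
            @Override
            int priority() {
                return 2;
            }

            @Override
            public void run() {
                fileSnapshotter.accept(file);
            }
        });
    }
}

The point of this shape is that both task kinds share one throttled queue sized to the SNAPSHOT thread pool, so starting new shard snapshots is never starved by a backlog of per-file uploads.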
@@ -2629,6 +2638,10 @@ private void writeAtomic(

    @Override
    public void snapshotShard(SnapshotShardContext context) {
        shardSnapshotTaskRunner.enqueueShardSnapshot(context);
    }

    private void doSnapshotShard(SnapshotShardContext context) {
        if (isReadOnly()) {
            context.onFailure(new RepositoryException(metadata.name(), "cannot snapshot shard on a readonly repository"));
            return;

@@ -2889,45 +2902,19 @@ public void snapshotShard(SnapshotShardContext context) {
                snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), shardSnapshotResult);
                context.onResponse(shardSnapshotResult);
            }, context::onFailure);
            if (indexIncrementalFileCount == 0) {
            if (indexIncrementalFileCount == 0 || filesToSnapshot.isEmpty()) {
[Review thread on the line above]
- Turns out sometimes …
- Makes sense, neat fix saving some redundant tasks :)
- Well, with the current PR this was necessary, as otherwise, we'd be creating a …
                allFilesUploadedListener.onResponse(Collections.emptyList());
                return;
            }
            final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
            // Start as many workers as fit into the snapshot pool at once at the most
            final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), indexIncrementalFileCount);
            final ActionListener<Void> filesListener = fileQueueListener(filesToSnapshot, workers, allFilesUploadedListener);
            for (int i = 0; i < workers; ++i) {
                executeOneFileSnapshot(store, snapshotId, context.indexId(), snapshotStatus, filesToSnapshot, executor, filesListener);
            final ActionListener<Void> filesListener = fileQueueListener(filesToSnapshot, filesToSnapshot.size(), allFilesUploadedListener);
            for (FileInfo fileInfo : filesToSnapshot) {
                shardSnapshotTaskRunner.enqueueFileSnapshot(context, fileInfo, filesListener);
            }
        } catch (Exception e) {
            context.onFailure(e);
        }
    }

    private void executeOneFileSnapshot(
        Store store,
        SnapshotId snapshotId,
        IndexId indexId,
        IndexShardSnapshotStatus snapshotStatus,
        BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot,
        Executor executor,
        ActionListener<Void> listener
    ) throws InterruptedException {
        final ShardId shardId = store.shardId();
        final BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS);
        if (snapshotFileInfo == null) {
            listener.onResponse(null);
        } else {
            executor.execute(ActionRunnable.wrap(listener, l -> {
                try (Releasable ignored = incrementStoreRef(store, snapshotStatus, shardId)) {
                    snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store);
                    executeOneFileSnapshot(store, snapshotId, indexId, snapshotStatus, filesToSnapshot, executor, l);
                }
            }));
        }
    }

    private static Releasable incrementStoreRef(Store store, IndexShardSnapshotStatus snapshotStatus, ShardId shardId) {
        if (store.tryIncRef() == false) {
            if (snapshotStatus.isAborted()) {
@@ -3116,10 +3103,10 @@ void ensureNotClosing(final Store store) throws AlreadyClosedException {

    private static ActionListener<Void> fileQueueListener(
        BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> files,
        int workers,
        int numberOfFiles,
        ActionListener<Collection<Void>> listener
    ) {
        return new GroupedActionListener<>(listener, workers).delegateResponse((l, e) -> {
        return new GroupedActionListener<>(listener, numberOfFiles).delegateResponse((l, e) -> {
            files.clear(); // Stop uploading the remaining files if we run into any exception
            l.onFailure(e);
        });
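Because the group size is now the number of files rather than the number of workers, the wrapped listener completes only after every per-file snapshot task has reported back (or any of them has failed). The sketch below, which is not part of the PR, illustrates that counting behavior; it assumes the GroupedActionListener(delegate, groupSize) constructor used in the hunk above and a throwaway main method.

// Hypothetical sketch of the grouped-listener counting behavior, assuming Elasticsearch's
// ActionListener and GroupedActionListener are on the classpath.
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.GroupedActionListener;

import java.util.Collection;

class GroupedListenerSketch {
    public static void main(String[] args) {
        int numberOfFiles = 3; // one onResponse per file, instead of one per worker
        ActionListener<Collection<Void>> allDone = ActionListener.wrap(
            ignored -> System.out.println("all " + numberOfFiles + " file snapshots finished"),
            e -> System.out.println("at least one file snapshot failed: " + e)
        );
        GroupedActionListener<Void> perFile = new GroupedActionListener<>(allDone, numberOfFiles);
        for (int i = 0; i < numberOfFiles; i++) {
            perFile.onResponse(null); // the delegate fires only after the last of the three calls
        }
    }
}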
@@ -3426,19 +3413,20 @@ private Tuple<BlobStoreIndexShardSnapshots, Long> buildBlobStoreIndexShardSnapsh

    /**
     * Snapshot individual file
     * @param fileInfo file to be snapshotted
     * @param fileInfo file to snapshot
     */
    private void snapshotFile(
        BlobStoreIndexShardSnapshot.FileInfo fileInfo,
        IndexId indexId,
        ShardId shardId,
        SnapshotId snapshotId,
        IndexShardSnapshotStatus snapshotStatus,
        Store store
    ) throws IOException {
    private void snapshotFile(SnapshotShardContext context, FileInfo fileInfo) throws IOException {
        final IndexId indexId = context.indexId();
        final Store store = context.store();
        final ShardId shardId = store.shardId();
        final IndexShardSnapshotStatus snapshotStatus = context.status();
        final SnapshotId snapshotId = context.snapshotId();
        final BlobContainer shardContainer = shardContainer(indexId, shardId);
        final String file = fileInfo.physicalName();
        try (IndexInput indexInput = store.openVerifyingInput(file, IOContext.READONCE, fileInfo.metadata())) {
        try (
            Releasable ignored = BlobStoreRepository.incrementStoreRef(store, snapshotStatus, store.shardId());
            IndexInput indexInput = store.openVerifyingInput(file, IOContext.READONCE, fileInfo.metadata())
        ) {
            for (int i = 0; i < fileInfo.numberOfParts(); i++) {
                final long partBytes = fileInfo.partBytes(i);
[Review comment] Can we add some commentary on how this loop works, in particular on why we need to peek the queue? This is somewhat hard to follow for me and will be even harder to follow for future readers of this code.
[Review reply] Good point!