Skip to content

Commit 41633cb

Browse files
More Efficient Ordering of Shard Upload Execution (#42791) (#46588)
* More Efficient Ordering of Shard Upload Execution (#42791) * Change the upload order of of snapshots to work file by file in parallel on the snapshot pool instead of merely shard-by-shard * Inspired by #39657 * Cleanup BlobStoreRepository Abort and Failure Handling (#46208)
1 parent 80bb08f commit 41633cb

File tree

13 files changed

+304
-228
lines changed

13 files changed

+304
-228
lines changed

server/src/main/java/org/elasticsearch/action/ActionListener.java

+32
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.ExceptionsHelper;
2323
import org.elasticsearch.common.CheckedConsumer;
2424
import org.elasticsearch.common.CheckedFunction;
25+
import org.elasticsearch.common.CheckedRunnable;
2526
import org.elasticsearch.common.CheckedSupplier;
2627

2728
import java.util.ArrayList;
@@ -226,6 +227,37 @@ public void onFailure(Exception e) {
226227
};
227228
}
228229

230+
/**
231+
* Wraps a given listener and returns a new listener which executes the provided {@code runBefore}
232+
* callback before the listener is notified via either {@code #onResponse} or {@code #onFailure}.
233+
* If the callback throws an exception then it will be passed to the listener's {@code #onFailure} and its {@code #onResponse} will
234+
* not be executed.
235+
*/
236+
static <Response> ActionListener<Response> runBefore(ActionListener<Response> delegate, CheckedRunnable<?> runBefore) {
237+
return new ActionListener<Response>() {
238+
@Override
239+
public void onResponse(Response response) {
240+
try {
241+
runBefore.run();
242+
} catch (Exception ex) {
243+
delegate.onFailure(ex);
244+
return;
245+
}
246+
delegate.onResponse(response);
247+
}
248+
249+
@Override
250+
public void onFailure(Exception e) {
251+
try {
252+
runBefore.run();
253+
} catch (Exception ex) {
254+
e.addSuppressed(ex);
255+
}
256+
delegate.onFailure(e);
257+
}
258+
};
259+
}
260+
229261
/**
230262
* Wraps a given listener and returns a new listener which makes sure {@link #onResponse(Object)}
231263
* and {@link #onFailure(Exception)} of the provided listener will be called at most once.

server/src/main/java/org/elasticsearch/repositories/FilterRepository.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -121,13 +121,11 @@ public boolean isReadOnly() {
121121
return in.isReadOnly();
122122
}
123123

124-
125124
@Override
126125
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
127-
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
128-
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus);
126+
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {
127+
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, listener);
129128
}
130-
131129
@Override
132130
public void restoreShard(Store store, SnapshotId snapshotId,
133131
Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {

server/src/main/java/org/elasticsearch/repositories/Repository.java

+3-23
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
* <ul>
5252
* <li>Master calls {@link #initializeSnapshot(SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)}
5353
* with list of indices that will be included into the snapshot</li>
54-
* <li>Data nodes call {@link Repository#snapshotShard(Store, MapperService, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)}
54+
* <li>Data nodes call {@link Repository#snapshotShard}
5555
* for each shard</li>
5656
* <li>When all shard calls return master calls {@link #finalizeSnapshot} with possible list of failures</li>
5757
* </ul>
@@ -191,27 +191,6 @@ SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long
191191
*/
192192
boolean isReadOnly();
193193

194-
/**
195-
* Creates a snapshot of the shard based on the index commit point.
196-
* <p>
197-
* The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#acquireLastIndexCommit} method.
198-
* Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller.
199-
* <p>
200-
* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
201-
* {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted.
202-
* @param indexShard the shard to be snapshotted
203-
* @param snapshotId snapshot id
204-
* @param indexId id for the index being snapshotted
205-
* @param snapshotIndexCommit commit point
206-
* @param snapshotStatus snapshot status
207-
* @deprecated use {@link #snapshotShard(Store, MapperService, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)} instead
208-
*/
209-
@Deprecated
210-
default void snapshotShard(IndexShard indexShard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
211-
IndexShardSnapshotStatus snapshotStatus) {
212-
snapshotShard(indexShard.store(), indexShard.mapperService(), snapshotId, indexId, snapshotIndexCommit, snapshotStatus);
213-
}
214-
215194
/**
216195
* Creates a snapshot of the shard based on the index commit point.
217196
* <p>
@@ -226,9 +205,10 @@ default void snapshotShard(IndexShard indexShard, SnapshotId snapshotId, IndexId
226205
* @param indexId id for the index being snapshotted
227206
* @param snapshotIndexCommit commit point
228207
* @param snapshotStatus snapshot status
208+
* @param listener listener invoked on completion
229209
*/
230210
void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
231-
IndexShardSnapshotStatus snapshotStatus);
211+
IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener);
232212

233213
/**
234214
* Restores snapshot of the shard.

0 commit comments

Comments
 (0)