Skip to content

Commit e24c7ab

Browse files
committed
Track Shard-Snapshot Index Generation at Repository Root
Based on elastic/elasticsearch#46250
1 parent 2ff16ce commit e24c7ab

File tree

15 files changed

+1652
-558
lines changed

15 files changed

+1652
-558
lines changed

server/src/main/java/org/elasticsearch/action/ActionListener.java

+23
Original file line numberDiff line numberDiff line change
@@ -247,4 +247,27 @@ public void onFailure(Exception e) {
247247
}
248248
};
249249
}
250+
251+
/**
252+
* Creates a listener that delegates all responses it receives to another listener.
253+
*
254+
* @param delegate ActionListener to wrap and delegate any exception to
255+
* @param bc BiConsumer invoked with delegate listener and exception
256+
* @param <T> Type of the listener
257+
* @return Delegating listener
258+
*/
259+
static <T> ActionListener<T> delegateResponse(ActionListener<T> delegate, BiConsumer<ActionListener<T>, Exception> bc) {
260+
return new ActionListener<T>() {
261+
262+
@Override
263+
public void onResponse(T r) {
264+
delegate.onResponse(r);
265+
}
266+
267+
@Override
268+
public void onFailure(Exception e) {
269+
bc.accept(delegate, e);
270+
}
271+
};
272+
}
250273
}

server/src/main/java/org/elasticsearch/action/ActionRunnable.java

+28
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919

2020
package org.elasticsearch.action;
2121

22+
import io.crate.common.CheckedSupplier;
2223
import org.elasticsearch.common.CheckedConsumer;
24+
import org.elasticsearch.common.CheckedRunnable;
2325
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2426

2527
/**
@@ -30,6 +32,32 @@ public abstract class ActionRunnable<Response> extends AbstractRunnable {
3032

3133
protected final ActionListener<Response> listener;
3234

35+
/**
36+
* Creates a {@link Runnable} that invokes the given listener with {@code null} after the given runnable has executed.
37+
* @param listener Listener to invoke
38+
* @param runnable Runnable to execute
39+
* @return Wrapped {@code Runnable}
40+
*/
41+
public static <T> ActionRunnable<T> run(ActionListener<T> listener, CheckedRunnable<Exception> runnable) {
42+
return new ActionRunnable<>(listener) {
43+
@Override
44+
protected void doRun() throws Exception {
45+
runnable.run();
46+
listener.onResponse(null);
47+
}
48+
};
49+
}
50+
51+
/**
52+
* Creates a {@link Runnable} that invokes the given listener with the return of the given supplier.
53+
* @param listener Listener to invoke
54+
* @param supplier Supplier that provides value to pass to listener
55+
* @return Wrapped {@code Runnable}
56+
*/
57+
public static <T> ActionRunnable<T> supply(ActionListener<T> listener, CheckedSupplier<T, Exception> supplier) {
58+
return ActionRunnable.wrap(listener, l -> l.onResponse(supplier.get()));
59+
}
60+
3361
/**
3462
* Creates a {@link Runnable} that wraps the given listener and a consumer of it that is executed when the {@link Runnable} is run.
3563
* Invokes {@link ActionListener#onFailure(Exception)} on it if an exception is thrown on executing the consumer.

server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

+38-13
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,12 @@ public static class Entry {
9393
private final ImmutableOpenMap<String, List<ShardId>> waitingIndices;
9494
private final long startTime;
9595
private final long repositoryStateId;
96+
private final boolean useShardGenerations;
9697
@Nullable private final String failure;
9798

9899
public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
99100
long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
100-
String failure) {
101+
String failure, boolean useShardGenerations) {
101102
this.state = state;
102103
this.snapshot = snapshot;
103104
this.includeGlobalState = includeGlobalState;
@@ -114,21 +115,22 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta
114115
}
115116
this.repositoryStateId = repositoryStateId;
116117
this.failure = failure;
118+
this.useShardGenerations = useShardGenerations;
117119
}
118120

119121
public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
120-
long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
121-
this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null);
122+
long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, boolean useShardGenerations) {
123+
this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null, useShardGenerations);
122124
}
123125

124126
public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
125127
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
126-
entry.repositoryStateId, shards, entry.failure);
128+
entry.repositoryStateId, shards, entry.failure, entry.useShardGenerations);
127129
}
128130

129131
public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, String failure) {
130132
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
131-
entry.repositoryStateId, shards, failure);
133+
entry.repositoryStateId, shards, failure, entry.useShardGenerations);
132134
}
133135

134136
public Entry(Entry entry, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
@@ -188,6 +190,16 @@ public String failure() {
188190
return failure;
189191
}
190192

193+
/**
194+
* Whether to write to the repository in a format only understood by versions newer than
195+
* {@link SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}.
196+
*
197+
* @return true if writing to repository in new format
198+
*/
199+
public boolean useShardGenerations() {
200+
return useShardGenerations;
201+
}
202+
191203
@Override
192204
public boolean equals(Object o) {
193205
if (this == o) return true;
@@ -203,6 +215,7 @@ public boolean equals(Object o) {
203215
if (!snapshot.equals(entry.snapshot)) return false;
204216
if (state != entry.state) return false;
205217
if (repositoryStateId != entry.repositoryStateId) return false;
218+
if (useShardGenerations != entry.useShardGenerations) return false;
206219

207220
return true;
208221
}
@@ -217,6 +230,8 @@ public int hashCode() {
217230
result = 31 * result + indices.hashCode();
218231
result = 31 * result + Long.hashCode(startTime);
219232
result = 31 * result + Long.hashCode(repositoryStateId);
233+
result = 31 * result + (useShardGenerations ? 1 : 0);
234+
220235
return result;
221236
}
222237

@@ -445,24 +460,30 @@ public static NamedDiff<Custom> readDiffFrom(StreamInput in) throws IOException
445460
public SnapshotsInProgress(StreamInput in) throws IOException {
446461
Entry[] entries = new Entry[in.readVInt()];
447462
for (int i = 0; i < entries.length; i++) {
448-
Snapshot snapshot = new Snapshot(in);
449-
boolean includeGlobalState = in.readBoolean();
450-
boolean partial = in.readBoolean();
451-
State state = State.fromValue(in.readByte());
463+
final Snapshot snapshot = new Snapshot(in);
464+
final boolean includeGlobalState = in.readBoolean();
465+
final boolean partial = in.readBoolean();
466+
final State state = State.fromValue(in.readByte());
452467
int indices = in.readVInt();
453468
List<IndexId> indexBuilder = new ArrayList<>();
454469
for (int j = 0; j < indices; j++) {
455470
indexBuilder.add(new IndexId(in.readString(), in.readString()));
456471
}
457-
long startTime = in.readLong();
472+
final long startTime = in.readLong();
458473
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> builder = ImmutableOpenMap.builder();
459-
int shards = in.readVInt();
474+
final int shards = in.readVInt();
460475
for (int j = 0; j < shards; j++) {
461476
ShardId shardId = new ShardId(in);
462477
builder.put(shardId, new ShardSnapshotStatus(in));
463478
}
464-
long repositoryStateId = in.readLong();
479+
final long repositoryStateId = in.readLong();
465480
final String failure = in.readOptionalString();
481+
final boolean useShardGenerations;
482+
if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
483+
useShardGenerations = in.readBoolean();
484+
} else {
485+
useShardGenerations = false;
486+
}
466487
entries[i] = new Entry(snapshot,
467488
includeGlobalState,
468489
partial,
@@ -471,7 +492,8 @@ public SnapshotsInProgress(StreamInput in) throws IOException {
471492
startTime,
472493
repositoryStateId,
473494
builder.build(),
474-
failure);
495+
failure,
496+
useShardGenerations);
475497
}
476498
this.entries = Arrays.asList(entries);
477499
}
@@ -496,6 +518,9 @@ public void writeTo(StreamOutput out) throws IOException {
496518
}
497519
out.writeLong(entry.repositoryStateId);
498520
out.writeOptionalString(entry.failure);
521+
if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
522+
out.writeBoolean(entry.useShardGenerations);
523+
}
499524
}
500525
}
501526

server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.common.util.set.Sets;
3939
import org.elasticsearch.index.Index;
4040
import org.elasticsearch.snapshots.RestoreService;
41+
import org.elasticsearch.snapshots.SnapshotInProgressException;
4142
import org.elasticsearch.snapshots.SnapshotsService;
4243

4344
import java.util.Arrays;
@@ -93,9 +94,15 @@ public ClusterState execute(final ClusterState currentState) {
9394
*/
9495
public ClusterState deleteIndices(ClusterState currentState, Set<Index> indices) {
9596
final MetaData meta = currentState.metaData();
96-
final Set<IndexMetaData> metaDatas = indices.stream().map(i -> meta.getIndexSafe(i)).collect(toSet());
97+
final Set<Index> indicesToDelete = indices.stream().map(i -> meta.getIndexSafe(i).getIndex()).collect(toSet());
98+
9799
// Check if index deletion conflicts with any running snapshots
98-
SnapshotsService.checkIndexDeletion(currentState, metaDatas);
100+
Set<Index> snapshottingIndices = SnapshotsService.snapshottingIndices(currentState, indicesToDelete);
101+
if (snapshottingIndices.isEmpty() == false) {
102+
throw new SnapshotInProgressException("Cannot delete indices that are being snapshotted: " + snapshottingIndices +
103+
". Try again after snapshot finishes or cancel the currently running snapshot.");
104+
}
105+
99106
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable());
100107
MetaData.Builder metaDataBuilder = MetaData.builder(meta);
101108
ClusterBlocks.Builder clusterBlocksBuilder = ClusterBlocks.builder().blocks(currentState.blocks());

server/src/main/java/org/elasticsearch/repositories/Repository.java

+36-23
Original file line numberDiff line numberDiff line change
@@ -133,36 +133,39 @@ default Repository create(RepositoryMetaData metaData, Function<String, Reposito
133133
* <p>
134134
* This method is called on master after all shards are snapshotted.
135135
*
136-
* @param snapshotId snapshot id
137-
* @param indices list of indices in the snapshot
138-
* @param startTime start time of the snapshot
139-
* @param failure global failure reason or null
140-
* @param totalShards total number of shards
141-
* @param shardFailures list of shard failures
142-
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot began
143-
* @param includeGlobalState include cluster global state
144-
* @param clusterMetaData cluster metadata
145-
* @param listener listener to be called on completion of the snapshot
136+
* @param snapshotId snapshot id
137+
* @param shardGenerations updated shard generations
138+
* @param startTime start time of the snapshot
139+
* @param failure global failure reason or null
140+
* @param totalShards total number of shards
141+
* @param shardFailures list of shard failures
142+
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot began
143+
* @param includeGlobalState include cluster global state
144+
* @param clusterMetaData cluster metadata
145+
* @param writeShardGens if shard generations should be written to the repository
146+
* @param listener listener to be called on completion of the snapshot
146147
*/
147148
void finalizeSnapshot(SnapshotId snapshotId,
148-
List<IndexId> indices,
149-
long startTime,
150-
String failure,
151-
int totalShards,
152-
List<SnapshotShardFailure> shardFailures,
153-
long repositoryStateId,
154-
boolean includeGlobalState,
155-
MetaData clusterMetaData,
156-
ActionListener<SnapshotInfo> listener);
149+
ShardGenerations shardGenerations,
150+
long startTime,
151+
String failure,
152+
int totalShards,
153+
List<SnapshotShardFailure> shardFailures,
154+
long repositoryStateId,
155+
boolean includeGlobalState,
156+
MetaData clusterMetaData,
157+
boolean writeShardGens,
158+
ActionListener<SnapshotInfo> listener);
157159

158160
/**
159161
* Deletes snapshot
160162
*
161-
* @param snapshotId snapshot id
163+
* @param snapshotId snapshot id
162164
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot deletion began
163-
* @param listener completion listener
165+
* @param writeShardGens if shard generations should be written to the repository
166+
* @param listener completion listener
164167
*/
165-
void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener<Void> listener);
168+
void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener);
166169

167170
/**
168171
* Verifies repository on the master node and returns the verification token.
@@ -213,7 +216,7 @@ void finalizeSnapshot(SnapshotId snapshotId,
213216
* @param listener listener invoked on completion
214217
*/
215218
void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
216-
IndexShardSnapshotStatus snapshotStatus, ActionListener<String> listener);
219+
IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, ActionListener<String> listener);
217220

218221
/**
219222
* Restores snapshot of the shard.
@@ -229,4 +232,14 @@ void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshot
229232
void restoreShard(Store store, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
230233
RecoveryState recoveryState);
231234

235+
/**
236+
* Retrieve shard snapshot status for the stored snapshot
237+
*
238+
* @param snapshotId snapshot id
239+
* @param indexId the snapshotted index id for the shard to get status for
240+
* @param shardId shard id
241+
* @return snapshot status
242+
*/
243+
IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId);
244+
232245
}

0 commit comments

Comments
 (0)