Commit 8c13cf7

original-brownbear, ywelsch, and tlrx authored
Track Shard-Snapshot Index Generation at Repository Root (#46250)
### Changes to Root-Level index-N (RepositoryData)

This change adds a new field `"shards"` to `RepositoryData` that contains a mapping of `IndexId` to a `String[]`. This string array can be accessed by shard id to get the generation of a shard's shard folder (i.e. the `N` in the name of the currently valid `/indices/${indexId}/${shardId}/index-${N}` for the shard in question).

### Benefits

This allows for creating a new snapshot in the shard without doing any LIST operations on the shard's folder. In the case of AWS S3, this saves about 1/3 of the cost for updating an empty shard (see #45736) and removes one out of two remaining potential issues with eventually consistent blob stores (see #38941 ... now only the root `index-${N}` is determined by listing).

Equally if not more important, a number of possible failure modes on eventually consistent blob stores like AWS S3 are eliminated by moving all delete operations to the `master` node and moving from incremental naming of shard level index-N to uuid suffixes for these blobs.

### Only Master Deletes Blobs

This change moves the deletion of the previous shard level `index-${uuid}` blob from the data node to the master node. This allows for a safe and consistent update of the shard's generation in the `RepositoryData`: the master first updates `RepositoryData` and then deletes the now unreferenced previous `index-${uuid}` blob.

__No deletes are executed on the data nodes at all for any operation with this change.__

Note also: Previous issues with hanging data nodes interfering with master nodes are now completely impossible, even on S3 (see next section for details).

### Why Move from index-${N} to index-${uuid} at the Shard Level

This change switches the naming of the shard level `index-${N}` blobs to a uuid suffix `index-${UUID}`. The reason for this is that writing a new shard-level `index-` generation blob is no longer atomic in its effect. Not only does the blob have to be written to have an effect, it must also be referenced by the root level `index-N` (`RepositoryData`) to become an effective part of the snapshot repository.

This leads to a problem if we were to use incrementing names like we did before. If a blob `index-${N+1}` is written but, because of a node/network/cluster/... crash, the root level `RepositoryData` has not been updated, then a future operation will determine the shard's generation to be `N` and try to write a new `index-${N+1}` to the already existing path. Updates like that are problematic on S3 for consistency reasons, but also create numerous issues when thinking about stuck data nodes.

Previously, stuck data nodes that were tasked to write `index-${N+1}` but got stuck and tried to do so after some other node had already written `index-${N+1}` were prevented from doing so (except on S3) because we did not allow overwrites for that blob, and thus no corruption could occur. Were we to continue using incrementing names, we could not do this. The stuck node scenario would either allow for overwriting the `N+1` generation or force us to continue using a `LIST` operation to figure out the next `N` (which would make this change pointless).

With uuid naming and moving all deletes to `master` this becomes a non-issue. Data nodes write updated shard generation `index-${uuid}` and `master` makes those `index-${uuid}` part of the `RepositoryData` that it deems correct and cleans up all those `index-` that are unused.

Co-authored-by: Yannick Welsch <[email protected]>
Co-authored-by: Tanguy Leroux <[email protected]>
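The write ordering described in the message can be sketched in a few lines of Java. This is a minimal in-memory model under stated assumptions, not the code from this commit: `ShardGenSketch`, `writeShardIndex`, and `finalizeOnMaster` are hypothetical names, a `HashMap` stands in for one shard folder in the blob store, and a single field stands in for that shard's entry in `RepositoryData`. It illustrates that a data node only ever adds a uniquely named `index-${uuid}` blob, while the master first updates the referenced generation and only then removes unreferenced `index-` blobs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

class ShardGenSketch {
    // Hypothetical stand-in for one shard folder in the blob store: blob name -> content.
    final Map<String, String> shardFolder = new HashMap<>();
    // Hypothetical stand-in for this shard's generation as recorded in the root-level RepositoryData.
    String referencedGeneration = null;

    // Data node side: write a new shard-level index blob under a fresh uuid.
    // Nothing is deleted or overwritten here.
    String writeShardIndex(String content) {
        String generation = UUID.randomUUID().toString();
        shardFolder.put("index-" + generation, content);
        return generation;
    }

    // Master side: first record the new generation, then delete every index- blob that is not referenced.
    void finalizeOnMaster(String newGeneration) {
        referencedGeneration = newGeneration;
        shardFolder.keySet().removeIf(
            name -> name.startsWith("index-") && !name.equals("index-" + newGeneration));
    }

    public static void main(String[] args) {
        ShardGenSketch repo = new ShardGenSketch();
        String first = repo.writeShardIndex("snapshot-1");
        repo.finalizeOnMaster(first);
        // A data node writes a blob that the master never references (for example, after a crash).
        repo.writeShardIndex("orphaned");
        // A later snapshot writes another blob; with uuid names it cannot collide with the orphan.
        String second = repo.writeShardIndex("snapshot-2");
        repo.finalizeOnMaster(second);
        // Only the referenced blob is left in the shard folder.
        System.out.println(repo.shardFolder.keySet());
    }
}
```

With incrementing names, the orphaned blob and the later snapshot's blob would both have targeted `index-${N+1}`; in this sketch they get distinct names and the orphan is simply cleaned up by the master.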
1 parent 88b5450 commit 8c13cf7

29 files changed, +989 -340 lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java

+4 -1
@@ -42,6 +42,7 @@
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryCleanupResult;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
+import org.elasticsearch.snapshots.SnapshotsService;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -201,7 +202,9 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
                 logger.debug("Initialized repository cleanup in cluster state for [{}][{}]", repositoryName, repositoryStateId);
                 threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener,
                     l -> blobStoreRepository.cleanup(
-                        repositoryStateId, ActionListener.wrap(result -> after(null, result), e -> after(e, null)))));
+                        repositoryStateId,
+                        newState.nodes().getMinNodeVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION),
+                        ActionListener.wrap(result -> after(null, result), e -> after(e, null)))));
             }

             private void after(@Nullable Exception failure, @Nullable RepositoryCleanupResult result) {
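
The new argument passed to `cleanup` is a cluster-wide compatibility check: the new repository format is used only when the oldest node in the cluster already understands it. A minimal sketch of that idea follows, assuming plain integers as version ids; `MinVersionGateSketch`, `useShardGenerations`, and the value `100` are hypothetical and do not correspond to the real `Version` class or to the actual value of `SHARD_GEN_IN_REPO_DATA_VERSION`.

```java
import java.util.Arrays;
import java.util.List;

class MinVersionGateSketch {
    // Hypothetical version id from which nodes understand shard generations in RepositoryData.
    static final int SHARD_GEN_VERSION = 100;

    // Returns true only when the minimum node version is on or after the gating version.
    // Assumes a non-empty list, since a cluster always has at least one node.
    static boolean useShardGenerations(List<Integer> nodeVersions) {
        int minNodeVersion = nodeVersions.stream().mapToInt(Integer::intValue).min().getAsInt();
        return minNodeVersion >= SHARD_GEN_VERSION;
    }

    public static void main(String[] args) {
        // All nodes upgraded: the new format may be written.
        System.out.println(useShardGenerations(Arrays.asList(100, 101)));
        // One node still on an older version: keep writing the old format.
        System.out.println(useShardGenerations(Arrays.asList(99, 101)));
    }
}
```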

server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

+33 -6
@@ -91,12 +91,14 @@ public static class Entry implements ToXContent {
         private final ImmutableOpenMap<String, List<ShardId>> waitingIndices;
         private final long startTime;
         private final long repositoryStateId;
+        // see #useShardGenerations
+        private final boolean useShardGenerations;
         @Nullable private final Map<String, Object> userMetadata;
         @Nullable private final String failure;

         public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
                      long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
-                     String failure, Map<String, Object> userMetadata) {
+                     String failure, Map<String, Object> userMetadata, boolean useShardGenerations) {
             this.state = state;
             this.snapshot = snapshot;
             this.includeGlobalState = includeGlobalState;
@@ -114,6 +116,7 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta
             this.repositoryStateId = repositoryStateId;
             this.failure = failure;
             this.userMetadata = userMetadata;
+            this.useShardGenerations = useShardGenerations;
         }

         private static boolean assertShardsConsistent(State state, List<IndexId> indices,
@@ -128,20 +131,22 @@ private static boolean assertShardsConsistent(State state, List<IndexId> indices
                 : "Indices in shards " + indexNamesInShards + " differ from expected indices " + indexNames + " for state [" + state + "]";
             return true;
         }
+
         public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
                      long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
-                     Map<String, Object> userMetadata) {
-            this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null, userMetadata);
+                     Map<String, Object> userMetadata, boolean useShardGenerations) {
+            this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null, userMetadata,
+                useShardGenerations);
         }

         public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
             this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
-                entry.repositoryStateId, shards, entry.failure, entry.userMetadata);
+                entry.repositoryStateId, shards, entry.failure, entry.userMetadata, entry.useShardGenerations);
         }

         public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, String failure) {
             this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
-                entry.repositoryStateId, shards, failure, entry.userMetadata);
+                entry.repositoryStateId, shards, failure, entry.userMetadata, entry.useShardGenerations);
         }

         public Entry(Entry entry, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
@@ -192,6 +197,16 @@ public String failure() {
             return failure;
         }

+        /**
+         * Whether to write to the repository in a format only understood by versions newer than
+         * {@link SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}.
+         *
+         * @return true if writing to repository in new format
+         */
+        public boolean useShardGenerations() {
+            return useShardGenerations;
+        }
+
         @Override
         public boolean equals(Object o) {
             if (this == o) return true;
@@ -207,6 +222,7 @@ public boolean equals(Object o) {
             if (!snapshot.equals(entry.snapshot)) return false;
             if (state != entry.state) return false;
             if (repositoryStateId != entry.repositoryStateId) return false;
+            if (useShardGenerations != entry.useShardGenerations) return false;

             return true;
         }
@@ -221,6 +237,7 @@ public int hashCode() {
             result = 31 * result + indices.hashCode();
             result = 31 * result + Long.hashCode(startTime);
             result = 31 * result + Long.hashCode(repositoryStateId);
+            result = 31 * result + (useShardGenerations ? 1 : 0);
             return result;
         }

@@ -503,6 +520,12 @@ public SnapshotsInProgress(StreamInput in) throws IOException {
             if (in.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) {
                 userMetadata = in.readMap();
             }
+            final boolean useShardGenerations;
+            if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
+                useShardGenerations = in.readBoolean();
+            } else {
+                useShardGenerations = false;
+            }
             entries[i] = new Entry(snapshot,
                                    includeGlobalState,
                                    partial,
@@ -512,7 +535,8 @@ public SnapshotsInProgress(StreamInput in) throws IOException {
                                    repositoryStateId,
                                    builder.build(),
                                    failure,
-                                   userMetadata
+                                   userMetadata,
+                                   useShardGenerations
                 );
         }
         this.entries = Arrays.asList(entries);
@@ -541,6 +565,9 @@ public void writeTo(StreamOutput out) throws IOException {
             if (out.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) {
                 out.writeMap(entry.userMetadata);
             }
+            if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
+                out.writeBoolean(entry.useShardGenerations);
+            }
         }
     }
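
The read and write hunks above follow a version-gated wire format: the extra boolean is only put on the wire when the peer is new enough, and a reader talking to an older peer falls back to `false`. The sketch below reproduces that pattern with `java.io` data streams instead of Elasticsearch's `StreamInput`/`StreamOutput`. Everything in it is an assumption made for illustration: `VersionGatedWireSketch`, its methods, the integer version ids, and the value `100` are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class VersionGatedWireSketch {
    // Hypothetical version id from which peers understand the extra boolean field.
    static final int SHARD_GEN_VERSION = 100;

    // Serialize an entry for a peer on the given version; the boolean is written only for new peers.
    static byte[] write(long repositoryStateId, boolean useShardGenerations, int peerVersion) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(repositoryStateId);
            if (peerVersion >= SHARD_GEN_VERSION) {
                out.writeBoolean(useShardGenerations);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Deserialize the flag from a peer on the given version; older peers never sent it, so default to false.
    static boolean readUseShardGenerations(byte[] data, int peerVersion) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            in.readLong(); // skip repositoryStateId
            return peerVersion >= SHARD_GEN_VERSION && in.readBoolean();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] toNewPeer = write(7L, true, 100);
        byte[] toOldPeer = write(7L, true, 99);
        System.out.println(readUseShardGenerations(toNewPeer, 100));
        System.out.println(readUseShardGenerations(toOldPeer, 99));
    }
}
```

Both sides must gate on the same version, otherwise the reader would consume a byte that was never written or leave one unread.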

server/src/main/java/org/elasticsearch/repositories/FilterRepository.java

+11 -9
@@ -73,16 +73,17 @@ public RepositoryData getRepositoryData() {
     }

     @Override
-    public void finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
-                                 List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
-                                 MetaData metaData, Map<String, Object> userMetadata, ActionListener<SnapshotInfo> listener) {
-        in.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId,
-            includeGlobalState, metaData, userMetadata, listener);
+    public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
+                                 int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
+                                 boolean includeGlobalState, MetaData metaData, Map<String, Object> userMetadata,
+                                 boolean writeShardGens, ActionListener<SnapshotInfo> listener) {
+        in.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
+            includeGlobalState, metaData, userMetadata, writeShardGens, listener);
     }

     @Override
-    public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener<Void> listener) {
-        in.deleteSnapshot(snapshotId, repositoryStateId, listener);
+    public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener) {
+        in.deleteSnapshot(snapshotId, repositoryStateId, writeShardGens, listener);
     }

     @Override
@@ -117,8 +118,9 @@ public boolean isReadOnly() {

     @Override
     public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
-                              IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<String> listener) {
-        in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, listener);
+                              IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens,
+                              ActionListener<String> listener) {
+        in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, writeShardGens, listener);
     }
     @Override
     public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {

server/src/main/java/org/elasticsearch/repositories/Repository.java

+19 -14
@@ -112,28 +112,33 @@ default Repository create(RepositoryMetaData metaData, Function<String, Reposito
      * <p>
      * This method is called on master after all shards are snapshotted.
      *
-     * @param snapshotId snapshot id
-     * @param indices list of indices in the snapshot
-     * @param startTime start time of the snapshot
-     * @param failure global failure reason or null
-     * @param totalShards total number of shards
-     * @param shardFailures list of shard failures
-     * @param repositoryStateId the unique id identifying the state of the repository when the snapshot began
+     * @param snapshotId         snapshot id
+     * @param shardGenerations   updated shard generations
+     * @param startTime          start time of the snapshot
+     * @param failure            global failure reason or null
+     * @param totalShards        total number of shards
+     * @param shardFailures      list of shard failures
+     * @param repositoryStateId  the unique id identifying the state of the repository when the snapshot began
      * @param includeGlobalState include cluster global state
+     * @param clusterMetaData    cluster metadata
+     * @param userMetadata       user metadata
+     * @param writeShardGens     if shard generations should be written to the repository
      * @param listener listener to be called on completion of the snapshot
      */
-    void finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
-                          List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
-                          MetaData clusterMetaData, Map<String, Object> userMetadata, ActionListener<SnapshotInfo> listener);
+    void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
+                          int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
+                          boolean includeGlobalState, MetaData clusterMetaData, Map<String, Object> userMetadata,
+                          boolean writeShardGens, ActionListener<SnapshotInfo> listener);

     /**
      * Deletes snapshot
      *
-     * @param snapshotId snapshot id
+     * @param snapshotId        snapshot id
      * @param repositoryStateId the unique id identifying the state of the repository when the snapshot deletion began
-     * @param listener completion listener
+     * @param writeShardGens    if shard generations should be written to the repository
+     * @param listener          completion listener
      */
-    void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener<Void> listener);
+    void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener);

     /**
      * Returns snapshot throttle time in nanoseconds
@@ -195,7 +200,7 @@ void finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTi
      * @param listener listener invoked on completion
      */
     void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
-                       IndexShardSnapshotStatus snapshotStatus, ActionListener<String> listener);
+                       IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, ActionListener<String> listener);

     /**
      * Restores snapshot of the shard.
