Skip to content

Commit 295d33c

Browse files
committed
Merge pull request #17021 from ywelsch/fix/block-delete-on-snapshot
Fail closing or deleting indices during a full snapshot
2 parents 22e7165 + 266394c commit 295d33c

File tree

8 files changed

+198
-49
lines changed

8 files changed

+198
-49
lines changed

core/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,15 +69,17 @@ public static class Entry {
6969
private final State state;
7070
private final SnapshotId snapshotId;
7171
private final boolean includeGlobalState;
72+
private final boolean partial;
7273
private final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
7374
private final List<String> indices;
7475
private final ImmutableOpenMap<String, List<ShardId>> waitingIndices;
7576
private final long startTime;
7677

77-
public Entry(SnapshotId snapshotId, boolean includeGlobalState, State state, List<String> indices, long startTime, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
78+
public Entry(SnapshotId snapshotId, boolean includeGlobalState, boolean partial, State state, List<String> indices, long startTime, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
7879
this.state = state;
7980
this.snapshotId = snapshotId;
8081
this.includeGlobalState = includeGlobalState;
82+
this.partial = partial;
8183
this.indices = indices;
8284
this.startTime = startTime;
8385
if (shards == null) {
@@ -90,7 +92,7 @@ public Entry(SnapshotId snapshotId, boolean includeGlobalState, State state, Lis
9092
}
9193

9294
public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
93-
this(entry.snapshotId, entry.includeGlobalState, state, entry.indices, entry.startTime, shards);
95+
this(entry.snapshotId, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime, shards);
9496
}
9597

9698
public Entry(Entry entry, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
@@ -121,6 +123,10 @@ public boolean includeGlobalState() {
121123
return includeGlobalState;
122124
}
123125

126+
public boolean partial() {
127+
return partial;
128+
}
129+
124130
public long startTime() {
125131
return startTime;
126132
}
@@ -133,6 +139,7 @@ public boolean equals(Object o) {
133139
Entry entry = (Entry) o;
134140

135141
if (includeGlobalState != entry.includeGlobalState) return false;
142+
if (partial != entry.partial) return false;
136143
if (startTime != entry.startTime) return false;
137144
if (!indices.equals(entry.indices)) return false;
138145
if (!shards.equals(entry.shards)) return false;
@@ -148,6 +155,7 @@ public int hashCode() {
148155
int result = state.hashCode();
149156
result = 31 * result + snapshotId.hashCode();
150157
result = 31 * result + (includeGlobalState ? 1 : 0);
158+
result = 31 * result + (partial ? 1 : 0);
151159
result = 31 * result + shards.hashCode();
152160
result = 31 * result + indices.hashCode();
153161
result = 31 * result + waitingIndices.hashCode();
@@ -360,6 +368,7 @@ public SnapshotsInProgress readFrom(StreamInput in) throws IOException {
360368
for (int i = 0; i < entries.length; i++) {
361369
SnapshotId snapshotId = SnapshotId.readSnapshotId(in);
362370
boolean includeGlobalState = in.readBoolean();
371+
boolean partial = in.readBoolean();
363372
State state = State.fromValue(in.readByte());
364373
int indices = in.readVInt();
365374
List<String> indexBuilder = new ArrayList<>();
@@ -375,7 +384,7 @@ public SnapshotsInProgress readFrom(StreamInput in) throws IOException {
375384
State shardState = State.fromValue(in.readByte());
376385
builder.put(shardId, new ShardSnapshotStatus(nodeId, shardState));
377386
}
378-
entries[i] = new Entry(snapshotId, includeGlobalState, state, Collections.unmodifiableList(indexBuilder), startTime, builder.build());
387+
entries[i] = new Entry(snapshotId, includeGlobalState, partial, state, Collections.unmodifiableList(indexBuilder), startTime, builder.build());
379388
}
380389
return new SnapshotsInProgress(entries);
381390
}
@@ -386,6 +395,7 @@ public void writeTo(StreamOutput out) throws IOException {
386395
for (Entry entry : entries) {
387396
entry.snapshotId().writeTo(out);
388397
out.writeBoolean(entry.includeGlobalState());
398+
out.writeBoolean(entry.partial());
389399
out.writeByte(entry.state().value());
390400
out.writeVInt(entry.indices().size());
391401
for (String index : entry.indices()) {
@@ -406,6 +416,7 @@ static final class Fields {
406416
static final XContentBuilderString SNAPSHOTS = new XContentBuilderString("snapshots");
407417
static final XContentBuilderString SNAPSHOT = new XContentBuilderString("snapshot");
408418
static final XContentBuilderString INCLUDE_GLOBAL_STATE = new XContentBuilderString("include_global_state");
419+
static final XContentBuilderString PARTIAL = new XContentBuilderString("partial");
409420
static final XContentBuilderString STATE = new XContentBuilderString("state");
410421
static final XContentBuilderString INDICES = new XContentBuilderString("indices");
411422
static final XContentBuilderString START_TIME_MILLIS = new XContentBuilderString("start_time_millis");
@@ -431,6 +442,7 @@ public void toXContent(Entry entry, XContentBuilder builder, ToXContent.Params p
431442
builder.field(Fields.REPOSITORY, entry.snapshotId().getRepository());
432443
builder.field(Fields.SNAPSHOT, entry.snapshotId().getSnapshot());
433444
builder.field(Fields.INCLUDE_GLOBAL_STATE, entry.includeGlobalState());
445+
builder.field(Fields.PARTIAL, entry.partial());
434446
builder.field(Fields.STATE, entry.state());
435447
builder.startArray(Fields.INDICES);
436448
{

core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,12 @@
3434
import org.elasticsearch.common.settings.Settings;
3535
import org.elasticsearch.common.unit.TimeValue;
3636
import org.elasticsearch.common.util.concurrent.FutureUtils;
37+
import org.elasticsearch.common.util.set.Sets;
3738
import org.elasticsearch.index.IndexNotFoundException;
39+
import org.elasticsearch.snapshots.SnapshotsService;
3840
import org.elasticsearch.threadpool.ThreadPool;
3941

40-
import java.util.Arrays;
41-
import java.util.Collection;
42+
import java.util.Set;
4243
import java.util.concurrent.ScheduledFuture;
4344
import java.util.concurrent.atomic.AtomicBoolean;
4445
import java.util.concurrent.atomic.AtomicInteger;
@@ -67,7 +68,7 @@ public MetaDataDeleteIndexService(Settings settings, ThreadPool threadPool, Clus
6768
}
6869

6970
public void deleteIndices(final Request request, final Listener userListener) {
70-
Collection<String> indices = Arrays.asList(request.indices);
71+
Set<String> indices = Sets.newHashSet(request.indices);
7172
final DeleteIndexListener listener = new DeleteIndexListener(userListener);
7273

7374
clusterService.submitStateUpdateTask("delete-index " + indices, new ClusterStateUpdateTask(Priority.URGENT) {
@@ -84,6 +85,9 @@ public void onFailure(String source, Throwable t) {
8485

8586
@Override
8687
public ClusterState execute(final ClusterState currentState) {
88+
// Check if index deletion conflicts with any running snapshots
89+
SnapshotsService.checkIndexDeletion(currentState, indices);
90+
8791
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable());
8892
MetaData.Builder metaDataBuilder = MetaData.builder(currentState.metaData());
8993
ClusterBlocks.Builder clusterBlocksBuilder = ClusterBlocks.builder().blocks(currentState.blocks());

core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,12 @@
1919

2020
package org.elasticsearch.cluster.metadata;
2121

22-
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
2322
import org.elasticsearch.action.ActionListener;
2423
import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
2524
import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest;
2625
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
2726
import org.elasticsearch.cluster.ClusterService;
2827
import org.elasticsearch.cluster.ClusterState;
29-
import org.elasticsearch.cluster.RestoreInProgress;
3028
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
3129
import org.elasticsearch.cluster.block.ClusterBlock;
3230
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@@ -39,8 +37,9 @@
3937
import org.elasticsearch.common.inject.Inject;
4038
import org.elasticsearch.common.settings.Settings;
4139
import org.elasticsearch.index.IndexNotFoundException;
42-
import org.elasticsearch.index.shard.ShardId;
4340
import org.elasticsearch.rest.RestStatus;
41+
import org.elasticsearch.snapshots.RestoreService;
42+
import org.elasticsearch.snapshots.SnapshotsService;
4443

4544
import java.util.ArrayList;
4645
import java.util.Arrays;
@@ -99,27 +98,10 @@ public ClusterState execute(ClusterState currentState) {
9998
return currentState;
10099
}
101100

102-
// Check if any of the indices to be closed are currently being restored from a snapshot and fail closing if such an index
103-
// is found as closing an index that is being restored makes the index unusable (it cannot be recovered).
104-
RestoreInProgress restore = currentState.custom(RestoreInProgress.TYPE);
105-
if (restore != null) {
106-
Set<String> indicesToFail = null;
107-
for (RestoreInProgress.Entry entry : restore.entries()) {
108-
for (ObjectObjectCursor<ShardId, RestoreInProgress.ShardRestoreStatus> shard : entry.shards()) {
109-
if (!shard.value.state().completed()) {
110-
if (indicesToClose.contains(shard.key.getIndexName())) {
111-
if (indicesToFail == null) {
112-
indicesToFail = new HashSet<>();
113-
}
114-
indicesToFail.add(shard.key.getIndexName());
115-
}
116-
}
117-
}
118-
}
119-
if (indicesToFail != null) {
120-
throw new IllegalArgumentException("Cannot close indices that are being restored: " + indicesToFail);
121-
}
122-
}
101+
// Check if index closing conflicts with any running restores
102+
RestoreService.checkIndexClosing(currentState, indicesToClose);
103+
// Check if index closing conflicts with any running snapshots
104+
SnapshotsService.checkIndexClosing(currentState, indicesToClose);
123105

124106
logger.info("closing indices [{}]", indicesAsString);
125107

core/src/main/java/org/elasticsearch/snapshots/RestoreService.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -774,6 +774,32 @@ private boolean failed(Snapshot snapshot, String index) {
774774
return false;
775775
}
776776

777+
/**
778+
* Check if any of the indices to be closed are currently being restored from a snapshot and fail closing if such an index
779+
* is found as closing an index that is being restored makes the index unusable (it cannot be recovered).
780+
*/
781+
public static void checkIndexClosing(ClusterState currentState, Set<String> indices) {
782+
RestoreInProgress restore = currentState.custom(RestoreInProgress.TYPE);
783+
if (restore != null) {
784+
Set<String> indicesToFail = null;
785+
for (RestoreInProgress.Entry entry : restore.entries()) {
786+
for (ObjectObjectCursor<ShardId, RestoreInProgress.ShardRestoreStatus> shard : entry.shards()) {
787+
if (!shard.value.state().completed()) {
788+
if (indices.contains(shard.key.getIndexName())) {
789+
if (indicesToFail == null) {
790+
indicesToFail = new HashSet<>();
791+
}
792+
indicesToFail.add(shard.key.getIndexName());
793+
}
794+
}
795+
}
796+
}
797+
if (indicesToFail != null) {
798+
throw new IllegalArgumentException("Cannot close indices that are being restored: " + indicesToFail);
799+
}
800+
}
801+
}
802+
777803
/**
778804
* Adds restore completion listener
779805
* <p>

core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ public ClusterState execute(ClusterState currentState) {
206206
// Store newSnapshot here to be processed in clusterStateProcessed
207207
List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndices(currentState, request.indicesOptions(), request.indices()));
208208
logger.trace("[{}][{}] creating snapshot for indices [{}]", request.repository(), request.name(), indices);
209-
newSnapshot = new SnapshotsInProgress.Entry(snapshotId, request.includeGlobalState(), State.INIT, indices, System.currentTimeMillis(), null);
209+
newSnapshot = new SnapshotsInProgress.Entry(snapshotId, request.includeGlobalState(), request.partial(), State.INIT, indices, System.currentTimeMillis(), null);
210210
snapshots = new SnapshotsInProgress(newSnapshot);
211211
} else {
212212
// TODO: What should we do if a snapshot is already running?
@@ -228,7 +228,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl
228228
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new Runnable() {
229229
@Override
230230
public void run() {
231-
beginSnapshot(newState, newSnapshot, request.partial, listener);
231+
beginSnapshot(newState, newSnapshot, request.partial(), listener);
232232
}
233233
});
234234
}
@@ -1061,6 +1061,63 @@ private ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard
10611061
return builder.build();
10621062
}
10631063

1064+
/**
1065+
* Check if any of the indices to be deleted are currently being snapshotted. Fail as deleting an index that is being
1066+
* snapshotted (with partial == false) makes the snapshot fail.
1067+
*/
1068+
public static void checkIndexDeletion(ClusterState currentState, Set<String> indices) {
1069+
Set<String> indicesToFail = indicesToFailForCloseOrDeletion(currentState, indices);
1070+
if (indicesToFail != null) {
1071+
throw new IllegalArgumentException("Cannot delete indices that are being snapshotted: " + indicesToFail +
1072+
". Try again after snapshot finishes or cancel the currently running snapshot.");
1073+
}
1074+
}
1075+
1076+
/**
1077+
* Check if any of the indices to be closed are currently being snapshotted. Fail as closing an index that is being
1078+
* snapshotted (with partial == false) makes the snapshot fail.
1079+
*/
1080+
public static void checkIndexClosing(ClusterState currentState, Set<String> indices) {
1081+
Set<String> indicesToFail = indicesToFailForCloseOrDeletion(currentState, indices);
1082+
if (indicesToFail != null) {
1083+
throw new IllegalArgumentException("Cannot close indices that are being snapshotted: " + indicesToFail +
1084+
". Try again after snapshot finishes or cancel the currently running snapshot.");
1085+
}
1086+
}
1087+
1088+
private static Set<String> indicesToFailForCloseOrDeletion(ClusterState currentState, Set<String> indices) {
1089+
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
1090+
Set<String> indicesToFail = null;
1091+
if (snapshots != null) {
1092+
for (final SnapshotsInProgress.Entry entry : snapshots.entries()) {
1093+
if (entry.partial() == false) {
1094+
if (entry.state() == State.INIT) {
1095+
for (String index : entry.indices()) {
1096+
if (indices.contains(index)) {
1097+
if (indicesToFail == null) {
1098+
indicesToFail = new HashSet<>();
1099+
}
1100+
indicesToFail.add(index);
1101+
}
1102+
}
1103+
} else {
1104+
for (ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard : entry.shards()) {
1105+
if (!shard.value.state().completed()) {
1106+
if (indices.contains(shard.key.getIndexName())) {
1107+
if (indicesToFail == null) {
1108+
indicesToFail = new HashSet<>();
1109+
}
1110+
indicesToFail.add(shard.key.getIndexName());
1111+
}
1112+
}
1113+
}
1114+
}
1115+
}
1116+
}
1117+
}
1118+
return indicesToFail;
1119+
}
1120+
10641121
/**
10651122
* Adds snapshot completion listener
10661123
*
@@ -1302,6 +1359,15 @@ public boolean includeGlobalState() {
13021359
return includeGlobalState;
13031360
}
13041361

1362+
/**
1363+
* Returns true if partial snapshot should be allowed
1364+
*
1365+
* @return true if partial snapshot should be allowed
1366+
*/
1367+
public boolean partial() {
1368+
return partial;
1369+
}
1370+
13051371
/**
13061372
* Returns master node timeout
13071373
*

core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,7 @@ public ClusterState.Custom randomCreate(String name) {
639639
return new SnapshotsInProgress(new SnapshotsInProgress.Entry(
640640
new SnapshotId(randomName("repo"), randomName("snap")),
641641
randomBoolean(),
642+
randomBoolean(),
642643
SnapshotsInProgress.State.fromValue((byte) randomIntBetween(0, 6)),
643644
Collections.<String>emptyList(),
644645
Math.abs(randomLong()),

0 commit comments

Comments
 (0)