Skip to content

Commit 4fd9a9c

Browse files
Put a fake allocation id on allocate stale primary command (#34140)
Removes the fake allocation id after recovery is done. Relates to #33432 (cherry picked from commit f789d49)
1 parent 6a781f6 commit 4fd9a9c

File tree

8 files changed

+352
-40
lines changed

8 files changed

+352
-40
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -140,12 +140,19 @@ boolean validate(MetaData metaData) {
140140
}
141141

142142
if (shardRouting.primary() && shardRouting.initializing() &&
143-
shardRouting.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE &&
144-
inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false)
145-
throw new IllegalStateException("a primary shard routing " + shardRouting + " is a primary that is recovering from " +
146-
"a known allocation id but has no corresponding entry in the in-sync " +
147-
"allocation set " + inSyncAllocationIds);
148-
143+
shardRouting.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE) {
144+
if (inSyncAllocationIds.contains(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID)) {
145+
if (inSyncAllocationIds.size() != 1) {
146+
throw new IllegalStateException("a primary shard routing " + shardRouting
147+
+ " is a primary that is recovering from a stale primary has unexpected allocation ids in in-sync " +
148+
"allocation set " + inSyncAllocationIds);
149+
}
150+
} else if (inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false) {
151+
throw new IllegalStateException("a primary shard routing " + shardRouting
152+
+ " is a primary that is recovering from a known allocation id but has no corresponding entry in the in-sync " +
153+
"allocation set " + inSyncAllocationIds);
154+
}
155+
}
149156
}
150157
}
151158
return true;

server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,11 @@ public String toString() {
132132
* Recovery from an existing on-disk store
133133
*/
134134
public static final class ExistingStoreRecoverySource extends RecoverySource {
135+
/**
136+
* Special allocation id that a shard has during initialization on allocate_stale_primary
137+
*/
138+
public static final String FORCED_ALLOCATION_ID = "_forced_allocation_";
139+
135140
public static final ExistingStoreRecoverySource INSTANCE = new ExistingStoreRecoverySource(false);
136141
public static final ExistingStoreRecoverySource FORCE_STALE_PRIMARY_INSTANCE = new ExistingStoreRecoverySource(true);
137142

server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.HashSet;
4040
import java.util.List;
4141
import java.util.Map;
42+
import java.util.Objects;
4243
import java.util.Set;
4344
import java.util.stream.Collectors;
4445

@@ -68,7 +69,16 @@ public void shardInitialized(ShardRouting unassignedShard, ShardRouting initiali
6869

6970
@Override
7071
public void shardStarted(ShardRouting initializingShard, ShardRouting startedShard) {
71-
addAllocationId(startedShard);
72+
assert Objects.equals(initializingShard.allocationId().getId(), startedShard.allocationId().getId())
73+
: "initializingShard.allocationId [" + initializingShard.allocationId().getId()
74+
+ "] and startedShard.allocationId [" + startedShard.allocationId().getId() + "] have to have the same";
75+
Updates updates = changes(startedShard.shardId());
76+
updates.addedAllocationIds.add(startedShard.allocationId().getId());
77+
if (startedShard.primary()
78+
// a started shard has a null recoverySource, so the recoverySource has to be picked up from its initializing state
79+
&& (initializingShard.recoverySource() == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE)) {
80+
updates.removedAllocationIds.add(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID);
81+
}
7282
}
7383

7484
@Override
@@ -144,7 +154,8 @@ private IndexMetaData.Builder updateInSyncAllocations(RoutingTable newRoutingTab
144154
oldInSyncAllocationIds.contains(updates.initializedPrimary.allocationId().getId()) == false) {
145155
// we're not reusing an existing in-sync allocation id to initialize a primary, which means that we're either force-allocating
146156
// an empty or a stale primary (see AllocateEmptyPrimaryAllocationCommand or AllocateStalePrimaryAllocationCommand).
147-
RecoverySource.Type recoverySourceType = updates.initializedPrimary.recoverySource().getType();
157+
RecoverySource recoverySource = updates.initializedPrimary.recoverySource();
158+
RecoverySource.Type recoverySourceType = recoverySource.getType();
148159
boolean emptyPrimary = recoverySourceType == RecoverySource.Type.EMPTY_STORE;
149160
assert updates.addedAllocationIds.isEmpty() : (emptyPrimary ? "empty" : "stale") +
150161
" primary is not force-initialized in same allocation round where shards are started";
@@ -156,16 +167,26 @@ private IndexMetaData.Builder updateInSyncAllocations(RoutingTable newRoutingTab
156167
// forcing an empty primary resets the in-sync allocations to the empty set (ShardRouting.allocatedPostIndexCreate)
157168
indexMetaDataBuilder.putInSyncAllocationIds(shardId.id(), Collections.emptySet());
158169
} else {
170+
final String allocationId;
171+
if (recoverySource == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE) {
172+
allocationId = RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID;
173+
} else {
174+
assert recoverySource instanceof RecoverySource.SnapshotRecoverySource : recoverySource;
175+
allocationId = updates.initializedPrimary.allocationId().getId();
176+
}
159177
// forcing a stale primary resets the in-sync allocations to the singleton set with the stale id
160-
indexMetaDataBuilder.putInSyncAllocationIds(shardId.id(),
161-
Collections.singleton(updates.initializedPrimary.allocationId().getId()));
178+
indexMetaDataBuilder.putInSyncAllocationIds(shardId.id(), Collections.singleton(allocationId));
162179
}
163180
} else {
164181
// standard path for updating in-sync ids
165182
Set<String> inSyncAllocationIds = new HashSet<>(oldInSyncAllocationIds);
166183
inSyncAllocationIds.addAll(updates.addedAllocationIds);
167184
inSyncAllocationIds.removeAll(updates.removedAllocationIds);
168185

186+
assert oldInSyncAllocationIds.contains(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID) == false
187+
|| inSyncAllocationIds.contains(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID) == false :
188+
"fake allocation id has to be removed, inSyncAllocationIds:" + inSyncAllocationIds;
189+
169190
// Prevent set of inSyncAllocationIds to grow unboundedly. This can happen for example if we don't write to a primary
170191
// but repeatedly shut down nodes that have active replicas.
171192
// We use number_of_replicas + 1 (= possible active shard copies) to bound the inSyncAllocationIds set
@@ -287,13 +308,6 @@ void removeAllocationId(ShardRouting shardRouting) {
287308
}
288309
}
289310

290-
/**
291-
* Add allocation id of this shard to the set of in-sync shard copies
292-
*/
293-
private void addAllocationId(ShardRouting shardRouting) {
294-
changes(shardRouting.shardId()).addedAllocationIds.add(shardRouting.allocationId().getId());
295-
}
296-
297311
/**
298312
* Increase primary term for this shard id
299313
*/
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.cluster.routing;
21+
22+
import org.apache.lucene.store.SimpleFSDirectory;
23+
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplanation;
24+
import org.elasticsearch.action.admin.indices.stats.ShardStats;
25+
import org.elasticsearch.action.index.IndexRequestBuilder;
26+
import org.elasticsearch.client.Requests;
27+
import org.elasticsearch.cluster.ClusterState;
28+
import org.elasticsearch.cluster.health.ClusterHealthStatus;
29+
import org.elasticsearch.cluster.metadata.IndexMetaData;
30+
import org.elasticsearch.cluster.routing.allocation.AllocationDecision;
31+
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
32+
import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
33+
import org.elasticsearch.common.settings.Settings;
34+
import org.elasticsearch.index.IndexService;
35+
import org.elasticsearch.index.IndexSettings;
36+
import org.elasticsearch.index.MockEngineFactoryPlugin;
37+
import org.elasticsearch.index.engine.Engine;
38+
import org.elasticsearch.index.shard.RemoveCorruptedShardDataCommandIT;
39+
import org.elasticsearch.index.shard.ShardId;
40+
import org.elasticsearch.index.shard.ShardPath;
41+
import org.elasticsearch.index.store.Store;
42+
import org.elasticsearch.indices.IndicesService;
43+
import org.elasticsearch.plugins.Plugin;
44+
import org.elasticsearch.test.DummyShardLock;
45+
import org.elasticsearch.test.ESIntegTestCase;
46+
import org.elasticsearch.test.InternalSettingsPlugin;
47+
import org.elasticsearch.test.InternalTestCluster;
48+
import org.elasticsearch.test.transport.MockTransportService;
49+
50+
import java.io.IOException;
51+
import java.nio.file.Path;
52+
import java.util.Arrays;
53+
import java.util.Collection;
54+
import java.util.Set;
55+
import java.util.concurrent.ExecutionException;
56+
import java.util.stream.Collectors;
57+
58+
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
59+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
60+
import static org.hamcrest.Matchers.equalTo;
61+
import static org.hamcrest.Matchers.greaterThan;
62+
import static org.hamcrest.Matchers.hasSize;
63+
import static org.hamcrest.Matchers.is;
64+
import static org.hamcrest.Matchers.not;
65+
66+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0)
67+
public class AllocationIdIT extends ESIntegTestCase {
68+
69+
@Override
70+
protected Collection<Class<? extends Plugin>> nodePlugins() {
71+
return Arrays.asList(MockTransportService.TestPlugin.class, MockEngineFactoryPlugin.class, InternalSettingsPlugin.class);
72+
}
73+
74+
/**
 * Integration scenario: a stale primary copy that carries a corruption marker is force-allocated with
 * {@link AllocateStalePrimaryAllocationCommand}. The test expects that allocation to fail, the index to stay red with
 * no valid shard copy even after the marker is removed and the node is restarted, and only a second
 * AllocateStalePrimary command to bring the index back. At the end both nodes must report the same history UUID,
 * which differs from the one recorded during the initial set up.
 */
public void testFailedRecoveryOnAllocateStalePrimaryRequiresAnotherAllocateStalePrimary() throws Exception {
75+
/*
76+
* The allocation id is stored when the shard is started, while historyUUID is adjusted after recovery is done.
77+
*
78+
* If executing AllocateStalePrimary stored a proper allocation id in the allocation id set and the recovery then failed,
79+
* a shard restart would skip the stage where historyUUID is changed.
80+
*
81+
* That would lead to a situation where the allocated stale primary and its replica belong to the same historyUUID and
82+
* the replica would receive operations after the local checkpoint while documents before the checkpoint could differ significantly.
83+
*
84+
* Therefore, on AllocateStalePrimary we put a fake allocation id (no real one could be generated like that)
85+
* and any failure during recovery requires an extra AllocateStalePrimary command to be executed.
86+
*/
87+
88+
// initial set up
89+
final String indexName = "index42";
90+
// a dedicated master-only node is started first; the data nodes below are stopped and restarted during the test
final String master = internalCluster().startMasterOnlyNode();
91+
String node1 = internalCluster().startNode();
92+
createIndex(indexName, Settings.builder()
93+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
94+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
95+
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), "checksum").build());
96+
final int numDocs = indexDocs(indexName, "foo", "bar");
97+
final IndexSettings indexSettings = getIndexSettings(indexName, node1);
98+
final Set<String> allocationIds = getAllocationIds(indexName);
99+
final ShardId shardId = new ShardId(resolveIndex(indexName), 0);
100+
final Path indexPath = getIndexPath(node1, shardId);
101+
assertThat(allocationIds, hasSize(1));
102+
// remember the original history UUID; it is compared against the final one at the end of the test
final String historyUUID = historyUUID(node1, indexName);
103+
String node2 = internalCluster().startNode();
104+
ensureGreen(indexName);
105+
internalCluster().assertSameDocIdsOnShards();
106+
// initial set up is done
107+
108+
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node1));
109+
110+
// index more docs on node2, which marks the copy on node1 as stale
111+
int numExtraDocs = indexDocs(indexName, "foo", "bar2");
112+
assertHitCount(client(node2).prepareSearch(indexName).setQuery(matchAllQuery()).get(), numDocs + numExtraDocs);
113+
114+
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node2));
115+
116+
// create a fake corruption marker on node1
117+
putFakeCorruptionMarker(indexSettings, shardId, indexPath);
118+
119+
// thanks to the master, node1 is known to be out of sync
120+
node1 = internalCluster().startNode();
121+
122+
// there is only a _stale_ primary
123+
checkNoValidShardCopy(indexName, shardId);
124+
125+
// allocate stale primary
126+
client(node1).admin().cluster().prepareReroute()
127+
.add(new AllocateStalePrimaryAllocationCommand(indexName, 0, node1, true))
128+
.get();
129+
130+
// allocation fails due to the corruption marker
131+
assertBusy(() -> {
132+
final ClusterState state = client().admin().cluster().prepareState().get().getState();
133+
final ShardRouting shardRouting = state.routingTable().index(indexName).shard(shardId.id()).primaryShard();
134+
assertThat(shardRouting.state(), equalTo(ShardRoutingState.UNASSIGNED));
135+
assertThat(shardRouting.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED));
136+
});
137+
138+
// remove the fake corruption marker again
try(Store store = new Store(shardId, indexSettings, new SimpleFSDirectory(indexPath), new DummyShardLock(shardId))) {
139+
store.removeCorruptionMarker();
140+
}
141+
142+
// index is red: no shard is allocated (the allocation id is a fake id that does not match anything)
143+
checkHealthStatus(indexName, ClusterHealthStatus.RED);
144+
checkNoValidShardCopy(indexName, shardId);
145+
146+
internalCluster().restartNode(node1, InternalTestCluster.EMPTY_CALLBACK);
147+
148+
// index is still red due to the allocation id mismatch
149+
checkHealthStatus(indexName, ClusterHealthStatus.RED);
150+
checkNoValidShardCopy(indexName, shardId);
151+
152+
// there is no valid shard copy; AllocateStalePrimary has to be invoked again
153+
client().admin().cluster().prepareReroute()
154+
.add(new AllocateStalePrimaryAllocationCommand(indexName, 0, node1, true))
155+
.get();
156+
157+
ensureYellow(indexName);
158+
159+
// bring node2 back
160+
node2 = internalCluster().startNode();
161+
ensureGreen(indexName);
162+
163+
// the history UUID must have changed and must be the same on both nodes
assertThat(historyUUID(node1, indexName), not(equalTo(historyUUID)));
164+
assertThat(historyUUID(node1, indexName), equalTo(historyUUID(node2, indexName)));
165+
166+
internalCluster().assertSameDocIdsOnShards();
167+
}
168+
169+
public void checkHealthStatus(String indexName, ClusterHealthStatus healthStatus) {
170+
final ClusterHealthStatus indexHealthStatus = client().admin().cluster()
171+
.health(Requests.clusterHealthRequest(indexName)).actionGet().getStatus();
172+
assertThat(indexHealthStatus, is(healthStatus));
173+
}
174+
175+
private int indexDocs(String indexName, Object ... source) throws InterruptedException, ExecutionException {
176+
// index some docs in several segments
177+
int numDocs = 0;
178+
for (int k = 0, attempts = randomIntBetween(5, 10); k < attempts; k++) {
179+
final int numExtraDocs = between(10, 100);
180+
IndexRequestBuilder[] builders = new IndexRequestBuilder[numExtraDocs];
181+
for (int i = 0; i < builders.length; i++) {
182+
builders[i] = client().prepareIndex(indexName, "type").setSource(source);
183+
}
184+
185+
indexRandom(true, false, true, Arrays.asList(builders));
186+
numDocs += numExtraDocs;
187+
}
188+
189+
return numDocs;
190+
}
191+
192+
private Path getIndexPath(String nodeName, ShardId shardId) {
193+
final Set<Path> indexDirs = RemoveCorruptedShardDataCommandIT.getDirs(nodeName, shardId, ShardPath.INDEX_FOLDER_NAME);
194+
assertThat(indexDirs, hasSize(1));
195+
return indexDirs.iterator().next();
196+
}
197+
198+
private Set<String> getAllocationIds(String indexName) {
199+
final ClusterState state = client().admin().cluster().prepareState().get().getState();
200+
final Set<String> allocationIds = state.metaData().index(indexName).inSyncAllocationIds(0);
201+
return allocationIds;
202+
}
203+
204+
private IndexSettings getIndexSettings(String indexName, String nodeName) {
205+
final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
206+
final IndexService indexService = indicesService.indexService(resolveIndex(indexName));
207+
return indexService.getIndexSettings();
208+
}
209+
210+
private String historyUUID(String node, String indexName) {
211+
final ShardStats[] shards = client(node).admin().indices().prepareStats(indexName).clear().get().getShards();
212+
assertThat(shards.length, greaterThan(0));
213+
final Set<String> historyUUIDs = Arrays.stream(shards)
214+
.map(shard -> shard.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY))
215+
.collect(Collectors.toSet());
216+
assertThat(historyUUIDs, hasSize(1));
217+
return historyUUIDs.iterator().next();
218+
}
219+
220+
private void putFakeCorruptionMarker(IndexSettings indexSettings, ShardId shardId, Path indexPath) throws IOException {
221+
try(Store store = new Store(shardId, indexSettings, new SimpleFSDirectory(indexPath), new DummyShardLock(shardId))) {
222+
store.markStoreCorrupted(new IOException("fake ioexception"));
223+
}
224+
}
225+
226+
private void checkNoValidShardCopy(String indexName, ShardId shardId) throws Exception {
227+
final ClusterAllocationExplanation explanation =
228+
client().admin().cluster().prepareAllocationExplain()
229+
.setIndex(indexName).setShard(shardId.id()).setPrimary(true)
230+
.get().getExplanation();
231+
232+
final ShardAllocationDecision shardAllocationDecision = explanation.getShardAllocationDecision();
233+
assertThat(shardAllocationDecision.isDecisionTaken(), equalTo(true));
234+
assertThat(shardAllocationDecision.getAllocateDecision().getAllocationDecision(),
235+
equalTo(AllocationDecision.NO_VALID_SHARD_COPY));
236+
}
237+
238+
}

0 commit comments

Comments
 (0)