Skip to content

Commit d76161d

Browse files
committed
Write shard state metadata as soon as shard is created / initializing
As we rely on active allocation ids persisted in the cluster state to select the primary shard copy, we can write shard state metadata on the allocated node as soon as the node knows about receiving this shard. This also ensures that in case of primary relocation, when the relocation target is marked as started by the master node, the shard state metadata with the correct allocation id has already been written on the relocation target. Before this change, shard state metadata was only written once the node knows it is marked as started. In case of failures between master marking the node as started and the node receiving and processing this event, the relation between the shard copy on disk and the cluster state could get lost. This means that manual allocation of the shard using the reroute command allocate_stale_primary was necessary. Closes #16625
1 parent c5a2905 commit d76161d

File tree

9 files changed

+159
-162
lines changed

9 files changed

+159
-162
lines changed

core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -187,12 +187,14 @@ protected NodeShardsResult buildAllocationIdBasedNodeShardsResult(ShardRouting s
187187
}
188188

189189
if (nodeShardState.storeException() == null) {
190-
if (allocationId == null && nodeShardState.legacyVersion() != ShardStateMetaData.NO_VERSION) {
191-
// old shard with no allocation id, assign dummy value so that it gets added below in case of matchAnyShard
192-
allocationId = "_n/a_";
190+
if (allocationId == null && nodeShardState.legacyVersion() == ShardStateMetaData.NO_VERSION) {
191+
logger.trace("[{}] on node [{}] has no shard state information", shard, nodeShardState.getNode());
192+
} else if (allocationId != null) {
193+
assert nodeShardState.legacyVersion() == ShardStateMetaData.NO_VERSION : "Allocation id and legacy version cannot be both present";
194+
logger.trace("[{}] on node [{}] has allocation id [{}]", shard, nodeShardState.getNode(), allocationId);
195+
} else {
196+
logger.trace("[{}] on node [{}] has no allocation id, out-dated shard (shard state version: [{}])", shard, nodeShardState.getNode(), nodeShardState.legacyVersion());
193197
}
194-
195-
logger.trace("[{}] on node [{}] has allocation id [{}] of shard", shard, nodeShardState.getNode(), allocationId);
196198
} else {
197199
logger.trace("[{}] on node [{}] has allocation id [{}] but the store can not be opened, treating as no allocation id", nodeShardState.storeException(), shard, nodeShardState.getNode(), allocationId);
198200
allocationId = null;
@@ -299,9 +301,20 @@ NodeShardsResult buildVersionBasedNodeShardsResult(ShardRouting shard, boolean m
299301
continue;
300302
}
301303

302-
// no version means it does not exists, which is what the API returns, and what we expect to
303304
if (nodeShardState.storeException() == null) {
304-
logger.trace("[{}] on node [{}] has version [{}] of shard", shard, nodeShardState.getNode(), version);
305+
if (version == ShardStateMetaData.NO_VERSION && nodeShardState.allocationId() == null) {
306+
logger.trace("[{}] on node [{}] has no shard state information", shard, nodeShardState.getNode());
307+
} else if (version != ShardStateMetaData.NO_VERSION) {
308+
assert nodeShardState.allocationId() == null : "Allocation id and legacy version cannot be both present";
309+
logger.trace("[{}] on node [{}] has version [{}] of shard", shard, nodeShardState.getNode(), version);
310+
} else {
311+
// shard was already selected in a 5.x cluster as primary for recovery, was initialized (and wrote a new state file) but
312+
// did not make it to STARTED state before the cluster crashed (otherwise list of active allocation ids would be
313+
// non-empty and allocation id - based allocation mode would be chosen).
314+
// Prefer this shard copy again.
315+
version = Long.MAX_VALUE;
316+
logger.trace("[{}] on node [{}] has allocation id [{}]", shard, nodeShardState.getNode(), nodeShardState.allocationId());
317+
}
305318
} else {
306319
// when there is an store exception, we disregard the reported version and assign it as no version (same as shard does not exist)
307320
logger.trace("[{}] on node [{}] has version [{}] but the store can not be opened, treating no version", nodeShardState.storeException(), shard, nodeShardState.getNode(), version);

core/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ final class BitSetProducerWarmer implements IndexWarmer.Listener {
215215

216216
@Override
217217
public IndexWarmer.TerminationHandle warmNewReaders(final IndexShard indexShard, final Engine.Searcher searcher) {
218-
if (indexSettings.getIndex().equals(indexShard.getIndexSettings().getIndex()) == false) {
218+
if (indexSettings.getIndex().equals(indexShard.indexSettings().getIndex()) == false) {
219219
// this is from a different index
220220
return TerminationHandle.NO_WAIT;
221221
}

core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 64 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,6 @@ public class IndexShard extends AbstractIndexShardComponent {
153153
private final EngineConfig engineConfig;
154154
private final TranslogConfig translogConfig;
155155
private final IndexEventListener indexEventListener;
156-
private final IndexSettings idxSettings;
157156

158157
/** How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
159158
* across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents
@@ -205,7 +204,6 @@ public IndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path,
205204
IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, NodeServicesProvider provider, SearchSlowLog slowLog, Engine.Warmer warmer, IndexingOperationListener... listeners) {
206205
super(shardId, indexSettings);
207206
final Settings settings = indexSettings.getSettings();
208-
this.idxSettings = indexSettings;
209207
this.codecService = new CodecService(mapperService, logger);
210208
this.warmer = warmer;
211209
this.deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
@@ -248,18 +246,14 @@ public IndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path,
248246
this.engineConfig = newEngineConfig(translogConfig, cachingPolicy);
249247
this.suspendableRefContainer = new SuspendableRefContainer();
250248
this.searcherWrapper = indexSearcherWrapper;
251-
QueryShardContext queryShardContext = new QueryShardContext(idxSettings, indexCache.bitsetFilterCache(), indexFieldDataService, mapperService, similarityService, provider.getScriptService(), provider.getIndicesQueriesRegistry());
249+
QueryShardContext queryShardContext = new QueryShardContext(indexSettings, indexCache.bitsetFilterCache(), indexFieldDataService, mapperService, similarityService, provider.getScriptService(), provider.getIndicesQueriesRegistry());
252250
this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, queryShardContext);
253251
}
254252

255253
public Store store() {
256254
return this.store;
257255
}
258256

259-
public IndexSettings getIndexSettings() {
260-
return idxSettings;
261-
}
262-
263257
/** returns true if this shard supports indexing (i.e., write) operations. */
264258
public boolean canIndex() {
265259
return true;
@@ -319,66 +313,64 @@ public QueryCachingPolicy getQueryCachingPolicy() {
319313
* unless explicitly disabled.
320314
*
321315
* @throws IndexShardRelocatedException if shard is marked as relocated and relocation aborted
316+
* @throws IOException if shard state could not be persisted
322317
*/
323-
public void updateRoutingEntry(final ShardRouting newRouting, final boolean persistState) {
318+
public void updateRoutingEntry(final ShardRouting newRouting, final boolean persistState) throws IOException {
324319
final ShardRouting currentRouting = this.shardRouting;
325320
if (!newRouting.shardId().equals(shardId())) {
326321
throw new IllegalArgumentException("Trying to set a routing entry with shardId [" + newRouting.shardId() + "] on a shard with shardId [" + shardId() + "]");
327322
}
328323
if ((currentRouting == null || newRouting.isSameAllocation(currentRouting)) == false) {
329324
throw new IllegalArgumentException("Trying to set a routing entry with a different allocation. Current " + currentRouting + ", new " + newRouting);
330325
}
331-
try {
332-
if (currentRouting != null) {
333-
if (!newRouting.primary() && currentRouting.primary()) {
334-
logger.warn("suspect illegal state: trying to move shard from primary mode to replica mode");
335-
}
336-
// if its the same routing, return
337-
if (currentRouting.equals(newRouting)) {
338-
return;
339-
}
326+
if (currentRouting != null) {
327+
if (!newRouting.primary() && currentRouting.primary()) {
328+
logger.warn("suspect illegal state: trying to move shard from primary mode to replica mode");
329+
}
330+
// if its the same routing, return
331+
if (currentRouting.equals(newRouting)) {
332+
return;
340333
}
334+
}
341335

342-
if (state == IndexShardState.POST_RECOVERY) {
343-
// if the state is started or relocating (cause it might move right away from started to relocating)
344-
// then move to STARTED
345-
if (newRouting.state() == ShardRoutingState.STARTED || newRouting.state() == ShardRoutingState.RELOCATING) {
346-
// we want to refresh *before* we move to internal STARTED state
347-
try {
348-
getEngine().refresh("cluster_state_started");
349-
} catch (Throwable t) {
350-
logger.debug("failed to refresh due to move to cluster wide started", t);
351-
}
336+
if (state == IndexShardState.POST_RECOVERY) {
337+
// if the state is started or relocating (cause it might move right away from started to relocating)
338+
// then move to STARTED
339+
if (newRouting.state() == ShardRoutingState.STARTED || newRouting.state() == ShardRoutingState.RELOCATING) {
340+
// we want to refresh *before* we move to internal STARTED state
341+
try {
342+
getEngine().refresh("cluster_state_started");
343+
} catch (Throwable t) {
344+
logger.debug("failed to refresh due to move to cluster wide started", t);
345+
}
352346

353-
boolean movedToStarted = false;
354-
synchronized (mutex) {
355-
// do the check under a mutex, so we make sure to only change to STARTED if in POST_RECOVERY
356-
if (state == IndexShardState.POST_RECOVERY) {
357-
changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
358-
movedToStarted = true;
359-
} else {
360-
logger.debug("state [{}] not changed, not in POST_RECOVERY, global state is [{}]", state, newRouting.state());
361-
}
362-
}
363-
if (movedToStarted) {
364-
indexEventListener.afterIndexShardStarted(this);
347+
boolean movedToStarted = false;
348+
synchronized (mutex) {
349+
// do the check under a mutex, so we make sure to only change to STARTED if in POST_RECOVERY
350+
if (state == IndexShardState.POST_RECOVERY) {
351+
changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
352+
movedToStarted = true;
353+
} else {
354+
logger.debug("state [{}] not changed, not in POST_RECOVERY, global state is [{}]", state, newRouting.state());
365355
}
366356
}
357+
if (movedToStarted) {
358+
indexEventListener.afterIndexShardStarted(this);
359+
}
367360
}
361+
}
368362

369-
if (state == IndexShardState.RELOCATED &&
370-
(newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) {
371-
// if the shard is marked as RELOCATED we have to fail when any changes in shard routing occur (e.g. due to recovery
372-
// failure / cancellation). The reason is that at the moment we cannot safely move back to STARTED without risking two
373-
// active primaries.
374-
throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state());
375-
}
376-
this.shardRouting = newRouting;
377-
indexEventListener.shardRoutingChanged(this, currentRouting, newRouting);
378-
} finally {
379-
if (persistState) {
380-
persistMetadata(newRouting, currentRouting);
381-
}
363+
if (state == IndexShardState.RELOCATED &&
364+
(newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) {
365+
// if the shard is marked as RELOCATED we have to fail when any changes in shard routing occur (e.g. due to recovery
366+
// failure / cancellation). The reason is that at the moment we cannot safely move back to STARTED without risking two
367+
// active primaries.
368+
throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state());
369+
}
370+
this.shardRouting = newRouting;
371+
indexEventListener.shardRoutingChanged(this, currentRouting, newRouting);
372+
if (persistState) {
373+
persistMetadata(newRouting, currentRouting);
382374
}
383375
}
384376

@@ -733,7 +725,7 @@ public org.apache.lucene.util.Version minimumCompatibleVersion() {
733725
luceneVersion = segment.getVersion();
734726
}
735727
}
736-
return luceneVersion == null ? idxSettings.getIndexVersionCreated().luceneVersion : luceneVersion;
728+
return luceneVersion == null ? indexSettings.getIndexVersionCreated().luceneVersion : luceneVersion;
737729
}
738730

739731
/**
@@ -1046,18 +1038,6 @@ public void checkIdle(long inactiveTimeNS) {
10461038
}
10471039
}
10481040

1049-
/**
1050-
* Deletes the shards metadata state. This method can only be executed if the shard is not active.
1051-
*
1052-
* @throws IOException if the delete fails
1053-
*/
1054-
public void deleteShardState() throws IOException {
1055-
if (this.routingEntry() != null && this.routingEntry().active()) {
1056-
throw new IllegalStateException("Can't delete shard state on an active shard");
1057-
}
1058-
MetaDataStateFormat.deleteMetaState(shardPath().getDataPath());
1059-
}
1060-
10611041
public boolean isActive() {
10621042
return active.get();
10631043
}
@@ -1070,7 +1050,7 @@ public boolean recoverFromStore(DiscoveryNode localNode) {
10701050
// we are the first primary, recover from the gateway
10711051
// if its post api allocation, the index should exists
10721052
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
1073-
boolean shouldExist = shardRouting.allocatedPostIndexCreate(idxSettings.getIndexMetaData());
1053+
boolean shouldExist = shardRouting.allocatedPostIndexCreate(indexSettings.getIndexMetaData());
10741054

10751055
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
10761056
return storeRecovery.recoverFromStore(this, shouldExist, localNode);
@@ -1344,27 +1324,25 @@ public boolean allowsPrimaryPromotion() {
13441324
}
13451325

13461326
// pkg private for testing
1347-
void persistMetadata(ShardRouting newRouting, ShardRouting currentRouting) {
1327+
void persistMetadata(ShardRouting newRouting, @Nullable ShardRouting currentRouting) throws IOException {
13481328
assert newRouting != null : "newRouting must not be null";
1349-
if (newRouting.active()) {
1350-
try {
1351-
final String writeReason;
1352-
if (currentRouting == null) {
1353-
writeReason = "freshly started, allocation id [" + newRouting.allocationId() + "]";
1354-
} else if (currentRouting.equals(newRouting) == false) {
1355-
writeReason = "routing changed from " + currentRouting + " to " + newRouting;
1356-
} else {
1357-
logger.trace("{} skip writing shard state, has been written before", shardId);
1358-
return;
1359-
}
1360-
final ShardStateMetaData newShardStateMetadata = new ShardStateMetaData(newRouting.primary(), getIndexUUID(), newRouting.allocationId());
1361-
logger.trace("{} writing shard state, reason [{}]", shardId, writeReason);
1362-
ShardStateMetaData.FORMAT.write(newShardStateMetadata, newShardStateMetadata.legacyVersion, shardPath().getShardStatePath());
1363-
} catch (IOException e) { // this is how we used to handle it.... :(
1364-
logger.warn("failed to write shard state", e);
1365-
// we failed to write the shard state, we will try and write
1366-
// it next time...
1329+
1330+
// only persist metadata if routing information that is persisted in shard state metadata actually changed
1331+
if (currentRouting == null
1332+
|| currentRouting.primary() != newRouting.primary()
1333+
|| currentRouting.allocationId().equals(newRouting.allocationId()) == false) {
1334+
assert currentRouting == null || currentRouting.isSameAllocation(newRouting);
1335+
final String writeReason;
1336+
if (currentRouting == null) {
1337+
writeReason = "initial state with allocation id [" + newRouting.allocationId() + "]";
1338+
} else {
1339+
writeReason = "routing changed from " + currentRouting + " to " + newRouting;
13671340
}
1341+
logger.trace("{} writing shard state, reason [{}]", shardId, writeReason);
1342+
final ShardStateMetaData newShardStateMetadata = new ShardStateMetaData(newRouting.primary(), getIndexUUID(), newRouting.allocationId());
1343+
ShardStateMetaData.FORMAT.write(newShardStateMetadata, newShardStateMetadata.legacyVersion, shardPath().getShardStatePath());
1344+
} else {
1345+
logger.trace("{} skip writing shard state, has been written before", shardId);
13681346
}
13691347
}
13701348

@@ -1396,7 +1374,7 @@ public int recoveryFromSnapshot(Engine engine, Translog.Snapshot snapshot) throw
13961374
return new EngineConfig(shardId,
13971375
threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(),
13981376
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig,
1399-
idxSettings.getSettings().getAsTime(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, IndexingMemoryController.SHARD_DEFAULT_INACTIVE_TIME));
1377+
indexSettings.getSettings().getAsTime(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, IndexingMemoryController.SHARD_DEFAULT_INACTIVE_TIME));
14001378
}
14011379

14021380
public Releasable acquirePrimaryOperationLock() {

core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,12 @@ public ShadowIndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath
5151

5252
/**
5353
* In addition to the regular accounting done in
54-
* {@link IndexShard#updateRoutingEntry(org.elasticsearch.cluster.routing.ShardRouting, boolean)},
54+
* {@link IndexShard#updateRoutingEntry(ShardRouting, boolean)},
5555
* if this shadow replica needs to be promoted to a primary, the shard is
5656
* failed in order to allow a new primary to be re-allocated.
5757
*/
5858
@Override
59-
public void updateRoutingEntry(ShardRouting newRouting, boolean persistState) {
59+
public void updateRoutingEntry(ShardRouting newRouting, boolean persistState) throws IOException {
6060
if (newRouting.primary() == true) {// becoming a primary
6161
throw new IllegalStateException("can't promote shard to primary");
6262
}

core/src/main/java/org/elasticsearch/indices/IndicesService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -904,7 +904,7 @@ public boolean canCache(ShardSearchRequest request, SearchContext context) {
904904
if (!CACHEABLE_SEARCH_TYPES.contains(context.searchType())) {
905905
return false;
906906
}
907-
IndexSettings settings = context.indexShard().getIndexSettings();
907+
IndexSettings settings = context.indexShard().indexSettings();
908908
// if not explicitly set in the request, use the index setting, if not, use the request
909909
if (request.requestCache() == null) {
910910
if (settings.getValue(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING) == false) {

0 commit comments

Comments
 (0)