Skip to content
This repository was archived by the owner on Sep 24, 2019. It is now read-only.

Commit 62ce29b

Browse files
author
Vladimir Dolzhenko
committed
Put a fake allocation id on allocate stale primary command; remove it after recovery is done
Relates to elastic#33432
1 parent cb4cdf1 commit 62ce29b

File tree

4 files changed

+27
-5
lines changed

4 files changed

+27
-5
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,8 @@ boolean validate(MetaData metaData) {
141141

142142
if (shardRouting.primary() && shardRouting.initializing() &&
143143
shardRouting.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE &&
144-
inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false)
144+
inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false &&
145+
inSyncAllocationIds.contains(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID) == false)
145146
throw new IllegalStateException("a primary shard routing " + shardRouting + " is a primary that is recovering from " +
146147
"a known allocation id but has no corresponding entry in the in-sync " +
147148
"allocation set " + inSyncAllocationIds);

server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ public String toString() {
132132
* Recovery from an existing on-disk store
133133
*/
134134
public static final class ExistingStoreRecoverySource extends RecoverySource {
135+
public static final String FORCED_ALLOCATION_ID = "_forced_allocation";
135136
public static final ExistingStoreRecoverySource INSTANCE = new ExistingStoreRecoverySource(false);
136137
public static final ExistingStoreRecoverySource FORCE_STALE_PRIMARY_INSTANCE = new ExistingStoreRecoverySource(true);
137138

server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,13 @@ public void shardInitialized(ShardRouting unassignedShard, ShardRouting initiali
6969
@Override
7070
public void shardStarted(ShardRouting initializingShard, ShardRouting startedShard) {
7171
addAllocationId(startedShard);
72+
if (startedShard.primary()
73+
// started shard has to have null recoverySource; have to pick up recoverySource from its initializing state
74+
&& (initializingShard.recoverySource() instanceof RecoverySource.ExistingStoreRecoverySource
75+
|| initializingShard.recoverySource() instanceof RecoverySource.SnapshotRecoverySource)) {
76+
Updates updates = changes(startedShard.shardId());
77+
updates.removedAllocationIds.add(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID);
78+
}
7279
}
7380

7481
@Override
@@ -144,7 +151,8 @@ private IndexMetaData.Builder updateInSyncAllocations(RoutingTable newRoutingTab
144151
oldInSyncAllocationIds.contains(updates.initializedPrimary.allocationId().getId()) == false) {
145152
// we're not reusing an existing in-sync allocation id to initialize a primary, which means that we're either force-allocating
146153
// an empty or a stale primary (see AllocateEmptyPrimaryAllocationCommand or AllocateStalePrimaryAllocationCommand).
147-
RecoverySource.Type recoverySourceType = updates.initializedPrimary.recoverySource().getType();
154+
RecoverySource recoverySource = updates.initializedPrimary.recoverySource();
155+
RecoverySource.Type recoverySourceType = recoverySource.getType();
148156
boolean emptyPrimary = recoverySourceType == RecoverySource.Type.EMPTY_STORE;
149157
assert updates.addedAllocationIds.isEmpty() : (emptyPrimary ? "empty" : "stale") +
150158
" primary is not force-initialized in same allocation round where shards are started";
@@ -156,9 +164,12 @@ private IndexMetaData.Builder updateInSyncAllocations(RoutingTable newRoutingTab
156164
// forcing an empty primary resets the in-sync allocations to the empty set (ShardRouting.allocatedPostIndexCreate)
157165
indexMetaDataBuilder.putInSyncAllocationIds(shardId.id(), Collections.emptySet());
158166
} else {
167+
assert recoverySource instanceof RecoverySource.ExistingStoreRecoverySource
168+
|| recoverySource instanceof RecoverySource.SnapshotRecoverySource
169+
: recoverySource;
159170
// forcing a stale primary resets the in-sync allocations to the singleton set with the stale id
160171
indexMetaDataBuilder.putInSyncAllocationIds(shardId.id(),
161-
Collections.singleton(updates.initializedPrimary.allocationId().getId()));
172+
Collections.singleton(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID));
162173
}
163174
} else {
164175
// standard path for updating in-sync ids
@@ -291,7 +302,8 @@ void removeAllocationId(ShardRouting shardRouting) {
291302
* Add allocation id of this shard to the set of in-sync shard copies
292303
*/
293304
private void addAllocationId(ShardRouting shardRouting) {
294-
changes(shardRouting.shardId()).addedAllocationIds.add(shardRouting.allocationId().getId());
305+
final Updates changes = changes(shardRouting.shardId());
306+
changes.addedAllocationIds.add(shardRouting.allocationId().getId());
295307
}
296308

297309
/**

server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,13 @@ public void testForceStaleReplicaToBePromotedToPrimary() throws Exception {
204204
}
205205
rerouteBuilder.get();
206206

207+
ClusterState state = client().admin().cluster().prepareState().get().getState();
208+
209+
Set<String> expectedAllocationIds = useStaleReplica
210+
? Collections.singleton(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID)
211+
: Collections.emptySet();
212+
assertEquals(expectedAllocationIds, state.metaData().index(idxName).inSyncAllocationIds(0));
213+
207214
logger.info("--> check that the stale primary shard gets allocated and that documents are available");
208215
ensureYellow(idxName);
209216

@@ -218,7 +225,8 @@ public void testForceStaleReplicaToBePromotedToPrimary() throws Exception {
218225
assertHitCount(client().prepareSearch(idxName).setSize(0).setQuery(matchAllQuery()).get(), useStaleReplica ? 1L : 0L);
219226

220227
// allocation id of old primary was cleaned from the in-sync set
221-
ClusterState state = client().admin().cluster().prepareState().get().getState();
228+
state = client().admin().cluster().prepareState().get().getState();
229+
222230
assertEquals(Collections.singleton(state.routingTable().index(idxName).shard(0).primary.allocationId().getId()),
223231
state.metaData().index(idxName).inSyncAllocationIds(0));
224232

0 commit comments

Comments
 (0)