Commit 6275cd7

Do not load global checkpoint to ReplicationTracker in local recovery step (#44781)
If we force-allocate an empty or stale primary, the global checkpoint on replicas might be higher than the primary's, because the local recovery step (introduced in #43463) loads the previous (stale) global checkpoint into the ReplicationTracker. There is no issue with the retention leases, as a new lease with a higher term will supersede the stale one. Relates #43463
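In effect, the local recovery step now hands the on-disk global checkpoint to the engine as a plain LongSupplier instead of seeding the ReplicationTracker with it, so a stale value never becomes the replica's tracked global checkpoint. Below is a minimal, self-contained sketch of that pattern, not Elasticsearch code: SimpleTracker and openEngine are hypothetical stand-ins for ReplicationTracker and innerOpenEngineAndTranslog.

import java.util.function.LongSupplier;

// Hypothetical sketch of the pattern in this commit: during local recovery the engine
// receives a fixed supplier, while the replica's tracker keeps its own (unassigned) value.
public class LocalRecoverySketch {
    static final long UNASSIGNED_SEQ_NO = -2L;

    // Stand-in for ReplicationTracker: remembers the highest global checkpoint it was told about.
    static final class SimpleTracker implements LongSupplier {
        private long globalCheckpoint = UNASSIGNED_SEQ_NO;

        void updateGlobalCheckpointOnReplica(long checkpoint, String reason) {
            globalCheckpoint = Math.max(globalCheckpoint, checkpoint);
        }

        @Override
        public long getAsLong() {
            return globalCheckpoint;
        }
    }

    // Stand-in for innerOpenEngineAndTranslog: the engine only ever sees the supplier it is given.
    static void openEngine(LongSupplier globalCheckpointSupplier) {
        System.out.println("engine sees global checkpoint = " + globalCheckpointSupplier.getAsLong());
    }

    public static void main(String[] args) {
        final long checkpointFromTranslog = 42; // pretend this was read from the last commit's translog
        final SimpleTracker tracker = new SimpleTracker();

        // Local recovery (after this commit): the on-disk value is passed as a fixed supplier,
        // and the tracker is left untouched, so a stale checkpoint cannot leak into it.
        openEngine(() -> checkpointFromTranslog);
        System.out.println("tracker still at " + tracker.getAsLong()); // still -2, i.e. unassigned

        // Peer/store recovery: the tracker is seeded first and then handed to the engine as the supplier.
        tracker.updateGlobalCheckpointOnReplica(checkpointFromTranslog, "read from translog checkpoint");
        openEngine(tracker);
    }
}

In the actual change, the peer and store recovery paths still seed the tracker via loadGlobalCheckpointToReplicationTracker() and pass replicationTracker itself as the supplier, while recoverLocallyUpToGlobalCheckpoint() passes () -> globalCheckpoint.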

3 files changed: +74 -24 lines

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java (+21 -15)
@@ -171,6 +171,7 @@
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -1407,7 +1408,7 @@ public long recoverLocallyUpToGlobalCheckpoint() {
                 recoveryState.getTranslog().totalLocal(recoveredOps); // adjust the total local to reflect the actual count
                 return recoveredOps;
             };
-            innerOpenEngineAndTranslog();
+            innerOpenEngineAndTranslog(() -> globalCheckpoint);
             getEngine().recoverFromTranslog(translogRecoveryRunner, globalCheckpoint);
             logger.trace("shard locally recovered up to {}", getEngine().getSeqNoStats(globalCheckpoint));
         } finally {
@@ -1533,6 +1534,15 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operat
         return opsRecovered;
     }
 
+    private void loadGlobalCheckpointToReplicationTracker() throws IOException {
+        // we have to set it before we open an engine and recover from the translog because
+        // acquiring a snapshot from the translog causes a sync which causes the global checkpoint to be pulled in,
+        // and an engine can be forced to close in ctor which also causes the global checkpoint to be pulled in.
+        final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
+        final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
+        replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
+    }
+
     /**
      * opens the engine on top of the existing lucene engine and translog.
      * Operations from the translog will be replayed to bring lucene up to date.
@@ -1548,7 +1558,8 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
             return runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY,
                 translogRecoveryStats::incrementRecoveredOperations);
         };
-        innerOpenEngineAndTranslog();
+        loadGlobalCheckpointToReplicationTracker();
+        innerOpenEngineAndTranslog(replicationTracker);
         getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
     }
 
@@ -1559,25 +1570,20 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
     public void openEngineAndSkipTranslogRecovery() throws IOException {
         assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
         assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryState.getStage() + "]";
-        innerOpenEngineAndTranslog();
+        loadGlobalCheckpointToReplicationTracker();
+        innerOpenEngineAndTranslog(replicationTracker);
         getEngine().skipTranslogRecovery();
     }
 
-    private void innerOpenEngineAndTranslog() throws IOException {
+    private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) throws IOException {
         if (state != IndexShardState.RECOVERING) {
             throw new IndexShardNotRecoveringException(shardId, state);
         }
-        final EngineConfig config = newEngineConfig();
+        final EngineConfig config = newEngineConfig(globalCheckpointSupplier);
 
         // we disable deletes since we allow for operations to be executed against the shard while recovering
         // but we need to make sure we don't loose deletes until we are done recovering
         config.setEnableGcDeletes(false);
-        // we have to set it before we open an engine and recover from the translog because
-        // acquiring a snapshot from the translog causes a sync which causes the global checkpoint to be pulled in,
-        // and an engine can be forced to close in ctor which also causes the global checkpoint to be pulled in.
-        final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
-        final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
-        replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
         updateRetentionLeasesOnReplica(loadRetentionLeases());
         assert recoveryState.getRecoverySource().expectEmptyRetentionLeases() == false || getRetentionLeases().leases().isEmpty()
             : "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource()
@@ -2646,7 +2652,7 @@ private DocumentMapperForType docMapper(String type) {
             mapperService.resolveDocumentType(type));
     }
 
-    private EngineConfig newEngineConfig() {
+    private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
         Sort indexSort = indexSortSupplier.get();
         return new EngineConfig(shardId, shardRouting.allocationId().getId(),
             threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
@@ -2656,7 +2662,7 @@ private EngineConfig newEngineConfig() {
             IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
             Collections.singletonList(refreshListeners),
             Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
-            indexSort, circuitBreakerService, replicationTracker, replicationTracker::getRetentionLeases,
+            indexSort, circuitBreakerService, globalCheckpointSupplier, replicationTracker::getRetentionLeases,
             () -> getOperationPrimaryTerm(), tombstoneDocSupplier());
     }
 
@@ -3293,7 +3299,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
             // we must create both new read-only engine and new read-write engine under mutex to ensure snapshotStoreMetadata,
             // acquireXXXCommit and close works.
             final Engine readOnlyEngine =
-                new ReadOnlyEngine(newEngineConfig(), seqNoStats, translogStats, false, Function.identity()) {
+                new ReadOnlyEngine(newEngineConfig(replicationTracker), seqNoStats, translogStats, false, Function.identity()) {
                     @Override
                     public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
                         synchronized (mutex) {
@@ -3322,7 +3328,7 @@ public void close() throws IOException {
                     }
                 };
             IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
-            newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig()));
+            newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
             onNewEngine(newEngineReference.get());
         }
         final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(

server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java (+38 -5)
@@ -42,6 +42,7 @@
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.RecoverySource.PeerRecoverySource;
 import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
+import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
 import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
@@ -51,6 +52,7 @@
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.MockEngineFactoryPlugin;
 import org.elasticsearch.index.analysis.AbstractTokenFilterFactory;
 import org.elasticsearch.index.analysis.TokenFilterFactory;
 import org.elasticsearch.index.engine.Engine;
@@ -76,6 +78,7 @@
 import org.elasticsearch.test.ESIntegTestCase.Scope;
 import org.elasticsearch.test.InternalSettingsPlugin;
 import org.elasticsearch.test.InternalTestCluster;
+import org.elasticsearch.test.engine.MockEngineSupport;
 import org.elasticsearch.test.store.MockFSIndexStore;
 import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.test.transport.StubbableTransport;
@@ -84,7 +87,6 @@
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportService;
-import org.junit.After;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -137,12 +139,16 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
             MockFSIndexStore.TestPlugin.class,
             RecoverySettingsChunkSizePlugin.class,
             TestAnalysisPlugin.class,
-            InternalSettingsPlugin.class);
+            InternalSettingsPlugin.class,
+            MockEngineFactoryPlugin.class);
     }
 
-    @After
-    public void assertConsistentHistoryInLuceneIndex() throws Exception {
+    @Override
+    protected void beforeIndexDeletion() throws Exception {
+        super.beforeIndexDeletion();
         internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex();
+        internalCluster().assertSeqNos();
+        internalCluster().assertSameDocIdsOnShards();
     }
 
     private void assertRecoveryStateWithoutStage(RecoveryState state, int shardId, RecoverySource recoverySource, boolean primary,
@@ -1049,7 +1055,8 @@ public void testRecoverLocallyUpToGlobalCheckpoint() throws Exception {
         for (RecoveryState recoveryState : client().admin().indices().prepareRecoveries().get().shardRecoveryStates().get(indexName)) {
             if (startRecoveryRequest.targetNode().equals(recoveryState.getTargetNode())) {
                 assertThat("total recovered translog operations must include both local and remote recovery",
-                    recoveryState.getTranslog().recoveredOperations(), equalTo(Math.toIntExact(maxSeqNo - localCheckpointOfSafeCommit)));
+                    recoveryState.getTranslog().recoveredOperations(),
+                    greaterThanOrEqualTo(Math.toIntExact(maxSeqNo - localCheckpointOfSafeCommit)));
             }
         }
         for (String node : nodes) {
@@ -1116,4 +1123,30 @@ public void testRepeatedRecovery() throws Exception {
         ensureGreen(indexName);
     }
 
+    public void testAllocateEmptyPrimaryResetsGlobalCheckpoint() throws Exception {
+        internalCluster().startMasterOnlyNode(Settings.EMPTY);
+        final List<String> dataNodes = internalCluster().startDataOnlyNodes(2);
+        final Settings randomNodeDataPathSettings = internalCluster().dataPathSettings(randomFrom(dataNodes));
+        final String indexName = "test";
+        assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(Settings.builder()
+            .put("index.number_of_shards", 1).put("index.number_of_replicas", 1)
+            .put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), randomBoolean())).get());
+        final List<IndexRequestBuilder> indexRequests = IntStream.range(0, between(10, 500))
+            .mapToObj(n -> client().prepareIndex(indexName, "type").setSource("foo", "bar"))
+            .collect(Collectors.toList());
+        indexRandom(randomBoolean(), true, true, indexRequests);
+        ensureGreen();
+        internalCluster().stopRandomDataNode();
+        internalCluster().stopRandomDataNode();
+        final String nodeWithoutData = internalCluster().startDataOnlyNode();
+        assertAcked(client().admin().cluster().prepareReroute()
+            .add(new AllocateEmptyPrimaryAllocationCommand(indexName, 0, nodeWithoutData, true)).get());
+        internalCluster().startDataOnlyNode(randomNodeDataPathSettings);
+        ensureGreen();
+        for (ShardStats shardStats : client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()) {
+            assertThat(shardStats.getSeqNoStats().getMaxSeqNo(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
+            assertThat(shardStats.getSeqNoStats().getLocalCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
+            assertThat(shardStats.getSeqNoStats().getGlobalCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
+        }
+    }
 }

server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java (+15 -4)
@@ -161,22 +161,31 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
         assertThat(shard.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
         assertThat(shard.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN));
         assertThat(shard.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
+        assertThat(shard.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
         closeShards(shard);
 
         // good copy
         shard = newStartedShard(false);
         long globalCheckpoint = populateData.apply(shard);
         Optional<SequenceNumbers.CommitInfo> safeCommit = shard.store().findSafeIndexCommit(globalCheckpoint);
         assertTrue(safeCommit.isPresent());
+        int expectedTotalLocal = 0;
+        try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshotFromMinSeqNo(safeCommit.get().localCheckpoint + 1)) {
+            Translog.Operation op;
+            while ((op = snapshot.next()) != null) {
+                if (op.seqNo() <= globalCheckpoint) {
+                    expectedTotalLocal++;
+                }
+            }
+        }
         IndexShard replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(),
             RecoverySource.PeerRecoverySource.INSTANCE));
         replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
         replica.prepareForIndexRecovery();
         assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(globalCheckpoint + 1));
-        assertThat(replica.recoveryState().getTranslog().totalLocal(),
-            equalTo(Math.toIntExact(globalCheckpoint - safeCommit.get().localCheckpoint)));
-        assertThat(replica.recoveryState().getTranslog().recoveredOperations(),
-            equalTo(Math.toIntExact(globalCheckpoint - safeCommit.get().localCheckpoint)));
+        assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(expectedTotalLocal));
+        assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(expectedTotalLocal));
+        assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
         closeShards(replica);
 
         // corrupted copy
@@ -192,6 +201,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
         assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
         assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN));
         assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
+        assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
         closeShards(replica);
 
         // copy with truncated translog
@@ -213,6 +223,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
             assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN));
         }
         assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
+        assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
         closeShards(replica);
     }
 }
