Skip to content

Commit c2a8fe1

Browse files
authored
Prevent CCR recovery from missing documents (#38237)
Currently the snapshot/restore process manually sets the global checkpoint to the max sequence number from the restored segements. This does not work for Ccr as this will lead to documents that would be recovered in the normal followering operation from being recovered. This commit fixes this issue by setting the initial global checkpoint to the existing local checkpoint.
1 parent aef5775 commit c2a8fe1

File tree

10 files changed

+397
-216
lines changed

10 files changed

+397
-216
lines changed

server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -397,9 +397,9 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
397397
assert indexShouldExists;
398398
store.bootstrapNewHistory();
399399
final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
400-
final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO));
400+
final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
401401
final String translogUUID = Translog.createEmptyTranslog(
402-
indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPendingPrimaryTerm());
402+
indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
403403
store.associateIndexWithNewTranslog(translogUUID);
404404
} else if (indexShouldExists) {
405405
if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) {
@@ -466,9 +466,9 @@ private void restore(final IndexShard indexShard, final Repository repository, f
466466
final Store store = indexShard.store();
467467
store.bootstrapNewHistory();
468468
final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
469-
final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO));
469+
final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
470470
final String translogUUID = Translog.createEmptyTranslog(
471-
indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPendingPrimaryTerm());
471+
indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
472472
store.associateIndexWithNewTranslog(translogUUID);
473473
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
474474
indexShard.openEngineAndRecoverFromTranslog();

server/src/main/java/org/elasticsearch/index/store/Store.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1428,26 +1428,27 @@ public void bootstrapNewHistory() throws IOException {
14281428
try {
14291429
Map<String, String> userData = readLastCommittedSegmentsInfo().getUserData();
14301430
final long maxSeqNo = Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO));
1431-
bootstrapNewHistory(maxSeqNo);
1431+
final long localCheckpoint = Long.parseLong(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
1432+
bootstrapNewHistory(localCheckpoint, maxSeqNo);
14321433
} finally {
14331434
metadataLock.writeLock().unlock();
14341435
}
14351436
}
14361437

14371438
/**
1438-
* Marks an existing lucene index with a new history uuid and sets the given maxSeqNo as the local checkpoint
1439+
* Marks an existing lucene index with a new history uuid and sets the given local checkpoint
14391440
* as well as the maximum sequence number.
1440-
* This is used to make sure no existing shard will recovery from this index using ops based recovery.
1441+
* This is used to make sure no existing shard will recover from this index using ops based recovery.
14411442
* @see SequenceNumbers#LOCAL_CHECKPOINT_KEY
14421443
* @see SequenceNumbers#MAX_SEQ_NO
14431444
*/
1444-
public void bootstrapNewHistory(long maxSeqNo) throws IOException {
1445+
public void bootstrapNewHistory(long localCheckpoint, long maxSeqNo) throws IOException {
14451446
metadataLock.writeLock().lock();
14461447
try (IndexWriter writer = newAppendingIndexWriter(directory, null)) {
14471448
final Map<String, String> map = new HashMap<>();
14481449
map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
1450+
map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
14491451
map.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
1450-
map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
14511452
updateCommitData(writer, map);
14521453
} finally {
14531454
metadataLock.writeLock().unlock();

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 12 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.logging.log4j.Logger;
2222
import org.apache.lucene.index.CorruptIndexException;
2323
import org.apache.lucene.index.DirectoryReader;
24-
import org.apache.lucene.index.IndexCommit;
2524
import org.apache.lucene.index.IndexableField;
2625
import org.apache.lucene.index.Term;
2726
import org.apache.lucene.search.IndexSearcher;
@@ -46,8 +45,6 @@
4645
import org.elasticsearch.action.support.PlainActionFuture;
4746
import org.elasticsearch.cluster.metadata.IndexMetaData;
4847
import org.elasticsearch.cluster.metadata.MappingMetaData;
49-
import org.elasticsearch.cluster.metadata.MetaData;
50-
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
5148
import org.elasticsearch.cluster.node.DiscoveryNode;
5249
import org.elasticsearch.cluster.routing.AllocationId;
5350
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
@@ -62,7 +59,6 @@
6259
import org.elasticsearch.common.breaker.CircuitBreaker;
6360
import org.elasticsearch.common.bytes.BytesArray;
6461
import org.elasticsearch.common.collect.Tuple;
65-
import org.elasticsearch.common.component.AbstractLifecycleComponent;
6662
import org.elasticsearch.common.io.stream.BytesStreamOutput;
6763
import org.elasticsearch.common.io.stream.StreamInput;
6864
import org.elasticsearch.common.lease.Releasable;
@@ -107,7 +103,6 @@
107103
import org.elasticsearch.index.mapper.VersionFieldMapper;
108104
import org.elasticsearch.index.seqno.SeqNoStats;
109105
import org.elasticsearch.index.seqno.SequenceNumbers;
110-
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
111106
import org.elasticsearch.index.store.Store;
112107
import org.elasticsearch.index.store.StoreStats;
113108
import org.elasticsearch.index.store.StoreUtils;
@@ -121,12 +116,8 @@
121116
import org.elasticsearch.indices.recovery.RecoveryState;
122117
import org.elasticsearch.indices.recovery.RecoveryTarget;
123118
import org.elasticsearch.repositories.IndexId;
124-
import org.elasticsearch.repositories.Repository;
125-
import org.elasticsearch.repositories.RepositoryData;
126119
import org.elasticsearch.snapshots.Snapshot;
127120
import org.elasticsearch.snapshots.SnapshotId;
128-
import org.elasticsearch.snapshots.SnapshotInfo;
129-
import org.elasticsearch.snapshots.SnapshotShardFailure;
130121
import org.elasticsearch.test.CorruptionUtils;
131122
import org.elasticsearch.test.DummyShardLock;
132123
import org.elasticsearch.test.FieldMaskingReader;
@@ -143,7 +134,6 @@
143134
import java.util.ArrayList;
144135
import java.util.Arrays;
145136
import java.util.Collections;
146-
import java.util.HashMap;
147137
import java.util.HashSet;
148138
import java.util.Iterator;
149139
import java.util.List;
@@ -174,7 +164,6 @@
174164
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
175165
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
176166
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
177-
import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN;
178167
import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
179168
import static org.hamcrest.Matchers.containsInAnyOrder;
180169
import static org.hamcrest.Matchers.containsString;
@@ -2159,9 +2148,12 @@ public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedExc
21592148

21602149
public void testRestoreShard() throws IOException {
21612150
final IndexShard source = newStartedShard(true);
2162-
IndexShard target = newStartedShard(true);
2151+
IndexShard target = newStartedShard(true, Settings.builder()
2152+
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), source.indexSettings().isSoftDeleteEnabled()).build());
21632153

21642154
indexDoc(source, "_doc", "0");
2155+
EngineTestCase.generateNewSeqNo(source.getEngine()); // create a gap in the history
2156+
indexDoc(source, "_doc", "2");
21652157
if (randomBoolean()) {
21662158
source.refresh("test");
21672159
}
@@ -2197,16 +2189,18 @@ public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version versio
21972189
}
21982190
}
21992191
}));
2200-
assertThat(target.getLocalCheckpoint(), equalTo(0L));
2201-
assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(0L));
2202-
assertThat(target.getReplicationTracker().getGlobalCheckpoint(), equalTo(0L));
2192+
assertThat(target.getLocalCheckpoint(), equalTo(2L));
2193+
assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(2L));
2194+
assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(0L));
22032195
IndexShardTestCase.updateRoutingEntry(target, routing.moveToStarted());
22042196
assertThat(target.getReplicationTracker().getTrackedLocalCheckpointForShard(
2205-
target.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(0L));
2197+
target.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(2L));
2198+
assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(2L));
22062199

2207-
assertDocs(target, "0");
2200+
assertDocs(target, "0", "2");
22082201

2209-
closeShards(source, target);
2202+
closeShard(source, false);
2203+
closeShards(target);
22102204
}
22112205

22122206
public void testSearcherWrapperIsUsed() throws IOException {
@@ -3131,107 +3125,6 @@ private Result indexOnReplicaWithGaps(
31313125
return new Result(localCheckpoint, max);
31323126
}
31333127

3134-
/** A dummy repository for testing which just needs restore overridden */
3135-
private abstract static class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository {
3136-
private final String indexName;
3137-
3138-
RestoreOnlyRepository(String indexName) {
3139-
this.indexName = indexName;
3140-
}
3141-
3142-
@Override
3143-
protected void doStart() {
3144-
}
3145-
3146-
@Override
3147-
protected void doStop() {
3148-
}
3149-
3150-
@Override
3151-
protected void doClose() {
3152-
}
3153-
3154-
@Override
3155-
public RepositoryMetaData getMetadata() {
3156-
return null;
3157-
}
3158-
3159-
@Override
3160-
public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) {
3161-
return null;
3162-
}
3163-
3164-
@Override
3165-
public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) {
3166-
return null;
3167-
}
3168-
3169-
@Override
3170-
public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException {
3171-
return null;
3172-
}
3173-
3174-
@Override
3175-
public RepositoryData getRepositoryData() {
3176-
Map<IndexId, Set<SnapshotId>> map = new HashMap<>();
3177-
map.put(new IndexId(indexName, "blah"), emptySet());
3178-
return new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(), map, Collections.emptyList());
3179-
}
3180-
3181-
@Override
3182-
public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData) {
3183-
}
3184-
3185-
@Override
3186-
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure,
3187-
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
3188-
boolean includeGlobalState) {
3189-
return null;
3190-
}
3191-
3192-
@Override
3193-
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) {
3194-
}
3195-
3196-
@Override
3197-
public long getSnapshotThrottleTimeInNanos() {
3198-
return 0;
3199-
}
3200-
3201-
@Override
3202-
public long getRestoreThrottleTimeInNanos() {
3203-
return 0;
3204-
}
3205-
3206-
@Override
3207-
public String startVerification() {
3208-
return null;
3209-
}
3210-
3211-
@Override
3212-
public void endVerification(String verificationToken) {
3213-
}
3214-
3215-
@Override
3216-
public boolean isReadOnly() {
3217-
return false;
3218-
}
3219-
3220-
@Override
3221-
public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
3222-
IndexShardSnapshotStatus snapshotStatus) {
3223-
}
3224-
3225-
@Override
3226-
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) {
3227-
return null;
3228-
}
3229-
3230-
@Override
3231-
public void verify(String verificationToken, DiscoveryNode localNode) {
3232-
}
3233-
}
3234-
32353128
public void testIsSearchIdle() throws Exception {
32363129
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
32373130
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)

0 commit comments

Comments
 (0)