Skip to content

Commit 1a8d6ff

Browse files
authored
Prevent CCR recovery from missing documents (#38472)
Currently the snapshot/restore process manually sets the global checkpoint to the max sequence number from the restored segements. This does not work for Ccr as this will lead to documents that would be recovered in the normal followering operation from being recovered. This commit fixes this issue by setting the initial global checkpoint to the existing local checkpoint.
1 parent 8860291 commit 1a8d6ff

File tree

10 files changed

+405
-107
lines changed

10 files changed

+405
-107
lines changed

server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -399,9 +399,9 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
399399
assert indexShouldExists;
400400
store.bootstrapNewHistory();
401401
final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
402-
final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO));
402+
final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
403403
final String translogUUID = Translog.createEmptyTranslog(
404-
indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPendingPrimaryTerm());
404+
indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
405405
store.associateIndexWithNewTranslog(translogUUID);
406406
} else if (indexShouldExists) {
407407
if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) {
@@ -473,9 +473,9 @@ private void restore(final IndexShard indexShard, final Repository repository, f
473473
final Store store = indexShard.store();
474474
store.bootstrapNewHistory();
475475
final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
476-
final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO));
476+
final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
477477
final String translogUUID = Translog.createEmptyTranslog(
478-
indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPendingPrimaryTerm());
478+
indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
479479
store.associateIndexWithNewTranslog(translogUUID);
480480
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
481481
indexShard.openEngineAndRecoverFromTranslog();

server/src/main/java/org/elasticsearch/index/store/Store.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1426,30 +1426,36 @@ public void createEmpty() throws IOException {
14261426
public void bootstrapNewHistory() throws IOException {
14271427
metadataLock.writeLock().lock();
14281428
try {
1429-
Map<String, String> userData = readLastCommittedSegmentsInfo().getUserData();
1429+
final SegmentInfos segmentCommitInfos = readLastCommittedSegmentsInfo();
1430+
final Map<String, String> userData = segmentCommitInfos.getUserData();
1431+
final String rawLocalCheckpoint = userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY);
1432+
final String rawMaxSeqNo = userData.get(SequenceNumbers.MAX_SEQ_NO);
1433+
assert (rawLocalCheckpoint == null) == (rawMaxSeqNo == null) :
1434+
"local checkpoint was " + rawLocalCheckpoint + " but max seq no was " + rawMaxSeqNo;
1435+
1436+
assert rawLocalCheckpoint != null || segmentCommitInfos.getCommitLuceneVersion().major < 7 :
1437+
"Found Lucene version: " + segmentCommitInfos.getCommitLuceneVersion().major;
14301438
final SequenceNumbers.CommitInfo seqno = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(userData.entrySet());
1431-
bootstrapNewHistory(seqno.maxSeqNo);
1439+
bootstrapNewHistory(seqno.localCheckpoint, seqno.maxSeqNo);
14321440
} finally {
14331441
metadataLock.writeLock().unlock();
14341442
}
14351443
}
14361444

14371445
/**
1438-
* Marks an existing lucene index with a new history uuid and sets the given maxSeqNo as the local checkpoint
1446+
* Marks an existing lucene index with a new history uuid and sets the given local checkpoint
14391447
* as well as the maximum sequence number.
1440-
* This is used to make sure no existing shard will recovery from this index using ops based recovery.
1448+
* This is used to make sure no existing shard will recover from this index using ops based recovery.
14411449
* @see SequenceNumbers#LOCAL_CHECKPOINT_KEY
14421450
* @see SequenceNumbers#MAX_SEQ_NO
14431451
*/
1444-
public void bootstrapNewHistory(long maxSeqNo) throws IOException {
1452+
public void bootstrapNewHistory(long localCheckpoint, long maxSeqNo) throws IOException {
14451453
metadataLock.writeLock().lock();
14461454
try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory, null)) {
1447-
final Map<String, String> userData = getUserData(writer);
14481455
final Map<String, String> map = new HashMap<>();
14491456
map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
1457+
map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
14501458
map.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
1451-
map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
1452-
logger.debug("bootstrap a new history_uuid [{}], user_data [{}]", map, userData);
14531459
updateCommitData(writer, map);
14541460
} finally {
14551461
metadataLock.writeLock().unlock();

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2158,9 +2158,12 @@ public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedExc
21582158

21592159
public void testRestoreShard() throws IOException {
21602160
final IndexShard source = newStartedShard(true);
2161-
IndexShard target = newStartedShard(true);
2161+
IndexShard target = newStartedShard(true, Settings.builder()
2162+
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), source.indexSettings().isSoftDeleteEnabled()).build());
21622163

21632164
indexDoc(source, "_doc", "0");
2165+
EngineTestCase.generateNewSeqNo(source.getEngine()); // create a gap in the history
2166+
indexDoc(source, "_doc", "2");
21642167
if (randomBoolean()) {
21652168
source.refresh("test");
21662169
}
@@ -2196,16 +2199,18 @@ public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version versio
21962199
}
21972200
}
21982201
}));
2199-
assertThat(target.getLocalCheckpoint(), equalTo(0L));
2200-
assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(0L));
2201-
assertThat(target.getReplicationTracker().getGlobalCheckpoint(), equalTo(0L));
2202+
assertThat(target.getLocalCheckpoint(), equalTo(2L));
2203+
assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(2L));
2204+
assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(0L));
22022205
IndexShardTestCase.updateRoutingEntry(target, routing.moveToStarted());
22032206
assertThat(target.getReplicationTracker().getTrackedLocalCheckpointForShard(
2204-
target.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(0L));
2207+
target.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(2L));
2208+
assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(2L));
22052209

2206-
assertDocs(target, "0");
2210+
assertDocs(target, "0", "2");
22072211

2208-
closeShards(source, target);
2212+
closeShard(source, false);
2213+
closeShards(target);
22092214
}
22102215

22112216
public void testSearcherWrapperIsUsed() throws IOException {
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.index.shard;
20+
21+
import org.apache.lucene.index.IndexCommit;
22+
import org.elasticsearch.Version;
23+
import org.elasticsearch.cluster.metadata.IndexMetaData;
24+
import org.elasticsearch.cluster.metadata.MetaData;
25+
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
26+
import org.elasticsearch.cluster.node.DiscoveryNode;
27+
import org.elasticsearch.common.component.AbstractLifecycleComponent;
28+
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
29+
import org.elasticsearch.index.store.Store;
30+
import org.elasticsearch.repositories.IndexId;
31+
import org.elasticsearch.repositories.Repository;
32+
import org.elasticsearch.repositories.RepositoryData;
33+
import org.elasticsearch.snapshots.SnapshotId;
34+
import org.elasticsearch.snapshots.SnapshotInfo;
35+
import org.elasticsearch.snapshots.SnapshotShardFailure;
36+
37+
import java.io.IOException;
38+
import java.util.Collections;
39+
import java.util.HashMap;
40+
import java.util.List;
41+
import java.util.Map;
42+
import java.util.Set;
43+
44+
import static java.util.Collections.emptySet;
45+
import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN;
46+
47+
/** A dummy repository for testing which just needs restore overridden */
48+
public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository {
49+
private final String indexName;
50+
51+
public RestoreOnlyRepository(String indexName) {
52+
this.indexName = indexName;
53+
}
54+
55+
@Override
56+
protected void doStart() {
57+
}
58+
59+
@Override
60+
protected void doStop() {
61+
}
62+
63+
@Override
64+
protected void doClose() {
65+
}
66+
67+
@Override
68+
public RepositoryMetaData getMetadata() {
69+
return null;
70+
}
71+
72+
@Override
73+
public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) {
74+
return null;
75+
}
76+
77+
@Override
78+
public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) {
79+
return null;
80+
}
81+
82+
@Override
83+
public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException {
84+
return null;
85+
}
86+
87+
@Override
88+
public RepositoryData getRepositoryData() {
89+
Map<IndexId, Set<SnapshotId>> map = new HashMap<>();
90+
map.put(new IndexId(indexName, "blah"), emptySet());
91+
return new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(), map, Collections.emptyList());
92+
}
93+
94+
@Override
95+
public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData) {
96+
}
97+
98+
@Override
99+
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure,
100+
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
101+
boolean includeGlobalState) {
102+
return null;
103+
}
104+
105+
@Override
106+
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) {
107+
}
108+
109+
@Override
110+
public long getSnapshotThrottleTimeInNanos() {
111+
return 0;
112+
}
113+
114+
@Override
115+
public long getRestoreThrottleTimeInNanos() {
116+
return 0;
117+
}
118+
119+
@Override
120+
public String startVerification() {
121+
return null;
122+
}
123+
124+
@Override
125+
public void endVerification(String verificationToken) {
126+
}
127+
128+
@Override
129+
public boolean isReadOnly() {
130+
return false;
131+
}
132+
133+
@Override
134+
public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
135+
IndexShardSnapshotStatus snapshotStatus) {
136+
}
137+
138+
@Override
139+
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) {
140+
return null;
141+
}
142+
143+
@Override
144+
public void verify(String verificationToken, DiscoveryNode localNode) {
145+
}
146+
}

test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class BackgroundIndexer implements AutoCloseable {
5252
private final Logger logger = LogManager.getLogger(getClass());
5353

5454
final Thread[] writers;
55+
final Client client;
5556
final CountDownLatch stopLatch;
5657
final CopyOnWriteArrayList<Exception> failures;
5758
final AtomicBoolean stop = new AtomicBoolean(false);
@@ -122,6 +123,7 @@ public BackgroundIndexer(final String index, final String type, final Client cli
122123
if (random == null) {
123124
random = RandomizedTest.getRandom();
124125
}
126+
this.client = client;
125127
useAutoGeneratedIDs = random.nextBoolean();
126128
failures = new CopyOnWriteArrayList<>();
127129
writers = new Thread[writerCount];
@@ -316,6 +318,10 @@ public void close() throws Exception {
316318
stop();
317319
}
318320

321+
public Client getClient() {
322+
return client;
323+
}
324+
319325
/**
320326
* Returns the ID set of all documents indexed by this indexer run
321327
*/

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -240,21 +240,17 @@ private void initiateFollowing(
240240
final PutFollowAction.Request request,
241241
final ActionListener<PutFollowAction.Response> listener) {
242242
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "PutFollowAction does not support DEFAULT.";
243-
activeShardsObserver.waitForActiveShards(new String[]{request.getFollowerIndex()},
244-
request.waitForActiveShards(), request.timeout(), result -> {
245-
if (result) {
246-
FollowParameters parameters = request.getParameters();
247-
ResumeFollowAction.Request resumeFollowRequest = new ResumeFollowAction.Request();
248-
resumeFollowRequest.setFollowerIndex(request.getFollowerIndex());
249-
resumeFollowRequest.setParameters(new FollowParameters(parameters));
250-
client.execute(ResumeFollowAction.INSTANCE, resumeFollowRequest, ActionListener.wrap(
251-
r -> listener.onResponse(new PutFollowAction.Response(true, true, r.isAcknowledged())),
252-
listener::onFailure
253-
));
254-
} else {
255-
listener.onResponse(new PutFollowAction.Response(true, false, false));
256-
}
257-
}, listener::onFailure);
243+
FollowParameters parameters = request.getParameters();
244+
ResumeFollowAction.Request resumeFollowRequest = new ResumeFollowAction.Request();
245+
resumeFollowRequest.setFollowerIndex(request.getFollowerIndex());
246+
resumeFollowRequest.setParameters(new FollowParameters(parameters));
247+
client.execute(ResumeFollowAction.INSTANCE, resumeFollowRequest, ActionListener.wrap(
248+
r -> activeShardsObserver.waitForActiveShards(new String[]{request.getFollowerIndex()},
249+
request.waitForActiveShards(), request.timeout(), result ->
250+
listener.onResponse(new PutFollowAction.Response(true, result, r.isAcknowledged())),
251+
listener::onFailure),
252+
listener::onFailure
253+
));
258254
}
259255

260256
@Override

0 commit comments

Comments
 (0)