Prevent CCR recovery from missing documents #38472

Merged: 7 commits, Feb 9, 2019

@@ -399,9 +399,9 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
assert indexShouldExists;
store.bootstrapNewHistory();
final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO));
final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPendingPrimaryTerm());
indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);
} else if (indexShouldExists) {
if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) {
@@ -473,9 +473,9 @@ private void restore(final IndexShard indexShard, final Repository repository, f
final Store store = indexShard.store();
store.bootstrapNewHistory();
final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO));
final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPendingPrimaryTerm());
indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
indexShard.openEngineAndRecoverFromTranslog();
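For orientation, a minimal sketch of the recovery step these two hunks change, written as a hypothetical helper whose variable names mirror the diff (this is not code from the PR). The second argument of Translog.createEmptyTranslog is the new translog's initial global checkpoint in this era of the code, so it is now taken from the commit's local checkpoint rather than its max seq no; when the commit's history contains gaps, that keeps the initial global checkpoint from sitting ahead of the local checkpoint.

import java.io.IOException;

import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;

// Sketch only: bootstrap an empty translog for a restored shard whose history may contain gaps.
final class TranslogBootstrapSketch {
    static void createEmptyTranslogFromCommit(IndexShard indexShard, Store store, ShardId shardId,
                                              long localCheckpoint) throws IOException {
        // Seed the translog's initial global checkpoint from the commit's local checkpoint,
        // not from the commit's max seq no.
        final String translogUUID = Translog.createEmptyTranslog(
            indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
        store.associateIndexWithNewTranslog(translogUUID);
    }
}
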
23 changes: 15 additions & 8 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
@@ -1426,30 +1426,37 @@ public void createEmpty() throws IOException {
public void bootstrapNewHistory() throws IOException {
metadataLock.writeLock().lock();
try {
Map<String, String> userData = readLastCommittedSegmentsInfo().getUserData();
final SegmentInfos segmentCommitInfos = readLastCommittedSegmentsInfo();
final Map<String, String> userData = segmentCommitInfos.getUserData();
final String rawLocalCheckpoint = userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY);
final String rawMaxSeqNo = userData.get(SequenceNumbers.MAX_SEQ_NO);
if (rawLocalCheckpoint == null) {
assert rawMaxSeqNo == null : "Local checkpoint null but max sequence number: " + rawMaxSeqNo;
Contributor:
You can remove the if block and instead write the following 2 assertions:

assert (rawLocalCheckpoint == null) == (rawMaxSeqNo == null) : 
    "local checkpoint was " + rawLocalCheckpoint + " but max seq no was " + rawMaxSeqNo;

assert rawLocalCheckpoint != null || segmentCommitInfos.getCommitLuceneVersion().major < 7 : 
    "Found Lucene version: " + segmentCommitInfos.getCommitLuceneVersion().major;

// If the local checkpoint is null we expect that this is Lucene version 6 or earlier
assert segmentCommitInfos.getCommitLuceneVersion().major < 7 : "Found Lucene version: " +
segmentCommitInfos.getCommitLuceneVersion().major;
}
final SequenceNumbers.CommitInfo seqno = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(userData.entrySet());
bootstrapNewHistory(seqno.maxSeqNo);
bootstrapNewHistory(seqno.localCheckpoint, seqno.maxSeqNo);
} finally {
metadataLock.writeLock().unlock();
}
}

/**
* Marks an existing lucene index with a new history uuid and sets the given maxSeqNo as the local checkpoint
* Marks an existing lucene index with a new history uuid and sets the given local checkpoint
* as well as the maximum sequence number.
* This is used to make sure no existing shard will recovery from this index using ops based recovery.
* This is used to make sure no existing shard will recover from this index using ops based recovery.
* @see SequenceNumbers#LOCAL_CHECKPOINT_KEY
* @see SequenceNumbers#MAX_SEQ_NO
*/
public void bootstrapNewHistory(long maxSeqNo) throws IOException {
public void bootstrapNewHistory(long localCheckpoint, long maxSeqNo) throws IOException {
metadataLock.writeLock().lock();
try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory, null)) {
final Map<String, String> userData = getUserData(writer);
final Map<String, String> map = new HashMap<>();
map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
map.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
logger.debug("bootstrap a new history_uuid [{}], user_data [{}]", map, userData);
updateCommitData(writer, map);
} finally {
metadataLock.writeLock().unlock();
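For reference, a sketch of how the start of Store#bootstrapNewHistory() could look if the reviewer's suggestion above were applied, i.e. with the if block replaced by the two assertions; this is only an illustration, not the code in this changeset:

public void bootstrapNewHistory() throws IOException {
    metadataLock.writeLock().lock();
    try {
        final SegmentInfos segmentCommitInfos = readLastCommittedSegmentsInfo();
        final Map<String, String> userData = segmentCommitInfos.getUserData();
        final String rawLocalCheckpoint = userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY);
        final String rawMaxSeqNo = userData.get(SequenceNumbers.MAX_SEQ_NO);
        // The two keys are expected to be either both present or both absent.
        assert (rawLocalCheckpoint == null) == (rawMaxSeqNo == null) :
            "local checkpoint was " + rawLocalCheckpoint + " but max seq no was " + rawMaxSeqNo;
        // A missing local checkpoint is only expected for commits written by Lucene 6 or earlier.
        assert rawLocalCheckpoint != null || segmentCommitInfos.getCommitLuceneVersion().major < 7 :
            "Found Lucene version: " + segmentCommitInfos.getCommitLuceneVersion().major;
        final SequenceNumbers.CommitInfo seqno = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(userData.entrySet());
        bootstrapNewHistory(seqno.localCheckpoint, seqno.maxSeqNo);
    } finally {
        metadataLock.writeLock().unlock();
    }
}
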
@@ -2158,9 +2158,12 @@ public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedExc

public void testRestoreShard() throws IOException {
final IndexShard source = newStartedShard(true);
IndexShard target = newStartedShard(true);
IndexShard target = newStartedShard(true, Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), source.indexSettings().isSoftDeleteEnabled()).build());

indexDoc(source, "_doc", "0");
EngineTestCase.generateNewSeqNo(source.getEngine()); // create a gap in the history
indexDoc(source, "_doc", "2");
if (randomBoolean()) {
source.refresh("test");
}
@@ -2196,16 +2199,18 @@ public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version versio
}
}
}));
assertThat(target.getLocalCheckpoint(), equalTo(0L));
assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(0L));
assertThat(target.getReplicationTracker().getGlobalCheckpoint(), equalTo(0L));
assertThat(target.getLocalCheckpoint(), equalTo(2L));
assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(2L));
assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(0L));
IndexShardTestCase.updateRoutingEntry(target, routing.moveToStarted());
assertThat(target.getReplicationTracker().getTrackedLocalCheckpointForShard(
target.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(0L));
target.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(2L));
assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(2L));

assertDocs(target, "0");
assertDocs(target, "0", "2");

closeShards(source, target);
closeShard(source, false);
closeShards(target);
}

public void testSearcherWrapperIsUsed() throws IOException {
@@ -0,0 +1,146 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;

import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotShardFailure;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static java.util.Collections.emptySet;
import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN;

/** A dummy repository for testing which just needs restore overridden */
public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository {
private final String indexName;

public RestoreOnlyRepository(String indexName) {
this.indexName = indexName;
}

@Override
protected void doStart() {
}

@Override
protected void doStop() {
}

@Override
protected void doClose() {
}

@Override
public RepositoryMetaData getMetadata() {
return null;
}

@Override
public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) {
return null;
}

@Override
public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) {
return null;
}

@Override
public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException {
return null;
}

@Override
public RepositoryData getRepositoryData() {
Map<IndexId, Set<SnapshotId>> map = new HashMap<>();
map.put(new IndexId(indexName, "blah"), emptySet());
return new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(), map, Collections.emptyList());
}

@Override
public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData) {
}

@Override
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState) {
return null;
}

@Override
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) {
}

@Override
public long getSnapshotThrottleTimeInNanos() {
return 0;
}

@Override
public long getRestoreThrottleTimeInNanos() {
return 0;
}

@Override
public String startVerification() {
return null;
}

@Override
public void endVerification(String verificationToken) {
}

@Override
public boolean isReadOnly() {
return false;
}

@Override
public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus) {
}

@Override
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) {
return null;
}

@Override
public void verify(String verificationToken, DiscoveryNode localNode) {
}
}
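
As a usage note, tests extend this dummy repository and override only the restore hook, as testRestoreShard above does with an anonymous subclass. A hypothetical sketch follows; the restoreShard signature shown here is assumed from the Repository interface of this era and may differ:

// Hypothetical test-only subclass; the index name and the assumed method signature are illustrative.
Repository repository = new RestoreOnlyRepository("test_index") {
    @Override
    public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version,
                             IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
        // Copy the snapshot's segment files into the target shard's store here.
    }
};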
@@ -52,6 +52,7 @@ public class BackgroundIndexer implements AutoCloseable {
private final Logger logger = LogManager.getLogger(getClass());

final Thread[] writers;
final Client client;
final CountDownLatch stopLatch;
final CopyOnWriteArrayList<Exception> failures;
final AtomicBoolean stop = new AtomicBoolean(false);
@@ -122,6 +123,7 @@ public BackgroundIndexer(final String index, final String type, final Client cli
if (random == null) {
random = RandomizedTest.getRandom();
}
this.client = client;
useAutoGeneratedIDs = random.nextBoolean();
failures = new CopyOnWriteArrayList<>();
writers = new Thread[writerCount];
@@ -316,6 +318,10 @@ public void close() throws Exception {
stop();
}

public Client getClient() {
return client;
}

/**
* Returns the ID set of all documents indexed by this indexer run
*/
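A hypothetical usage sketch for the new getter; the helper name and the refresh call are illustrative and not part of this change:

// Refresh the index a BackgroundIndexer writes to, using the same client it was constructed with.
static void refreshIndexedData(BackgroundIndexer indexer, String index) {
    indexer.getClient().admin().indices().prepareRefresh(index).get();
}
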
@@ -240,21 +240,17 @@ private void initiateFollowing(
final PutFollowAction.Request request,
final ActionListener<PutFollowAction.Response> listener) {
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "PutFollowAction does not support DEFAULT.";
activeShardsObserver.waitForActiveShards(new String[]{request.getFollowerIndex()},
request.waitForActiveShards(), request.timeout(), result -> {
if (result) {
FollowParameters parameters = request.getParameters();
ResumeFollowAction.Request resumeFollowRequest = new ResumeFollowAction.Request();
resumeFollowRequest.setFollowerIndex(request.getFollowerIndex());
resumeFollowRequest.setParameters(new FollowParameters(parameters));
client.execute(ResumeFollowAction.INSTANCE, resumeFollowRequest, ActionListener.wrap(
r -> listener.onResponse(new PutFollowAction.Response(true, true, r.isAcknowledged())),
listener::onFailure
));
} else {
listener.onResponse(new PutFollowAction.Response(true, false, false));
}
}, listener::onFailure);
FollowParameters parameters = request.getParameters();
ResumeFollowAction.Request resumeFollowRequest = new ResumeFollowAction.Request();
resumeFollowRequest.setFollowerIndex(request.getFollowerIndex());
resumeFollowRequest.setParameters(new FollowParameters(parameters));
client.execute(ResumeFollowAction.INSTANCE, resumeFollowRequest, ActionListener.wrap(
r -> activeShardsObserver.waitForActiveShards(new String[]{request.getFollowerIndex()},
request.waitForActiveShards(), request.timeout(), result ->
listener.onResponse(new PutFollowAction.Response(true, result, r.isAcknowledged())),
listener::onFailure),
listener::onFailure
));
}

@Override