From 6c123b393e96a3e4e61796e5a259534c72713f9c Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 5 Feb 2019 13:32:41 -0600 Subject: [PATCH 1/6] Prevent CCR recovery from missing documents (#38237) Currently the snapshot/restore process manually sets the global checkpoint to the max sequence number from the restored segements. This does not work for Ccr as this will lead to documents that would be recovered in the normal followering operation from being recovered. This commit fixes this issue by setting the initial global checkpoint to the existing local checkpoint. --- .../index/shard/StoreRecovery.java | 8 +- .../org/elasticsearch/index/store/Store.java | 15 +- .../index/shard/IndexShardTests.java | 19 ++- .../index/shard/RestoreOnlyRepository.java | 146 ++++++++++++++++++ .../elasticsearch/test/BackgroundIndexer.java | 6 + .../ccr/action/TransportPutFollowAction.java | 26 ++-- .../elasticsearch/xpack/CcrIntegTestCase.java | 68 ++++++++ .../xpack/ccr/IndexFollowingIT.java | 136 ++++++++-------- .../engine/FollowEngineIndexShardTests.java | 78 ++++++++++ .../SourceOnlySnapshotRepository.java | 3 +- 10 files changed, 398 insertions(+), 107 deletions(-) create mode 100644 test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index c57fc08166b47..32795b3b4197b 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -399,9 +399,9 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe assert indexShouldExists; store.bootstrapNewHistory(); final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo(); - final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO)); + final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); final String translogUUID = Translog.createEmptyTranslog( - indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPendingPrimaryTerm()); + indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm()); store.associateIndexWithNewTranslog(translogUUID); } else if (indexShouldExists) { if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) { @@ -473,9 +473,9 @@ private void restore(final IndexShard indexShard, final Repository repository, f final Store store = indexShard.store(); store.bootstrapNewHistory(); final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo(); - final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO)); + final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); final String translogUUID = Translog.createEmptyTranslog( - indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPendingPrimaryTerm()); + indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm()); store.associateIndexWithNewTranslog(translogUUID); assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; indexShard.openEngineAndRecoverFromTranslog(); diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index e1457486f9f0c..14e5514ba1cb2 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -1427,29 +1427,28 @@ public void bootstrapNewHistory() throws IOException { metadataLock.writeLock().lock(); try { Map userData = readLastCommittedSegmentsInfo().getUserData(); - final SequenceNumbers.CommitInfo seqno = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(userData.entrySet()); - bootstrapNewHistory(seqno.maxSeqNo); + final long maxSeqNo = Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO)); + final long localCheckpoint = Long.parseLong(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + bootstrapNewHistory(localCheckpoint, maxSeqNo); } finally { metadataLock.writeLock().unlock(); } } /** - * Marks an existing lucene index with a new history uuid and sets the given maxSeqNo as the local checkpoint + * Marks an existing lucene index with a new history uuid and sets the given local checkpoint * as well as the maximum sequence number. - * This is used to make sure no existing shard will recovery from this index using ops based recovery. + * This is used to make sure no existing shard will recover from this index using ops based recovery. * @see SequenceNumbers#LOCAL_CHECKPOINT_KEY * @see SequenceNumbers#MAX_SEQ_NO */ - public void bootstrapNewHistory(long maxSeqNo) throws IOException { + public void bootstrapNewHistory(long localCheckpoint, long maxSeqNo) throws IOException { metadataLock.writeLock().lock(); try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory, null)) { - final Map userData = getUserData(writer); final Map map = new HashMap<>(); map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()); + map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint)); map.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo)); - map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo)); - logger.debug("bootstrap a new history_uuid [{}], user_data [{}]", map, userData); updateCommitData(writer, map); } finally { metadataLock.writeLock().unlock(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 7a5e8e35f3628..5175291f35df8 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2157,9 +2157,12 @@ public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedExc public void testRestoreShard() throws IOException { final IndexShard source = newStartedShard(true); - IndexShard target = newStartedShard(true); + IndexShard target = newStartedShard(true, Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), source.indexSettings().isSoftDeleteEnabled()).build()); indexDoc(source, "_doc", "0"); + EngineTestCase.generateNewSeqNo(source.getEngine()); // create a gap in the history + indexDoc(source, "_doc", "2"); if (randomBoolean()) { source.refresh("test"); } @@ -2195,16 +2198,18 @@ public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version versio } } })); - assertThat(target.getLocalCheckpoint(), equalTo(0L)); - assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(0L)); - assertThat(target.getReplicationTracker().getGlobalCheckpoint(), equalTo(0L)); + assertThat(target.getLocalCheckpoint(), equalTo(2L)); + assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(2L)); + assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(0L)); IndexShardTestCase.updateRoutingEntry(target, routing.moveToStarted()); assertThat(target.getReplicationTracker().getTrackedLocalCheckpointForShard( - target.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(0L)); + target.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(2L)); + assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(2L)); - assertDocs(target, "0"); + assertDocs(target, "0", "2"); - closeShards(source, target); + closeShard(source, false); + closeShards(target); } public void testSearcherWrapperIsUsed() throws IOException { diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java new file mode 100644 index 0000000000000..11bdfb7bcc741 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -0,0 +1,146 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.shard; + +import org.apache.lucene.index.IndexCommit; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.snapshots.SnapshotShardFailure; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.util.Collections.emptySet; +import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN; + +/** A dummy repository for testing which just needs restore overridden */ +public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository { + private final String indexName; + + public RestoreOnlyRepository(String indexName) { + this.indexName = indexName; + } + + @Override + protected void doStart() { + } + + @Override + protected void doStop() { + } + + @Override + protected void doClose() { + } + + @Override + public RepositoryMetaData getMetadata() { + return null; + } + + @Override + public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) { + return null; + } + + @Override + public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) { + return null; + } + + @Override + public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException { + return null; + } + + @Override + public RepositoryData getRepositoryData() { + Map> map = new HashMap<>(); + map.put(new IndexId(indexName, "blah"), emptySet()); + return new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(), map, Collections.emptyList()); + } + + @Override + public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) { + } + + @Override + public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, + int totalShards, List shardFailures, long repositoryStateId, + boolean includeGlobalState) { + return null; + } + + @Override + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { + } + + @Override + public long getSnapshotThrottleTimeInNanos() { + return 0; + } + + @Override + public long getRestoreThrottleTimeInNanos() { + return 0; + } + + @Override + public String startVerification() { + return null; + } + + @Override + public void endVerification(String verificationToken) { + } + + @Override + public boolean isReadOnly() { + return false; + } + + @Override + public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, + IndexShardSnapshotStatus snapshotStatus) { + } + + @Override + public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) { + return null; + } + + @Override + public void verify(String verificationToken, DiscoveryNode localNode) { + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java b/test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java index eabb05a537ca7..ed3a836d2c506 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java +++ b/test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java @@ -52,6 +52,7 @@ public class BackgroundIndexer implements AutoCloseable { private final Logger logger = LogManager.getLogger(getClass()); final Thread[] writers; + final Client client; final CountDownLatch stopLatch; final CopyOnWriteArrayList failures; final AtomicBoolean stop = new AtomicBoolean(false); @@ -122,6 +123,7 @@ public BackgroundIndexer(final String index, final String type, final Client cli if (random == null) { random = RandomizedTest.getRandom(); } + this.client = client; useAutoGeneratedIDs = random.nextBoolean(); failures = new CopyOnWriteArrayList<>(); writers = new Thread[writerCount]; @@ -316,6 +318,10 @@ public void close() throws Exception { stop(); } + public Client getClient() { + return client; + } + /** * Returns the ID set of all documents indexed by this indexer run */ diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index 96285021d6d95..6e358b76fd606 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -240,21 +240,17 @@ private void initiateFollowing( final PutFollowAction.Request request, final ActionListener listener) { assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "PutFollowAction does not support DEFAULT."; - activeShardsObserver.waitForActiveShards(new String[]{request.getFollowerIndex()}, - request.waitForActiveShards(), request.timeout(), result -> { - if (result) { - FollowParameters parameters = request.getParameters(); - ResumeFollowAction.Request resumeFollowRequest = new ResumeFollowAction.Request(); - resumeFollowRequest.setFollowerIndex(request.getFollowerIndex()); - resumeFollowRequest.setParameters(new FollowParameters(parameters)); - client.execute(ResumeFollowAction.INSTANCE, resumeFollowRequest, ActionListener.wrap( - r -> listener.onResponse(new PutFollowAction.Response(true, true, r.isAcknowledged())), - listener::onFailure - )); - } else { - listener.onResponse(new PutFollowAction.Response(true, false, false)); - } - }, listener::onFailure); + FollowParameters parameters = request.getParameters(); + ResumeFollowAction.Request resumeFollowRequest = new ResumeFollowAction.Request(); + resumeFollowRequest.setFollowerIndex(request.getFollowerIndex()); + resumeFollowRequest.setParameters(new FollowParameters(parameters)); + client.execute(ResumeFollowAction.INSTANCE, resumeFollowRequest, ActionListener.wrap( + r -> activeShardsObserver.waitForActiveShards(new String[]{request.getFollowerIndex()}, + request.waitForActiveShards(), request.timeout(), result -> + listener.onResponse(new PutFollowAction.Response(true, result, r.isAcknowledged())), + listener::onFailure), + listener::onFailure + )); } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index a09e62f016983..c94b1a52863d8 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -44,6 +44,7 @@ import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.index.engine.DocIdSeqNoAndTerm; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; @@ -58,6 +59,7 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.InternalTestCluster; @@ -92,6 +94,8 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BooleanSupplier; import java.util.function.Function; import java.util.stream.Collectors; @@ -557,6 +561,70 @@ protected void assertMaxSeqNoOfUpdatesIsTransferred(Index leaderIndex, Index fol }); } + /** + * Waits until at least a give number of document is visible for searchers + * + * @param numDocs number of documents to wait for + * @param indexer a {@link org.elasticsearch.test.BackgroundIndexer}. Will be first checked for documents indexed. + * This saves on unneeded searches. + * @return the actual number of docs seen. + */ + public long waitForDocs(final long numDocs, final BackgroundIndexer indexer) throws InterruptedException { + // indexing threads can wait for up to ~1m before retrying when they first try to index into a shard which is not STARTED. + return waitForDocs(numDocs, 90, TimeUnit.SECONDS, indexer); + } + + /** + * Waits until at least a give number of document is visible for searchers + * + * @param numDocs number of documents to wait for + * @param maxWaitTime if not progress have been made during this time, fail the test + * @param maxWaitTimeUnit the unit in which maxWaitTime is specified + * @param indexer Will be first checked for documents indexed. + * This saves on unneeded searches. + * @return the actual number of docs seen. + */ + public long waitForDocs(final long numDocs, int maxWaitTime, TimeUnit maxWaitTimeUnit, final BackgroundIndexer indexer) + throws InterruptedException { + final AtomicLong lastKnownCount = new AtomicLong(-1); + long lastStartCount = -1; + BooleanSupplier testDocs = () -> { + lastKnownCount.set(indexer.totalIndexedDocs()); + if (lastKnownCount.get() >= numDocs) { + try { + long count = indexer.getClient().prepareSearch() + .setTrackTotalHits(true) + .setSize(0) + .setQuery(QueryBuilders.matchAllQuery()) + .get() + .getHits().getTotalHits().value; + + if (count == lastKnownCount.get()) { + // no progress - try to refresh for the next time + indexer.getClient().admin().indices().prepareRefresh().get(); + } + lastKnownCount.set(count); + } catch (Exception e) { // count now acts like search and barfs if all shards failed... + logger.debug("failed to executed count", e); + return false; + } + logger.debug("[{}] docs visible for search. waiting for [{}]", lastKnownCount.get(), numDocs); + } else { + logger.debug("[{}] docs indexed. waiting for [{}]", lastKnownCount.get(), numDocs); + } + return lastKnownCount.get() >= numDocs; + }; + + while (!awaitBusy(testDocs, maxWaitTime, maxWaitTimeUnit)) { + if (lastStartCount == lastKnownCount.get()) { + // we didn't make any progress + fail("failed to reach " + numDocs + "docs"); + } + lastStartCount = lastKnownCount.get(); + } + return lastKnownCount.get(); + } + static void removeCCRRelatedMetadataFromClusterState(ClusterService clusterService) throws Exception { CountDownLatch latch = new CountDownLatch(1); clusterService.submitStateUpdateTask("remove-ccr-related-metadata", new ClusterStateUpdateTask() { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index acd380b88e498..db23eb9c8a71e 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -32,7 +32,6 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; @@ -61,6 +60,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.snapshots.SnapshotRestoreException; import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.transport.NoSuchRemoteClusterException; import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.ccr.action.ShardFollowTask; @@ -118,93 +118,85 @@ public void testFollowIndex() throws Exception { } else { firstBatchNumDocs = randomIntBetween(10, 64); } - final int flushPoint = (int) (firstBatchNumDocs * 0.75); logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs); - BulkRequestBuilder bulkRequestBuilder = leaderClient().prepareBulk(); - for (int i = 0; i < flushPoint; i++) { - final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); - IndexRequest indexRequest = new IndexRequest("index1", "doc", Integer.toString(i)) - .source(source, XContentType.JSON) - .timeout(TimeValue.timeValueSeconds(1)); - bulkRequestBuilder.add(indexRequest); - } - bulkRequestBuilder.get(); - - leaderClient().admin().indices().prepareFlush("index1").setWaitIfOngoing(true).get(); - - // Index some docs after the flush that might be recovered in the normal index following operations - for (int i = flushPoint; i < firstBatchNumDocs; i++) { - final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); - leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); - } + try (BackgroundIndexer indexer = new BackgroundIndexer("index1", "_doc", leaderClient(), firstBatchNumDocs, + randomIntBetween(1, 5))) { + waitForDocs(randomInt(firstBatchNumDocs), indexer); + leaderClient().admin().indices().prepareFlush("index1").setWaitIfOngoing(true).get(); + waitForDocs(firstBatchNumDocs, indexer); + indexer.assertNoFailures(); - boolean waitOnAll = randomBoolean(); + boolean waitOnAll = randomBoolean(); - final PutFollowAction.Request followRequest; - if (waitOnAll) { - followRequest = putFollow("index1", "index2", ActiveShardCount.ALL); - } else { - followRequest = putFollow("index1", "index2", ActiveShardCount.ONE); - } - PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); - assertTrue(response.isFollowIndexCreated()); - assertTrue(response.isFollowIndexShardsAcked()); - assertTrue(response.isIndexFollowingStarted()); - - ClusterHealthRequest healthRequest = Requests.clusterHealthRequest("index2").waitForNoRelocatingShards(true); - ClusterIndexHealth indexHealth = followerClient().admin().cluster().health(healthRequest).actionGet().getIndices().get("index2"); - for (ClusterShardHealth shardHealth : indexHealth.getShards().values()) { + final PutFollowAction.Request followRequest; if (waitOnAll) { - assertTrue(shardHealth.isPrimaryActive()); - assertEquals(1 + numberOfReplicas, shardHealth.getActiveShards()); + followRequest = putFollow("index1", "index2", ActiveShardCount.ALL); } else { - assertTrue(shardHealth.isPrimaryActive()); + followRequest = putFollow("index1", "index2", ActiveShardCount.ONE); } - } - - final Map firstBatchNumDocsPerShard = new HashMap<>(); - final ShardStats[] firstBatchShardStats = - leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); - for (final ShardStats shardStats : firstBatchShardStats) { - if (shardStats.getShardRouting().primary()) { - long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1; - firstBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value); + PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + assertTrue(response.isFollowIndexCreated()); + assertTrue(response.isFollowIndexShardsAcked()); + assertTrue(response.isIndexFollowingStarted()); + + ClusterHealthRequest healthRequest = Requests.clusterHealthRequest("index2").waitForNoRelocatingShards(true); + ClusterIndexHealth indexHealth = followerClient().admin().cluster().health(healthRequest).get().getIndices().get("index2"); + for (ClusterShardHealth shardHealth : indexHealth.getShards().values()) { + if (waitOnAll) { + assertTrue(shardHealth.isPrimaryActive()); + assertEquals(1 + numberOfReplicas, shardHealth.getActiveShards()); + } else { + assertTrue(shardHealth.isPrimaryActive()); + } } - } - assertBusy(assertTask(numberOfPrimaryShards, firstBatchNumDocsPerShard)); + final Map firstBatchNumDocsPerShard = new HashMap<>(); + final ShardStats[] firstBatchShardStats = + leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); + for (final ShardStats shardStats : firstBatchShardStats) { + if (shardStats.getShardRouting().primary()) { + long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1; + firstBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value); + } + } - for (int i = 0; i < firstBatchNumDocs; i++) { - assertBusy(assertExpectedDocumentRunnable(i)); - } + assertBusy(assertTask(numberOfPrimaryShards, firstBatchNumDocsPerShard)); - pauseFollow("index2"); - followerClient().execute(ResumeFollowAction.INSTANCE, resumeFollow("index2")).get(); - final int secondBatchNumDocs = randomIntBetween(2, 64); - logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs); - for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { - final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); - leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); - } + for (String docId : indexer.getIds()) { + assertBusy(() -> { + final GetResponse getResponse = followerClient().prepareGet("index2", "_doc", docId).get(); + assertTrue("Doc with id [" + docId + "] is missing", getResponse.isExists()); + }); + } - final Map secondBatchNumDocsPerShard = new HashMap<>(); - final ShardStats[] secondBatchShardStats = - leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); - for (final ShardStats shardStats : secondBatchShardStats) { - if (shardStats.getShardRouting().primary()) { - final long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1; - secondBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value); + pauseFollow("index2"); + followerClient().execute(ResumeFollowAction.INSTANCE, resumeFollow("index2")).get(); + final int secondBatchNumDocs = randomIntBetween(2, 64); + logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs); + indexer.continueIndexing(secondBatchNumDocs); + + final Map secondBatchNumDocsPerShard = new HashMap<>(); + final ShardStats[] secondBatchShardStats = + leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); + for (final ShardStats shardStats : secondBatchShardStats) { + if (shardStats.getShardRouting().primary()) { + final long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1; + secondBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value); + } } - } - assertBusy(assertTask(numberOfPrimaryShards, secondBatchNumDocsPerShard)); + assertBusy(assertTask(numberOfPrimaryShards, secondBatchNumDocsPerShard)); - for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { - assertBusy(assertExpectedDocumentRunnable(i)); + for (String docId : indexer.getIds()) { + assertBusy(() -> { + final GetResponse getResponse = followerClient().prepareGet("index2", "_doc", docId).get(); + assertTrue("Doc with id [" + docId + "] is missing", getResponse.isExists()); + }); + } + pauseFollow("index2"); + assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfPrimaryShards); } - pauseFollow("index2"); - assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfPrimaryShards); } public void testFollowIndexWithConcurrentMappingChanges() throws Exception { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java index 78529c9878331..1a64608f3dfc3 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java @@ -5,28 +5,47 @@ */ package org.elasticsearch.xpack.ccr.index.engine; +import org.apache.lucene.store.IOContext; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingHelper; import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; +import org.elasticsearch.index.shard.RestoreOnlyRepository; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.snapshots.Snapshot; +import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.CcrSettings; +import java.io.IOException; import java.util.Collections; import java.util.concurrent.CountDownLatch; +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting; +import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; public class FollowEngineIndexShardTests extends IndexShardTestCase { @@ -79,4 +98,63 @@ public void testDoNotFillGaps() throws Exception { closeShards(indexShard); } + public void testRestoreShard() throws IOException { + final Settings sourceSettings = Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .build(); + final IndexShard source = newStartedShard(true, sourceSettings); + final Settings targetSettings = Settings.builder() + .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .build(); + IndexShard target = newStartedShard(true, targetSettings, new FollowingEngineFactory()); + assertThat(IndexShardTestCase.getEngine(target), instanceOf(FollowingEngine.class)); + + indexDoc(source, "_doc", "0"); + EngineTestCase.generateNewSeqNo(IndexShardTestCase.getEngine(source)); + indexDoc(source, "_doc", "2"); + if (randomBoolean()) { + source.refresh("test"); + } + flushShard(source); // only flush source + ShardRouting routing = ShardRoutingHelper.initWithSameId(target.routingEntry(), + RecoverySource.ExistingStoreRecoverySource.INSTANCE); + final Snapshot snapshot = new Snapshot("foo", new SnapshotId("bar", UUIDs.randomBase64UUID())); + routing = ShardRoutingHelper.newWithRestoreSource(routing, + new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT, "test")); + target = reinitShard(target, routing); + Store sourceStore = source.store(); + Store targetStore = target.store(); + + DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + target.markAsRecovering("store", new RecoveryState(routing, localNode, null)); + assertTrue(target.restoreFromRepository(new RestoreOnlyRepository("test") { + @Override + public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, + RecoveryState recoveryState) { + try { + cleanLuceneIndex(targetStore.directory()); + for (String file : sourceStore.directory().listAll()) { + if (file.equals("write.lock") || file.startsWith("extra")) { + continue; + } + targetStore.directory().copyFrom(sourceStore.directory(), file, file, IOContext.DEFAULT); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + })); + assertThat(target.getLocalCheckpoint(), equalTo(0L)); + assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(2L)); + assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(0L)); + IndexShardTestCase.updateRoutingEntry(target, routing.moveToStarted()); + assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(0L)); + + assertDocs(target, "0", "2"); + + closeShard(source, false); + closeShards(target); + } + } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java index b6717c588f0b9..869082a300556 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java @@ -132,7 +132,8 @@ protected void closeInternal() { snapshot.syncSnapshot(snapshotIndexCommit); // we will use the lucene doc ID as the seq ID so we set the local checkpoint to maxDoc with a new index UUID SegmentInfos segmentInfos = tempStore.readLastCommittedSegmentsInfo(); - tempStore.bootstrapNewHistory(segmentInfos.totalMaxDoc()); + final long maxDoc = segmentInfos.totalMaxDoc(); + tempStore.bootstrapNewHistory(maxDoc, maxDoc); store.incRef(); try (DirectoryReader reader = DirectoryReader.open(tempStore.directory())) { IndexCommit indexCommit = reader.getIndexCommit(); From ad962907e0eb8e5c5a8edd65102ab66fe4dc3d4f Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 5 Feb 2019 17:04:14 -0600 Subject: [PATCH 2/6] Fix --- .../test/java/org/elasticsearch/xpack/CcrIntegTestCase.java | 2 +- .../java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index c94b1a52863d8..b970761d64bcb 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -597,7 +597,7 @@ public long waitForDocs(final long numDocs, int maxWaitTime, TimeUnit maxWaitTim .setSize(0) .setQuery(QueryBuilders.matchAllQuery()) .get() - .getHits().getTotalHits().value; + .getHits().getTotalHits(); if (count == lastKnownCount.get()) { // no progress - try to refresh for the next time diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index db23eb9c8a71e..a6d42382397a4 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -120,7 +120,7 @@ public void testFollowIndex() throws Exception { } logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs); - try (BackgroundIndexer indexer = new BackgroundIndexer("index1", "_doc", leaderClient(), firstBatchNumDocs, + try (BackgroundIndexer indexer = new BackgroundIndexer("index1", "doc", leaderClient(), firstBatchNumDocs, randomIntBetween(1, 5))) { waitForDocs(randomInt(firstBatchNumDocs), indexer); leaderClient().admin().indices().prepareFlush("index1").setWaitIfOngoing(true).get(); @@ -165,7 +165,7 @@ public void testFollowIndex() throws Exception { for (String docId : indexer.getIds()) { assertBusy(() -> { - final GetResponse getResponse = followerClient().prepareGet("index2", "_doc", docId).get(); + final GetResponse getResponse = followerClient().prepareGet("index2", "doc", docId).get(); assertTrue("Doc with id [" + docId + "] is missing", getResponse.isExists()); }); } @@ -190,7 +190,7 @@ public void testFollowIndex() throws Exception { for (String docId : indexer.getIds()) { assertBusy(() -> { - final GetResponse getResponse = followerClient().prepareGet("index2", "_doc", docId).get(); + final GetResponse getResponse = followerClient().prepareGet("index2", "doc", docId).get(); assertTrue("Doc with id [" + docId + "] is missing", getResponse.isExists()); }); } From 9e1bb5ba0c9af63fa2a6e397accdb27ae0359037 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 8 Feb 2019 10:41:54 -0700 Subject: [PATCH 3/6] Changes --- .../org/elasticsearch/index/store/Store.java | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index 14e5514ba1cb2..01f767305fe52 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -1426,9 +1426,21 @@ public void createEmpty() throws IOException { public void bootstrapNewHistory() throws IOException { metadataLock.writeLock().lock(); try { - Map userData = readLastCommittedSegmentsInfo().getUserData(); + SegmentInfos segmentCommitInfos = readLastCommittedSegmentsInfo(); + Map userData = segmentCommitInfos.getUserData(); final long maxSeqNo = Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO)); - final long localCheckpoint = Long.parseLong(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + String rawLocalCheckpoint = userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY); + final long localCheckpoint; + if (rawLocalCheckpoint == null) { + // If the local checkpoint is null this user data is from pre-6.0. Prior to 6.0 we would set + // the local checkpoint equal to the max sequence number + localCheckpoint = maxSeqNo; + // If the local checkpoint we expect that this is Lucene version 6 or earlier + assert segmentCommitInfos.getCommitLuceneVersion().major < 7 : "Found Lucene version: " + + segmentCommitInfos.getCommitLuceneVersion().major; + } else { + localCheckpoint = Long.parseLong(rawLocalCheckpoint); + } bootstrapNewHistory(localCheckpoint, maxSeqNo); } finally { metadataLock.writeLock().unlock(); From e32da3df4f302489f31d0e4b8b564dc4ad75bc64 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 8 Feb 2019 11:36:54 -0700 Subject: [PATCH 4/6] WIP --- .../org/elasticsearch/index/store/Store.java | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index 01f767305fe52..d46d5b24d25d9 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -1426,22 +1426,18 @@ public void createEmpty() throws IOException { public void bootstrapNewHistory() throws IOException { metadataLock.writeLock().lock(); try { - SegmentInfos segmentCommitInfos = readLastCommittedSegmentsInfo(); - Map userData = segmentCommitInfos.getUserData(); - final long maxSeqNo = Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO)); - String rawLocalCheckpoint = userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY); - final long localCheckpoint; + final SegmentInfos segmentCommitInfos = readLastCommittedSegmentsInfo(); + final Map userData = segmentCommitInfos.getUserData(); + final String rawLocalCheckpoint = userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY); + final String rawMaxSeqNo = userData.get(SequenceNumbers.MAX_SEQ_NO); if (rawLocalCheckpoint == null) { - // If the local checkpoint is null this user data is from pre-6.0. Prior to 6.0 we would set - // the local checkpoint equal to the max sequence number - localCheckpoint = maxSeqNo; - // If the local checkpoint we expect that this is Lucene version 6 or earlier + assert rawMaxSeqNo == null : "Local checkpoint null but max sequence number: " + rawMaxSeqNo; + // If the local checkpoint is null we expect that this is Lucene version 6 or earlier assert segmentCommitInfos.getCommitLuceneVersion().major < 7 : "Found Lucene version: " + segmentCommitInfos.getCommitLuceneVersion().major; - } else { - localCheckpoint = Long.parseLong(rawLocalCheckpoint); } - bootstrapNewHistory(localCheckpoint, maxSeqNo); + final SequenceNumbers.CommitInfo seqno = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(userData.entrySet()); + bootstrapNewHistory(seqno.maxSeqNo, seqno.localCheckpoint); } finally { metadataLock.writeLock().unlock(); } From b38361ffcaf4b4b7864ee5cd0ee0993b896d59ee Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 8 Feb 2019 11:58:39 -0700 Subject: [PATCH 5/6] Fix test --- server/src/main/java/org/elasticsearch/index/store/Store.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index d46d5b24d25d9..e767a902a7299 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -1437,7 +1437,7 @@ public void bootstrapNewHistory() throws IOException { segmentCommitInfos.getCommitLuceneVersion().major; } final SequenceNumbers.CommitInfo seqno = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(userData.entrySet()); - bootstrapNewHistory(seqno.maxSeqNo, seqno.localCheckpoint); + bootstrapNewHistory(seqno.localCheckpoint, seqno.maxSeqNo); } finally { metadataLock.writeLock().unlock(); } From 8ec15427f96e0c25ec6460aac1f87b4276e48ee9 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 8 Feb 2019 15:44:44 -0700 Subject: [PATCH 6/6] Change --- .../java/org/elasticsearch/index/store/Store.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index e767a902a7299..7f21218b1f9de 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -1430,12 +1430,11 @@ public void bootstrapNewHistory() throws IOException { final Map userData = segmentCommitInfos.getUserData(); final String rawLocalCheckpoint = userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY); final String rawMaxSeqNo = userData.get(SequenceNumbers.MAX_SEQ_NO); - if (rawLocalCheckpoint == null) { - assert rawMaxSeqNo == null : "Local checkpoint null but max sequence number: " + rawMaxSeqNo; - // If the local checkpoint is null we expect that this is Lucene version 6 or earlier - assert segmentCommitInfos.getCommitLuceneVersion().major < 7 : "Found Lucene version: " + - segmentCommitInfos.getCommitLuceneVersion().major; - } + assert (rawLocalCheckpoint == null) == (rawMaxSeqNo == null) : + "local checkpoint was " + rawLocalCheckpoint + " but max seq no was " + rawMaxSeqNo; + + assert rawLocalCheckpoint != null || segmentCommitInfos.getCommitLuceneVersion().major < 7 : + "Found Lucene version: " + segmentCommitInfos.getCommitLuceneVersion().major; final SequenceNumbers.CommitInfo seqno = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(userData.entrySet()); bootstrapNewHistory(seqno.localCheckpoint, seqno.maxSeqNo); } finally {