diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index c62cec46faa26..c50a194665326 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1099,7 +1099,7 @@ public void trimTranslog() {
     /**
      * Rolls the translog generation and cleans unneeded generations.
      */
-    private void rollTranslogGeneration() {
+    public void rollTranslogGeneration() {
         final Engine engine = getEngine();
         engine.rollTranslogGeneration();
     }
diff --git a/server/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java b/server/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java
index e9d593b728edc..c0969cbc80ec8 100644
--- a/server/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java
+++ b/server/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java
@@ -63,6 +63,7 @@ public int skippedOperations() {

     @Override
     public Translog.Operation next() throws IOException {
+        // TODO: Read translog forward in 9.0+
         for (; index >= 0; index--) {
             final TranslogSnapshot current = translogs[index];
             Translog.Operation op;
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
index 15bc17b3f7ee2..5d58325d4cff0 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
@@ -379,7 +379,7 @@ class FinalizeRecoveryRequestHandler implements TransportRequestHandler<RecoveryFinalizeRecoveryRequest> {
             final ActionListener<TransportResponse> listener = new ChannelActionListener<>(channel, Actions.FINALIZE, request);
-            recoveryRef.target().finalizeRecovery(request.globalCheckpoint(),
+            recoveryRef.target().finalizeRecovery(request.globalCheckpoint(), request.trimAboveSeqNo(),
                 ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
         }
     }
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryFinalizeRecoveryRequest.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryFinalizeRecoveryRequest.java
index f7c302630b1ff..fa1e195b17432 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryFinalizeRecoveryRequest.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryFinalizeRecoveryRequest.java
@@ -19,30 +19,39 @@

 package org.elasticsearch.indices.recovery;

+import org.elasticsearch.Version;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.transport.TransportRequest;

 import java.io.IOException;

-public class RecoveryFinalizeRecoveryRequest extends TransportRequest {
+final class RecoveryFinalizeRecoveryRequest extends TransportRequest {

-    private long recoveryId;
-    private ShardId shardId;
-    private long globalCheckpoint;
+    private final long recoveryId;
+    private final ShardId shardId;
+    private final long globalCheckpoint;
+    private final long trimAboveSeqNo;

-    public RecoveryFinalizeRecoveryRequest(StreamInput in) throws IOException {
+    RecoveryFinalizeRecoveryRequest(StreamInput in) throws IOException {
         super(in);
         recoveryId = in.readLong();
         shardId = new ShardId(in);
         globalCheckpoint = in.readZLong();
+        if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+            trimAboveSeqNo = in.readZLong();
+        } else {
+            trimAboveSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
+        }
     }

-    RecoveryFinalizeRecoveryRequest(final long recoveryId, final ShardId shardId, final long globalCheckpoint) {
+    RecoveryFinalizeRecoveryRequest(final long recoveryId, final ShardId shardId, final long globalCheckpoint, final long trimAboveSeqNo) {
         this.recoveryId = recoveryId;
         this.shardId = shardId;
         this.globalCheckpoint = globalCheckpoint;
+        this.trimAboveSeqNo = trimAboveSeqNo;
     }

     public long recoveryId() {
@@ -57,12 +66,19 @@ public long globalCheckpoint() {
         return globalCheckpoint;
     }

+    public long trimAboveSeqNo() {
+        return trimAboveSeqNo;
+    }
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
         out.writeLong(recoveryId);
         shardId.writeTo(out);
         out.writeZLong(globalCheckpoint);
+        if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+            out.writeZLong(trimAboveSeqNo);
+        }
     }

 }
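The serialization above follows the standard version-gated wire pattern: the new field is only written to peers that can read it, and a newer reader substitutes UNASSIGNED_SEQ_NO (which disables trimming on the target) when the sender predates the field. A minimal sketch of the pattern, with a hypothetical ExampleRequest standing in for the real class, using only the StreamInput/StreamOutput and Version APIs that appear in the diff:

    import java.io.IOException;

    import org.elasticsearch.Version;
    import org.elasticsearch.common.io.stream.StreamInput;
    import org.elasticsearch.common.io.stream.StreamOutput;
    import org.elasticsearch.index.seqno.SequenceNumbers;
    import org.elasticsearch.transport.TransportRequest;

    // Hypothetical request illustrating version-gated serialization of a new field.
    final class ExampleRequest extends TransportRequest {

        private final long trimAboveSeqNo; // added to the wire format in 8.0.0

        ExampleRequest(StreamInput in) throws IOException {
            super(in);
            if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
                trimAboveSeqNo = in.readZLong(); // the sender wrote the field
            } else {
                trimAboveSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; // older sender: disable trimming
            }
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
                out.writeZLong(trimAboveSeqNo); // never send bytes an old receiver cannot parse
            }
        }
    }

Using writeZLong rather than writeLong keeps the negative UNASSIGNED_SEQ_NO sentinel compact on the wire, since zig-zag encoding maps small negative values to small unsigned ones.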
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index ed7b90b940d38..fbbcb08eabe56 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -325,7 +325,9 @@ && isTargetSameHistory()
         }, onFailure);

-        sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, finalizeStep), onFailure);
+        // The recovery target can trim all operations >= startingSeqNo, as we have sent all these operations in phase 2.
+        final long trimAboveSeqNo = startingSeqNo - 1;
+        sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, trimAboveSeqNo, finalizeStep), onFailure);

         finalizeStep.whenComplete(r -> {
             final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
@@ -747,7 +749,7 @@ private void sendBatch(
         }
     }

-    void finalizeRecovery(final long targetLocalCheckpoint, final ActionListener<Void> listener) throws IOException {
+    void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener) throws IOException {
         if (shard.state() == IndexShardState.CLOSED) {
             throw new IndexShardClosedException(request.shardId());
         }
@@ -764,7 +766,7 @@ void finalizeRecovery(final long targetLocalCheckpoint, final ActionListener<Void> listener)
         final StepListener<Void> finalizeListener = new StepListener<>();
-        cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint, finalizeListener));
+        cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, finalizeListener));
         finalizeListener.whenComplete(r -> {
             runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
                 shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads, logger);
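The arithmetic is worth spelling out: phase 2 replays every operation with seqNo >= startingSeqNo to the target, so once that replay completes, anything the target still holds above startingSeqNo - 1 is at best a duplicate of what was just resent and at worst a stale operation from an older primary term. A tiny illustrative helper (the name is ours, not the PR's):

    // Illustrative only. After phase 2 the target holds authoritative copies of all
    // operations with seqNo >= startingSeqNo, so whatever its pre-existing translog
    // contains above startingSeqNo - 1 can be discarded safely.
    static long computeTrimAboveSeqNo(long startingSeqNo) {
        // Example: startingSeqNo = 10 -> keep local ops [0..9], trim above 9;
        // ops 10 and higher were re-received from the source during this recovery.
        return startingSeqNo - 1;
    }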
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
index 42d36d7096201..166c2c5b99614 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -289,9 +289,8 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Void> listener) {

     @Override
-    public void finalizeRecovery(final long globalCheckpoint, ActionListener<Void> listener) {
+    public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSeqNo, ActionListener<Void> listener) {
         ActionListener.completeWith(listener, () -> {
-            final IndexShard indexShard = indexShard();
             indexShard.updateGlobalCheckpointOnReplica(globalCheckpoint, "finalizing recovery");
             // Persist the global checkpoint.
             indexShard.sync();
@@ -299,6 +298,15 @@ public void finalizeRecovery(final long globalCheckpoint, ActionListener<Void> l
             if (hasUncommittedOperations()) {
                 indexShard.flush(new FlushRequest().force(true).waitIfOngoing(true));
             }
+            if (trimAboveSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
+                // We should erase all translog operations above trimAboveSeqNo, as we have received either the same or a newer copy
+                // from the recovery source in phase 2. Rolling a new translog generation is not strictly required here, since we won't
+                // trim the current generation. It merely satisfies the assumption that the current generation does not have any
+                // operation that would be trimmed (see TranslogWriter#assertNoSeqAbove). That assumption does not hold for peer
+                // recovery, because we could have received operations above startingSeqNo from the previous primary terms.
+                indexShard.rollTranslogGeneration();
+                indexShard.trimOperationOfPreviousPrimaryTerms(trimAboveSeqNo);
+            }
             indexShard.finalizeRecovery();
             return null;
         });
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
index a1990dda3a09c..13fc04098f191 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
@@ -42,9 +42,11 @@ public interface RecoveryTargetHandler {
      * the global checkpoint.
      *
      * @param globalCheckpoint the global checkpoint on the recovery source
+     * @param trimAboveSeqNo   the recovery target should erase its existing translog above this sequence number,
+     *                         as those operations come from previous primary terms
      * @param listener         the listener which will be notified when this method completes
      */
-    void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener);
+    void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener);

     /**
      * Handoff the primary context between the relocation source and the relocation target.
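The ordering on the target (roll, then trim) is the subtle part. Trimming only affects generations older than the current one, and TranslogWriter#assertNoSeqAbove assumes the current generation contains nothing that would be trimmed; rolling first leaves the current generation empty, so the assumption holds trivially. A condensed sketch of the sequence, calling only the IndexShard methods this diff touches (the wrapper class itself is illustrative):

    import java.io.IOException;

    import org.elasticsearch.index.seqno.SequenceNumbers;
    import org.elasticsearch.index.shard.IndexShard;

    final class FinalizeSketch {
        // Condensed from RecoveryTarget#finalizeRecovery above; flush handling omitted.
        static void finalizeWithTrim(IndexShard indexShard, long globalCheckpoint, long trimAboveSeqNo) throws IOException {
            indexShard.updateGlobalCheckpointOnReplica(globalCheckpoint, "finalizing recovery");
            indexShard.sync(); // persist the global checkpoint before touching the translog
            if (trimAboveSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                // 1. Roll: the now-current generation starts out empty.
                indexShard.rollTranslogGeneration();
                // 2. Trim: mark ops above trimAboveSeqNo in older generations as erased;
                //    they are leftovers from previous primary terms or duplicates of resent ops.
                indexShard.trimOperationOfPreviousPrimaryTerms(trimAboveSeqNo);
            }
            indexShard.finalizeRecovery();
        }
    }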
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
index 9b2c0f752a5d4..3140de8169d80 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
@@ -86,9 +86,9 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Void> listener) {

     @Override
-    public void finalizeRecovery(final long globalCheckpoint, final ActionListener<Void> listener) {
+    public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSeqNo, final ActionListener<Void> listener) {
         transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FINALIZE,
-            new RecoveryFinalizeRecoveryRequest(recoveryId, shardId, globalCheckpoint),
+            new RecoveryFinalizeRecoveryRequest(recoveryId, shardId, globalCheckpoint, trimAboveSeqNo),
             TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(),
             new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> null),
                 in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 68af2d6438f6c..ed8611aaee588 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -134,6 +134,7 @@
 import org.elasticsearch.index.store.FsDirectoryFactory;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.SnapshotMatchers;
+import org.elasticsearch.index.translog.TestTranslog;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.TranslogConfig;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
@@ -184,6 +185,7 @@
 import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY;
 import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
 import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
+import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
 import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
@@ -5946,4 +5948,34 @@ public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
             assertNotNull(engine.failedEngine.get());
         }
     }
+
+    /**
+     * We can trim the translog on primary promotion and peer recovery based on the fact that we add operations with either
+     * REPLICA or PEER_RECOVERY origin to the translog even though they already exist in the engine (i.e. hasProcessed() == true).
+     * If we decide not to add those already-processed operations to the translog, we need to carefully study the consequences
+     * of the translog trimming in these two places.
+     */
+    public void testAlwaysRecordReplicaOrPeerRecoveryOperationsToTranslog() throws Exception {
+        List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean());
+        applyOperations(engine, operations);
+        Set<Long> seqNos = operations.stream().map(Engine.Operation::seqNo).collect(Collectors.toSet());
+        try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) {
+            assertThat(snapshot.totalOperations(), equalTo(operations.size()));
+            assertThat(TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()),
+                equalTo(seqNos));
+        }
+        primaryTerm.set(randomLongBetween(primaryTerm.get(), Long.MAX_VALUE));
+        engine.rollTranslogGeneration();
+        engine.trimOperationsFromTranslog(primaryTerm.get(), NO_OPS_PERFORMED); // trim everything in the translog
+        try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) {
+            assertThat(snapshot.totalOperations(), equalTo(operations.size()));
+            assertNull(snapshot.next());
+        }
+        applyOperations(engine, operations);
+        try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) {
+            assertThat(snapshot.totalOperations(), equalTo(operations.size() * 2));
+            assertThat(TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()),
+                equalTo(seqNos));
+        }
+    }
 }
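Note the asymmetry this test pins down: after the trim, totalOperations() still counts the trimmed operations (they remain in the generation files on disk), while next() no longer returns them. Conceptually, the reader applies a filter like the following over the raw operation stream (an illustrative simplification; the real logic lives in TranslogSnapshot and MultiSnapshot):

    import java.io.IOException;

    import org.elasticsearch.index.translog.Translog;

    final class TrimAwareReader {
        // Returns the next operation that is still visible, skipping operations that
        // were trimmed: written under an older primary term and above trimAboveSeqNo.
        static Translog.Operation nextUntrimmed(Translog.Snapshot raw, long currentTerm, long trimAboveSeqNo) throws IOException {
            Translog.Operation op;
            while ((op = raw.next()) != null) {
                boolean trimmed = op.primaryTerm() < currentTerm && op.seqNo() > trimAboveSeqNo;
                if (trimmed == false) {
                    return op; // still part of the logical history
                }
                // trimmed: physically present on disk, logically erased
            }
            return null;
        }
    }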
diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
index f05ddce567a8a..76ca0c66a372d 100644
--- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
+++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
@@ -504,9 +504,9 @@ protected EngineFactory getEngineFactory(ShardRouting routing) {
                 recoveryStart.countDown();
                 return new RecoveryTarget(indexShard, node, recoveryListener) {
                     @Override
-                    public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
+                    public void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener) {
                         recoveryDone.set(true);
-                        super.finalizeRecovery(globalCheckpoint, listener);
+                        super.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, listener);
                     }
                 };
             });
@@ -868,13 +868,13 @@ public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot
         }

         @Override
-        public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
+        public void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener) {
             if (hasBlocked() == false) {
                 // it may be that no ops have been transferred, block now
                 blockIfNeeded(RecoveryState.Stage.TRANSLOG);
             }
             blockIfNeeded(RecoveryState.Stage.FINALIZE);
-            super.finalizeRecovery(globalCheckpoint, listener);
+            super.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, listener);
         }
     }
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index b2b7c80450c95..3e507a3cfb685 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -1064,7 +1064,7 @@ public void onFailure(Exception e) {
                 onFailureException.get(), hasToString(containsString("operation primary term [" + oldPrimaryTerm + "] is too old")));
         }

-        closeShards(indexShard);
+        closeShard(indexShard, false); // skip asserting translog and Lucene as we rolled back Lucene but did not execute resync
     }

     public void testAcquireReplicaPermitAdvanceMaxSeqNoOfUpdates() throws Exception {
@@ -2760,8 +2760,8 @@ public void indexTranslogOperations(
             }

             @Override
-            public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
-                super.finalizeRecovery(globalCheckpoint,
+            public void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener) {
+                super.finalizeRecovery(globalCheckpoint, trimAboveSeqNo,
                     ActionListener.wrap(
                         r -> {
                             assertListenerCalled.accept(replica);
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
index 9ab56df76d5c3..5c8d1b20f978b 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
@@ -20,6 +20,7 @@
 package org.elasticsearch.indices.recovery;

 import org.apache.lucene.analysis.TokenStream;
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@@ -33,6 +34,7 @@
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
@@ -75,6 +77,7 @@
 import org.elasticsearch.indices.analysis.AnalysisModule;
 import org.elasticsearch.indices.flush.SyncedFlushUtil;
 import org.elasticsearch.indices.recovery.RecoveryState.Stage;
+import org.elasticsearch.node.NodeClosedException;
 import org.elasticsearch.node.RecoverySettingsChunkSizePlugin;
 import org.elasticsearch.plugins.AnalysisPlugin;
 import org.elasticsearch.plugins.Plugin;
@@ -117,6 +120,8 @@
 import static java.util.Collections.singletonMap;
 import static java.util.stream.Collectors.toList;
+import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
+import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED;
 import static org.elasticsearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
@@ -125,6 +130,7 @@
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isOneOf;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.not;

@@ -1326,4 +1332,55 @@ public void testAllocateEmptyPrimaryResetsGlobalCheckpoint() throws Exception {
             assertThat(shardStats.getSeqNoStats().getGlobalCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
         }
     }
+
+    public void testPeerRecoveryTrimsLocalTranslog() throws Exception {
+        internalCluster().startNode();
+        List<String> dataNodes = internalCluster().startDataOnlyNodes(2);
+        String indexName = "test-index";
+        createIndex(indexName, Settings.builder()
+            .put("index.number_of_shards", 1).put("index.number_of_replicas", 1)
+            .put("index.routing.allocation.include._name", String.join(",", dataNodes)).build());
+        ensureGreen(indexName);
+        ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
+        DiscoveryNode nodeWithOldPrimary = clusterState.nodes().get(clusterState.routingTable()
+            .index(indexName).shard(0).primaryShard().currentNodeId());
+        MockTransportService transportService = (MockTransportService) internalCluster()
+            .getInstance(TransportService.class, nodeWithOldPrimary.getName());
+        CountDownLatch readyToRestartNode = new CountDownLatch(1);
+        AtomicBoolean stopped = new AtomicBoolean();
+        transportService.addSendBehavior((connection, requestId, action, request, options) -> {
+            if (action.equals("indices:data/write/bulk[s][r]") && randomInt(100) < 5) {
+                throw new NodeClosedException(nodeWithOldPrimary);
+            }
+            // prevent the primary from marking the replica as stale so the replica can get promoted.
+            if (action.equals("internal:cluster/shard/failure")) {
+                stopped.set(true);
+                readyToRestartNode.countDown();
+                throw new NodeClosedException(nodeWithOldPrimary);
+            }
+            connection.sendRequest(requestId, action, request, options);
+        });
+        Thread[] indexers = new Thread[randomIntBetween(1, 8)];
+        for (int i = 0; i < indexers.length; i++) {
+            indexers[i] = new Thread(() -> {
+                while (stopped.get() == false) {
+                    try {
+                        IndexResponse response = client().prepareIndex(indexName, "_doc")
+                            .setSource(Map.of("f" + randomIntBetween(1, 10), randomNonNegativeLong()), XContentType.JSON).get();
+                        assertThat(response.getResult(), isOneOf(CREATED, UPDATED));
+                    } catch (ElasticsearchException ignored) {
+                    }
+                }
+            });
+        }
+        for (Thread indexer : indexers) {
+            indexer.start();
+        }
+        readyToRestartNode.await();
+        transportService.clearAllRules();
+        internalCluster().restartNode(nodeWithOldPrimary.getName(), new InternalTestCluster.RestartCallback());
+        for (Thread indexer : indexers) {
+            indexer.join();
+        }
+        ensureGreen(indexName);
+    }
 }
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
index 51b45890765b8..e14dcb02390ab 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
@@ -782,7 +782,7 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Void> listener) {
         }

         @Override
-        public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
+        public void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener) {
         }

         @Override
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
index fc23539207710..3b338ff824f66 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
@@ -30,6 +30,7 @@
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
+import org.elasticsearch.action.bulk.BulkShardRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.ShardRouting;
@@ -41,6 +42,7 @@
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.MergePolicyConfig;
 import org.elasticsearch.index.VersionType;
+import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.EngineFactory;
 import org.elasticsearch.index.engine.InternalEngineFactory;
@@ -461,4 +463,44 @@ public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
             closeShards(replica);
         }
     }
+
+    public void testRecoveryTrimsLocalTranslog() throws Exception {
+        try (ReplicationGroup shards = createGroup(between(1, 2))) {
+            shards.startAll();
+            IndexShard oldPrimary = shards.getPrimary();
+            shards.indexDocs(scaledRandomIntBetween(1, 100));
+            if (randomBoolean()) {
+                shards.flush();
+            }
+            int inflightDocs = scaledRandomIntBetween(1, 100);
+            for (int i = 0; i < inflightDocs; i++) {
+                final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "extra_" + i).source("{}", XContentType.JSON);
+                final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary);
+                for (IndexShard replica : randomSubsetOf(shards.getReplicas())) {
+                    indexOnReplica(bulkShardRequest, shards, replica);
+                }
+                if (rarely()) {
+                    shards.flush();
+                }
+            }
+            shards.syncGlobalCheckpoint();
+            shards.promoteReplicaToPrimary(randomFrom(shards.getReplicas())).get();
+            oldPrimary.close("demoted", false);
+            oldPrimary.store().close();
+            oldPrimary = shards.addReplicaWithExistingPath(oldPrimary.shardPath(), oldPrimary.routingEntry().currentNodeId());
+            shards.recoverReplica(oldPrimary);
+            for (IndexShard shard : shards) {
+                assertConsistentHistoryBetweenTranslogAndLucene(shard);
+            }
+            final List<DocIdSeqNoAndSource> docsAfterRecovery = getDocIdAndSeqNos(shards.getPrimary());
+            for (IndexShard shard : shards.getReplicas()) {
+                assertThat(shard.routingEntry().toString(), getDocIdAndSeqNos(shard), equalTo(docsAfterRecovery));
+            }
+            shards.promoteReplicaToPrimary(oldPrimary).get();
+            for (IndexShard shard : shards) {
+                assertThat(shard.routingEntry().toString(), getDocIdAndSeqNos(shard), equalTo(docsAfterRecovery));
+                assertConsistentHistoryBetweenTranslogAndLucene(shard);
+            }
+        }
+    }
 }
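The test-framework change below (EngineTestCase) tightens the history-consistency assertion this test relies on: once stale operations are trimmed, no operation in the translog or in Lucene may exceed the engine's max seq no. Distilled (the helper name is illustrative):

    import java.util.List;

    import org.elasticsearch.index.translog.Translog;

    final class HistoryInvariant {
        // No surviving operation may carry a sequence number the engine has never assigned.
        static void assertNoOpAboveMaxSeqNo(List<Translog.Operation> ops, long maxSeqNo) {
            for (Translog.Operation op : ops) {
                assert op.seqNo() <= maxSeqNo : "operation [" + op + "] > max_seq_no [" + maxSeqNo + "]";
            }
        }
    }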
diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
index 5f435f4f54bb3..9d44b12749c21 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
@@ -114,7 +114,6 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -131,11 +130,13 @@
 import static java.util.Collections.emptyList;
 import static java.util.Collections.shuffle;
+import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY;
 import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
 import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
 import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.notNullValue;

 public abstract class EngineTestCase extends ESTestCase {
@@ -848,14 +849,15 @@ public List<Engine.Operation> generateHistoryOnReplica(int numOps, boolean allow
             switch (opType) {
                 case INDEX:
                     operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(),
-                        i, null, Engine.Operation.Origin.REPLICA, startTime, -1, true, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
+                        i, null, randomFrom(REPLICA, PEER_RECOVERY), startTime, -1, true, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
                     break;
                 case DELETE:
                     operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(),
-                        i, null, Engine.Operation.Origin.REPLICA, startTime, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
+                        i, null, randomFrom(REPLICA, PEER_RECOVERY), startTime, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
                     break;
                 case NO_OP:
-                    operations.add(new Engine.NoOp(seqNo, primaryTerm.get(), Engine.Operation.Origin.REPLICA, startTime, "test-" + i));
+                    operations.add(new Engine.NoOp(seqNo, primaryTerm.get(),
+                        randomFrom(REPLICA, PEER_RECOVERY), startTime, "test-" + i));
                     break;
                 default:
                     throw new IllegalStateException("Unknown operation type [" + opType + "]");
@@ -1054,8 +1056,7 @@ public static List<DocIdSeqNoAndSource> getDocIds(Engine engine, boolean refresh
      */
     public static List<Translog.Operation> readAllOperationsInLucene(Engine engine, MapperService mapper) throws IOException {
         final List<Translog.Operation> operations = new ArrayList<>();
-        long maxSeqNo = Math.max(0, ((InternalEngine) engine).getLocalCheckpointTracker().getMaxSeqNo());
-        try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapper, 0, maxSeqNo, false)) {
+        try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapper, 0, Long.MAX_VALUE, false)) {
             Translog.Operation op;
             while ((op = snapshot.next()) != null) {
                 operations.add(op);
@@ -1073,18 +1074,19 @@ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine e
             return;
         }
         final long maxSeqNo = ((InternalEngine) engine).getLocalCheckpointTracker().getMaxSeqNo();
-        if (maxSeqNo < 0) {
-            return; // nothing to check
-        }
-        final Map<Long, Translog.Operation> translogOps = new HashMap<>();
+        final List<Translog.Operation> translogOps = new ArrayList<>();
         try (Translog.Snapshot snapshot = EngineTestCase.getTranslog(engine).newSnapshot()) {
             Translog.Operation op;
             while ((op = snapshot.next()) != null) {
-                translogOps.put(op.seqNo(), op);
+                assertThat("translog operation [" + op + "] > max_seq_no[" + maxSeqNo + "]", op.seqNo(), lessThanOrEqualTo(maxSeqNo));
+                translogOps.add(op);
             }
         }
         final Map<Long, Translog.Operation> luceneOps = readAllOperationsInLucene(engine, mapper).stream()
             .collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity()));
+        for (Translog.Operation op : luceneOps.values()) {
+            assertThat("lucene operation [" + op + "] > max_seq_no[" + maxSeqNo + "]", op.seqNo(), lessThanOrEqualTo(maxSeqNo));
+        }
         final long globalCheckpoint = EngineTestCase.getTranslog(engine).getLastSyncedGlobalCheckpoint();
         final long retainedOps = engine.config().getIndexSettings().getSoftDeleteRetentionOperations();
         final long seqNoForRecovery;
@@ -1092,10 +1094,10 @@ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine e
             seqNoForRecovery = Long.parseLong(safeCommit.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
         }
         final long minSeqNoToRetain = Math.min(seqNoForRecovery, globalCheckpoint + 1 - retainedOps);
-        for (Translog.Operation translogOp : translogOps.values()) {
+        for (Translog.Operation translogOp : translogOps) {
             final Translog.Operation luceneOp = luceneOps.get(translogOp.seqNo());
             if (luceneOp == null) {
-                if (minSeqNoToRetain <= translogOp.seqNo() && translogOp.seqNo() <= maxSeqNo) {
+                if (minSeqNoToRetain <= translogOp.seqNo()) {
                     fail("Operation not found seq# [" + translogOp.seqNo() + "], global checkpoint [" + globalCheckpoint + "], " +
                         "retention policy [" + retainedOps + "], maxSeqNo [" + maxSeqNo + "], translog op [" + translogOp + "]");
                 } else {
diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
index 2788cd0b2f1de..cf8a9ff206de7 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
@@ -813,7 +813,7 @@ private void executeShardBulkOnReplica(BulkShardRequest request, IndexShard repl
     /**
      * indexes the given requests on the supplied primary, modifying it for replicas
      */
-    BulkShardRequest indexOnPrimary(IndexRequest request, IndexShard primary) throws Exception {
+    public BulkShardRequest indexOnPrimary(IndexRequest request, IndexShard primary) throws Exception {
         return executeReplicationRequestOnPrimary(primary, request);
     }

@@ -827,7 +827,7 @@ BulkShardRequest deleteOnPrimary(DeleteRequest request, IndexShard primary) thro
     /**
      * indexes the given requests on the supplied replica shard
      */
-    void indexOnReplica(BulkShardRequest request, ReplicationGroup group, IndexShard replica) throws Exception {
+    public void indexOnReplica(BulkShardRequest request, ReplicationGroup group, IndexShard replica) throws Exception {
         indexOnReplica(request, group, replica, group.primary.getPendingPrimaryTerm());
     }

diff --git a/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java b/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java
index 3e1ae3aa2edce..1031a4bb7fb9a 100644
--- a/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java
+++ b/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java
@@ -50,8 +50,8 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Void> listener) {
     }

     @Override
-    public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
-        executor.execute(() -> target.finalizeRecovery(globalCheckpoint, listener));
+    public void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener) {
+        executor.execute(() -> target.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, listener));
     }

     @Override