From 2509d17ebbee94cd554cc18b7f499aae66d77eff Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 10 Jan 2018 17:47:34 -0500 Subject: [PATCH 1/7] Replica starts peer recovery with a safe commit Today a replica starts a peer-recovery with the last commit. If the last commit is not a safe commit, a replica will immediately fallback to the file based sync which is more expensive than the sequence based recovery. This commit modifies the peer-recovery in replica to start with a safe commit. Moreover, we can keep the existing translog on the target if the recovery is sequence based recovery. --- .../elasticsearch/index/engine/Engine.java | 5 + .../index/engine/InternalEngine.java | 9 ++ .../elasticsearch/index/shard/IndexShard.java | 16 ++- .../index/shard/StoreRecovery.java | 2 +- .../recovery/PeerRecoveryTargetService.java | 35 ++++-- ...> RecoveryOpenFileBasedEngineRequest.java} | 31 ++--- .../RecoveryOpenSeqBasedEngineRequest.java | 66 ++++++++++ .../recovery/RecoverySourceHandler.java | 10 +- .../indices/recovery/RecoveryTarget.java | 9 +- .../recovery/RecoveryTargetHandler.java | 12 +- .../recovery/RemoteRecoveryTargetHandler.java | 19 ++- .../RecoveryDuringReplicationTests.java | 29 ++++- .../index/shard/IndexShardTests.java | 23 +++- .../indices/recovery/IndexRecoveryIT.java | 2 +- .../PeerRecoveryTargetServiceTests.java | 116 +++++------------- .../recovery/RecoverySourceHandlerTests.java | 2 +- .../indices/recovery/RecoveryTests.java | 39 ++++++ 17 files changed, 290 insertions(+), 135 deletions(-) rename core/src/main/java/org/elasticsearch/indices/recovery/{RecoveryPrepareForTranslogOperationsRequest.java => RecoveryOpenFileBasedEngineRequest.java} (81%) create mode 100644 core/src/main/java/org/elasticsearch/indices/recovery/RecoveryOpenSeqBasedEngineRequest.java diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 6d37502bd604a..acd2c09aa3c47 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1513,6 +1513,11 @@ public interface Warmer { */ public abstract Engine recoverFromTranslog() throws IOException; + /** + * Do not replay translog operations, but make the engine be ready. + */ + public abstract void skipTranslogRecovery(); + /** * Returns true iff this engine is currently recovering from translog. */ diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index a0d1fa92a2efc..c1c82c5a72e13 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -402,6 +402,15 @@ public InternalEngine recoverFromTranslog() throws IOException { return this; } + @Override + public void skipTranslogRecovery() { + if (openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { + throw new IllegalStateException("Can't skip translog recovery with open mode: " + openMode); + } + assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be"; + pendingTranslogRecovery.set(false); // we are good - now we can commit + } + private IndexCommit getStartingCommitPoint() throws IOException { if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint(); diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index f5eba1b4f62b4..c441d6f80e1b4 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1303,9 +1303,20 @@ public void openIndexAndCreateTranslog(boolean forceNewHistoryUUID, long globalC * opens the engine on top of the existing lucene engine and translog. * Operations from the translog will be replayed to bring lucene up to date. **/ - public void openIndexAndTranslog() throws IOException { + public void openIndexAndRecoveryFromTranslog() throws IOException { assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.EXISTING_STORE; innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, false); + getEngine().recoverFromTranslog(); + } + + /** + * Opens the engine on top of the existing lucene engine and translog. + * The translog is kept but its operations won't be replayed. + */ + public void openIndexAndSkipTranslogRecovery() throws IOException { + assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.PEER; + innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, false); + getEngine().skipTranslogRecovery(); } private void innerOpenEngineAndTranslog(final EngineConfig.OpenMode openMode, final boolean forceNewHistoryUUID) throws IOException { @@ -1338,13 +1349,12 @@ private void innerOpenEngineAndTranslog(final EngineConfig.OpenMode openMode, fi globalCheckpointTracker.updateGlobalCheckpointOnReplica(Translog.readGlobalCheckpoint(translogConfig.getTranslogPath()), "read from translog checkpoint"); } - Engine newEngine = createNewEngine(config); + createNewEngine(config); verifyNotClosed(); if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { // We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive, // we still give sync'd flush a chance to run: active.set(true); - newEngine.recoverFromTranslog(); } assertSequenceNumbersInCommit(); assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage(); diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 6bc1ce2882c92..81ffbea642c58 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -401,7 +401,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe logger.debug("failed to list file details", e); } if (indexShouldExists) { - indexShard.openIndexAndTranslog(); + indexShard.openIndexAndRecoveryFromTranslog(); indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm()); } else { indexShard.createIndexAndTranslog(); diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index ba5dc5c60f29f..ada2d4f8d4420 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -21,6 +21,8 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexCommit; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.RateLimiter; import org.elasticsearch.ElasticsearchException; @@ -39,6 +41,7 @@ import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.engine.CombinedDeletionPolicy; import org.elasticsearch.index.engine.RecoveryEngineException; import org.elasticsearch.index.mapper.MapperException; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -60,6 +63,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.List; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -78,7 +82,8 @@ public static class Actions { public static final String FILE_CHUNK = "internal:index/shard/recovery/file_chunk"; public static final String CLEAN_FILES = "internal:index/shard/recovery/clean_files"; public static final String TRANSLOG_OPS = "internal:index/shard/recovery/translog_ops"; - public static final String PREPARE_TRANSLOG = "internal:index/shard/recovery/prepare_translog"; + public static final String OPEN_FILE_BASED_ENGINE = "internal:index/shard/recovery/prepare_translog"; + public static final String OPEN_SEQUENCE_BASED_ENGINE = "internal:index/shard/recovery/open_seq_based_engine"; public static final String FINALIZE = "internal:index/shard/recovery/finalize"; public static final String WAIT_CLUSTERSTATE = "internal:index/shard/recovery/wait_clusterstate"; public static final String HANDOFF_PRIMARY_CONTEXT = "internal:index/shard/recovery/handoff_primary_context"; @@ -108,8 +113,10 @@ public PeerRecoveryTargetService(Settings settings, ThreadPool threadPool, Trans FileChunkTransportRequestHandler()); transportService.registerRequestHandler(Actions.CLEAN_FILES, RecoveryCleanFilesRequest::new, ThreadPool.Names.GENERIC, new CleanFilesRequestHandler()); - transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, RecoveryPrepareForTranslogOperationsRequest::new, ThreadPool - .Names.GENERIC, new PrepareForTranslogOperationsRequestHandler()); + transportService.registerRequestHandler(Actions.OPEN_FILE_BASED_ENGINE, ThreadPool.Names.GENERIC, + RecoveryOpenFileBasedEngineRequest::new, new OpenFileBasedEngineRequestHandler()); + transportService.registerRequestHandler(Actions.OPEN_SEQUENCE_BASED_ENGINE, ThreadPool.Names.GENERIC, + RecoveryOpenSeqBasedEngineRequest::new, new OpenSequenceBasedEngineRequestHandler()); transportService.registerRequestHandler(Actions.TRANSLOG_OPS, RecoveryTranslogOperationsRequest::new, ThreadPool.Names.GENERIC, new TranslogOperationsRequestHandler()); transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new @@ -353,7 +360,9 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove public static long getStartingSeqNo(final RecoveryTarget recoveryTarget) { try { final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation()); - final SequenceNumbers.CommitInfo seqNoStats = recoveryTarget.store().loadSeqNoInfo(null); + final List existingCommits = DirectoryReader.listCommits(recoveryTarget.store().directory()); + final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, globalCheckpoint); + final SequenceNumbers.CommitInfo seqNoStats = recoveryTarget.store().loadSeqNoInfo(safeCommit); if (seqNoStats.maxSeqNo <= globalCheckpoint) { assert seqNoStats.localCheckpoint <= globalCheckpoint; /* @@ -381,13 +390,25 @@ public interface RecoveryListener { void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure); } - class PrepareForTranslogOperationsRequestHandler implements TransportRequestHandler { + class OpenFileBasedEngineRequestHandler implements TransportRequestHandler { @Override - public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception { + public void messageReceived(RecoveryOpenFileBasedEngineRequest request, TransportChannel channel) throws Exception { try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId() )) { - recoveryRef.target().prepareForTranslogOperations(request.totalTranslogOps()); + recoveryRef.target().openFileBasedEngine(request.totalTranslogOps()); + } + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } + } + + class OpenSequenceBasedEngineRequestHandler implements TransportRequestHandler { + + @Override + public void messageReceived(RecoveryOpenSeqBasedEngineRequest request, TransportChannel channel) throws Exception { + try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId() + )) { + recoveryRef.target().openSequencedBasedEngine(request.totalTranslogOps()); } channel.sendResponse(TransportResponse.Empty.INSTANCE); } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryOpenFileBasedEngineRequest.java similarity index 81% rename from core/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java rename to core/src/main/java/org/elasticsearch/indices/recovery/RecoveryOpenFileBasedEngineRequest.java index 61cd986a1aef4..6c5672bf110f9 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryOpenFileBasedEngineRequest.java @@ -28,16 +28,22 @@ import java.io.IOException; -public class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest { +final class RecoveryOpenFileBasedEngineRequest extends TransportRequest { + final long recoveryId; + final ShardId shardId; + final int totalTranslogOps; - private long recoveryId; - private ShardId shardId; - private int totalTranslogOps = RecoveryState.Translog.UNKNOWN; - - public RecoveryPrepareForTranslogOperationsRequest() { + RecoveryOpenFileBasedEngineRequest(StreamInput in) throws IOException { + super(in); + recoveryId = in.readLong(); + shardId = ShardId.readShardId(in); + totalTranslogOps = in.readVInt(); + if (in.getVersion().before(Version.V_6_0_0_alpha1)) { + in.readLong(); // maxUnsafeAutoIdTimestamp + } } - RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps) { + RecoveryOpenFileBasedEngineRequest(long recoveryId, ShardId shardId, int totalTranslogOps) { this.recoveryId = recoveryId; this.shardId = shardId; this.totalTranslogOps = totalTranslogOps; @@ -55,17 +61,6 @@ public int totalTranslogOps() { return totalTranslogOps; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - recoveryId = in.readLong(); - shardId = ShardId.readShardId(in); - totalTranslogOps = in.readVInt(); - if (in.getVersion().before(Version.V_6_0_0_alpha1)) { - in.readLong(); // maxUnsafeAutoIdTimestamp - } - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryOpenSeqBasedEngineRequest.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryOpenSeqBasedEngineRequest.java new file mode 100644 index 0000000000000..2a3276c9c3209 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryOpenSeqBasedEngineRequest.java @@ -0,0 +1,66 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.recovery; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.transport.TransportRequest; + +import java.io.IOException; + +final class RecoveryOpenSeqBasedEngineRequest extends TransportRequest { + final long recoveryId; + final ShardId shardId; + final int totalTranslogOps; + + RecoveryOpenSeqBasedEngineRequest(long recoveryId, ShardId shardId, int totalTranslogOps) { + this.recoveryId = recoveryId; + this.shardId = shardId; + this.totalTranslogOps = totalTranslogOps; + } + + RecoveryOpenSeqBasedEngineRequest(StreamInput in) throws IOException { + super(in); + recoveryId = in.readLong(); + shardId = ShardId.readShardId(in); + totalTranslogOps = in.readVInt(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeLong(recoveryId); + shardId.writeTo(out); + out.writeVInt(totalTranslogOps); + } + + public long recoveryId() { + return this.recoveryId; + } + + public ShardId shardId() { + return shardId; + } + + public int totalTranslogOps() { + return totalTranslogOps; + } +} diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 4ebce1c0b4bee..937a72ea61556 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -188,7 +188,7 @@ public RecoveryResponse recoverToTarget() throws IOException { runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId())); try { - prepareTargetForTranslog(translog.estimateTotalOperationsFromMinSeq(startingSeqNo)); + openEngineOnTarget(isSequenceNumberBasedRecoveryPossible, translog.estimateTotalOperationsFromMinSeq(startingSeqNo)); } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e); } @@ -421,13 +421,17 @@ public void phase1(final IndexCommit snapshot, final Supplier translogO } } - void prepareTargetForTranslog(final int totalTranslogOps) throws IOException { + void openEngineOnTarget(final boolean sequencedBasedRecovery, final int totalTranslogOps) throws IOException { StopWatch stopWatch = new StopWatch().start(); logger.trace("recovery [phase1]: prepare remote engine for translog"); final long startEngineStart = stopWatch.totalTime().millis(); // Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables // garbage collection (not the JVM's GC!) of tombstone deletes. - cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps)); + if (sequencedBasedRecovery) { + cancellableThreads.executeIO(() -> recoveryTarget.openSequencedBasedEngine(totalTranslogOps)); + } else { + cancellableThreads.executeIO(() -> recoveryTarget.openFileBasedEngine(totalTranslogOps)); + } stopWatch.stop(); response.startTime = stopWatch.totalTime().millis() - startEngineStart; diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index d383891345818..34c54144e15a8 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -362,12 +362,17 @@ private void ensureRefCount() { /*** Implementation of {@link RecoveryTargetHandler } */ @Override - public void prepareForTranslogOperations(int totalTranslogOps) throws IOException { + public void openFileBasedEngine(int totalTranslogOps) throws IOException { state().getTranslog().totalOperations(totalTranslogOps); - // TODO: take the local checkpoint from store as global checkpoint, once we know it's safe indexShard().openIndexAndCreateTranslog(false, SequenceNumbers.UNASSIGNED_SEQ_NO); } + @Override + public void openSequencedBasedEngine(int totalTranslogOps) throws IOException { + state().getTranslog().totalOperations(totalTranslogOps); + indexShard().openIndexAndSkipTranslogRecovery(); + } + @Override public void finalizeRecovery(final long globalCheckpoint) throws IOException { final IndexShard indexShard = indexShard(); diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index e7403986dc233..6b5eea37c0ab3 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -29,13 +29,21 @@ public interface RecoveryTargetHandler { + /** + * Opens an engine and truncates the existing translog on the target. + * After this method, the target should be ready to receive translog operations. + * + * @param totalTranslogOps total translog operations expected to be sent + */ + void openFileBasedEngine(int totalTranslogOps) throws IOException; /** - * Prepares the target to receive translog operations, after all file have been copied + * Opens an engine on the target with its own local commit point and keeps the existing translog on the target. + * After this method, the target should be ready to receive translog operations. * * @param totalTranslogOps total translog operations expected to be sent */ - void prepareForTranslogOperations(int totalTranslogOps) throws IOException; + void openSequencedBasedEngine(int totalTranslogOps) throws IOException; /** * The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index 279bec186a433..e69f138f8fbf8 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -21,6 +21,7 @@ import org.apache.lucene.store.RateLimiter; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.index.seqno.GlobalCheckpointTracker; @@ -76,13 +77,25 @@ public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportSe } @Override - public void prepareForTranslogOperations(int totalTranslogOps) throws IOException { - transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG, - new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps), + public void openFileBasedEngine(int totalTranslogOps) throws IOException { + transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.OPEN_FILE_BASED_ENGINE, + new RecoveryOpenFileBasedEngineRequest(recoveryId, shardId, totalTranslogOps), TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } + @Override + public void openSequencedBasedEngine(int totalTranslogOps) throws IOException { + if (targetNode.getVersion().before(Version.V_7_0_0_alpha1)) { + openFileBasedEngine(totalTranslogOps); + } else { + transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.OPEN_SEQUENCE_BASED_ENGINE, + new RecoveryOpenSeqBasedEngineRequest(recoveryId, shardId, totalTranslogOps), + TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), + EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + } + } + @Override public void finalizeRecovery(final long globalCheckpoint) { transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FINALIZE, diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 2bf7de6b94a82..3edad2635d1d0 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -31,7 +31,9 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; @@ -226,7 +228,6 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { final IndexShard oldPrimary = shards.getPrimary(); final IndexShard newPrimary = shards.getReplicas().get(0); final IndexShard replica = shards.getReplicas().get(1); - boolean expectSeqNoRecovery = true; if (randomBoolean()) { // simulate docs that were inflight when primary failed, these will be rolled back final int rollbackDocs = randomIntBetween(1, 5); @@ -239,10 +240,8 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { } if (randomBoolean()) { oldPrimary.flush(new FlushRequest(index.getName())); - expectSeqNoRecovery = false; } } - shards.promoteReplicaToPrimary(newPrimary); // check that local checkpoint of new primary is properly tracked after primary promotion @@ -252,9 +251,29 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { equalTo(totalDocs - 1L)); // index some more - totalDocs += shards.indexDocs(randomIntBetween(0, 5)); + int moreDocs = shards.indexDocs(randomIntBetween(0, 5)); + totalDocs += moreDocs; + + // As a replica keeps a safe commit, the file-based recovery only happens if the required translog + // for the sequence based recovery are not fully retained and extra documents were added to the primary. + boolean expectSeqNoRecovery = (moreDocs == 0 || frequently()); + int uncommittedOpsOnPrimary = 0; + if (expectSeqNoRecovery == false) { + IndexMetaData.Builder builder = IndexMetaData.builder(newPrimary.indexSettings().getIndexMetaData()); + builder.settings(Settings.builder().put(newPrimary.indexSettings().getSettings()) + .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1") + .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1") + ); + newPrimary.indexSettings().updateIndexMetaData(builder.build()); + newPrimary.onSettingsChanged(); + shards.syncGlobalCheckpoint(); + newPrimary.flush(new FlushRequest()); + uncommittedOpsOnPrimary = shards.indexDocs(randomIntBetween(0, 10)); + } if (randomBoolean()) { + uncommittedOpsOnPrimary = 0; + shards.syncGlobalCheckpoint(); newPrimary.flush(new FlushRequest()); } @@ -269,7 +288,7 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(totalDocs - committedDocs)); } else { assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty())); - assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(totalDocs)); + assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(uncommittedOpsOnPrimary)); } // roll back the extra ops in the replica diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 972a278ba5d45..cb8401baeb44c 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2144,7 +2144,7 @@ public void testShardActiveDuringInternalRecovery() throws IOException { shard.prepareForIndexRecovery(); // Shard is still inactive since we haven't started recovering yet assertFalse(shard.isActive()); - shard.openIndexAndTranslog(); + shard.openIndexAndRecoveryFromTranslog(); // Shard should now be active since we did recover: assertTrue(shard.isActive()); closeShards(shard); @@ -2172,13 +2172,20 @@ public void testShardActiveDuringPeerRecovery() throws IOException { new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> { }) { @Override - public void prepareForTranslogOperations(int totalTranslogOps) throws IOException { - super.prepareForTranslogOperations(totalTranslogOps); + public void openFileBasedEngine(int totalTranslogOps) throws IOException { + super.openFileBasedEngine(totalTranslogOps); // Shard is still inactive since we haven't started recovering yet assertFalse(replica.isActive()); } + @Override + public void openSequencedBasedEngine(int totalTranslogOps) throws IOException { + super.openSequencedBasedEngine(totalTranslogOps); + // Shard is still inactive since we haven't started recovering yet + assertFalse(replica.isActive()); + } + @Override public long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException { final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps); @@ -2221,8 +2228,14 @@ public void testRefreshListenersDuringPeerRecovery() throws IOException { }) { // we're only checking that listeners are called when the engine is open, before there is no point @Override - public void prepareForTranslogOperations(int totalTranslogOps) throws IOException { - super.prepareForTranslogOperations(totalTranslogOps); + public void openFileBasedEngine(int totalTranslogOps) throws IOException { + super.openFileBasedEngine(totalTranslogOps); + assertListenerCalled.accept(replica); + } + + @Override + public void openSequencedBasedEngine(int totalTranslogOps) throws IOException { + super.openSequencedBasedEngine(totalTranslogOps); assertListenerCalled.accept(replica); } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index cf1449fecd6a5..b1ed09954a47f 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -585,7 +585,7 @@ public void testDisconnectsWhileRecovering() throws Exception { PeerRecoveryTargetService.Actions.FILE_CHUNK, PeerRecoveryTargetService.Actions.CLEAN_FILES, //RecoveryTarget.Actions.TRANSLOG_OPS, <-- may not be sent if already flushed - PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG, + PeerRecoveryTargetService.Actions.OPEN_SEQUENCE_BASED_ENGINE, PeerRecoveryTargetService.Actions.FINALIZE }; final String recoveryActionToBlock = randomFrom(recoveryActions); diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index f691cfd0238d4..ea26b2212cf93 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -19,103 +19,51 @@ package org.elasticsearch.indices.recovery; -import org.elasticsearch.action.admin.indices.flush.FlushRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.VersionType; -import org.elasticsearch.index.mapper.SourceToParse; -import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.translog.Translog; -import org.elasticsearch.index.translog.TranslogConfig; -import org.elasticsearch.index.translog.TranslogWriter; - -import java.io.IOException; -import java.nio.channels.FileChannel; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.equalTo; public class PeerRecoveryTargetServiceTests extends IndexShardTestCase { public void testGetStartingSeqNo() throws Exception { - IndexShard replica = newShard(false); - final AtomicReference translogLocation = new AtomicReference<>(); - RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null) { - @Override - Path translogLocation() { - return translogLocation.get(); - } - }; + final IndexShard replica = newShard(false); try { - recoveryEmptyReplica(replica); - int docs = randomIntBetween(1, 10); - final String index = replica.shardId().getIndexName(); - long seqNo = 0; - for (int i = 0; i < docs; i++) { - replica.applyIndexOperationOnReplica(seqNo++, 1, VersionType.EXTERNAL, - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, - SourceToParse.source(index, "type", "doc_" + i, new BytesArray("{}"), XContentType.JSON), - update -> {}); - if (rarely()) { - // insert a gap - seqNo++; + // Empty store + { + recoveryEmptyReplica(replica); + final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null); + assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(0L)); + recoveryTarget.decRef(); + } + // Last commit is good - use it. + final long initDocs = scaledRandomIntBetween(1, 10); + { + for (int i = 0; i < initDocs; i++) { + indexDoc(replica, "doc", Integer.toString(i)); + replica.updateGlobalCheckpointOnReplica(i, "test"); } + flushShard(replica); + final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null); + assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(initDocs)); + recoveryTarget.decRef(); + } + // Last commit is not good - use the previous commit + { + int moreDocs = randomIntBetween(1, 10); + for (int i = 0; i < moreDocs; i++) { + indexDoc(replica, "doc", Long.toString(i)); + if (rarely()) { + getEngine(replica).getLocalCheckpointTracker().generateSeqNo(); // Create a gap in seqno. + } + } + flushShard(replica); + final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null); + assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(initDocs)); + recoveryTarget.decRef(); } - - final long maxSeqNo = replica.seqNoStats().getMaxSeqNo(); - final long localCheckpoint = replica.getLocalCheckpoint(); - - translogLocation.set(replica.getTranslog().location()); - - final Translog translog = replica.getTranslog(); - final String translogUUID = translog.getTranslogUUID(); - assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(0L)); - - translogLocation.set(writeTranslog(replica.shardId(), translogUUID, translog.currentFileGeneration(), maxSeqNo - 1)); - - // commit is good, global checkpoint is at least max *committed* which is NO_OPS_PERFORMED - assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(0L)); - - replica.flush(new FlushRequest()); - - translogLocation.set(replica.getTranslog().location()); - - // commit is not good, global checkpoint is below max - assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)); - - translogLocation.set(writeTranslog(replica.shardId(), translogUUID, translog.currentFileGeneration(), maxSeqNo)); - - // commit is good, global checkpoint is above max - assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(localCheckpoint + 1)); } finally { closeShards(replica); - recoveryTarget.decRef(); } } - - private Path writeTranslog( - final ShardId shardId, - final String translogUUID, - final long generation, - final long globalCheckpoint - ) throws IOException { - final Path tempDir = createTempDir(); - final Path resolve = tempDir.resolve(Translog.getFilename(generation)); - Files.createFile(tempDir.resolve(Translog.CHECKPOINT_FILE_NAME)); - try (TranslogWriter ignored = TranslogWriter.create( - shardId, - translogUUID, - generation, - resolve, - FileChannel::open, - TranslogConfig.DEFAULT_BUFFER_SIZE, generation, globalCheckpoint, () -> globalCheckpoint, () -> generation)) {} - return tempDir; - } - } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index cf5f24d2a6e18..4ff87c8aa90fa 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -423,7 +423,7 @@ public void phase1(final IndexCommit snapshot, final Supplier translogO } @Override - void prepareTargetForTranslog(final int totalTranslogOps) throws IOException { + void openEngineOnTarget(boolean sequencedBasedRecovery, int totalTranslogOps) throws IOException { prepareTargetForTranslogCalled.set(true); } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index 4a449463b5e8c..8c84e076642d7 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -19,10 +19,13 @@ package org.elasticsearch.indices.recovery; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.NoMergePolicy; import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.UUIDs; @@ -37,6 +40,8 @@ import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase; import org.elasticsearch.index.replication.RecoveryDuringReplicationTests; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardTestCase; +import org.elasticsearch.index.translog.SnapshotMatchers; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; @@ -241,4 +246,38 @@ public void testPeerRecoveryPersistGlobalCheckpoint() throws Exception { assertThat(replica.getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(numDocs - 1)); } } + + public void testSequenceBasedRecoveryKeepsTranslog() throws Exception { + try (ReplicationGroup shards = createGroup(1)) { + shards.startAll(); + final IndexShard replica = shards.getReplicas().get(0); + final int goodDocs = scaledRandomIntBetween(0, 20); + int uncommittedDocs = 0; + for (int i = 0; i < goodDocs; i++) { + shards.indexDocs(1); + uncommittedDocs++; + if (randomBoolean()) { + shards.syncGlobalCheckpoint(); + shards.flush(); + uncommittedDocs = 0; + } + } + shards.removeReplica(replica); + final int moreDocs = shards.indexDocs(scaledRandomIntBetween(0, 20)); + if (randomBoolean()) { + shards.flush(); + } + replica.close("test", randomBoolean()); + replica.store().close(); + final IndexShard newReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId()); + shards.recoverReplica(newReplica); + + try (Translog.Snapshot snapshot = newReplica.getTranslog().newSnapshot()) { + assertThat("Sequence based recovery should keep existing translog", snapshot, SnapshotMatchers.size(goodDocs + moreDocs)); + } + assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(uncommittedDocs + moreDocs)); + assertThat(newReplica.recoveryState().getIndex().fileDetails(), empty()); + } + } + } From 9d7db40ababfdd3d1c3cc800c18b5d935cf0c4e7 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 11 Jan 2018 08:24:24 -0500 Subject: [PATCH 2/7] Use additional bool for sequence based recovery --- .../recovery/PeerRecoveryTargetService.java | 27 ++------ .../RecoveryOpenSeqBasedEngineRequest.java | 66 ------------------- ...yPrepareForTranslogOperationsRequest.java} | 42 ++++++++---- .../recovery/RecoverySourceHandler.java | 12 ++-- .../indices/recovery/RecoveryTarget.java | 14 ++-- .../recovery/RecoveryTargetHandler.java | 15 ++--- .../recovery/RemoteRecoveryTargetHandler.java | 19 +----- .../RecoveryDuringReplicationTests.java | 1 + .../index/shard/IndexShardTests.java | 21 ++---- .../indices/recovery/IndexRecoveryIT.java | 2 +- .../recovery/RecoverySourceHandlerTests.java | 2 +- .../indices/recovery/RecoveryTests.java | 4 -- 12 files changed, 61 insertions(+), 164 deletions(-) delete mode 100644 core/src/main/java/org/elasticsearch/indices/recovery/RecoveryOpenSeqBasedEngineRequest.java rename core/src/main/java/org/elasticsearch/indices/recovery/{RecoveryOpenFileBasedEngineRequest.java => RecoveryPrepareForTranslogOperationsRequest.java} (67%) diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index ada2d4f8d4420..88b0f23d72a99 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -82,8 +82,7 @@ public static class Actions { public static final String FILE_CHUNK = "internal:index/shard/recovery/file_chunk"; public static final String CLEAN_FILES = "internal:index/shard/recovery/clean_files"; public static final String TRANSLOG_OPS = "internal:index/shard/recovery/translog_ops"; - public static final String OPEN_FILE_BASED_ENGINE = "internal:index/shard/recovery/prepare_translog"; - public static final String OPEN_SEQUENCE_BASED_ENGINE = "internal:index/shard/recovery/open_seq_based_engine"; + public static final String PREPARE_TRANSLOG = "internal:index/shard/recovery/prepare_translog"; public static final String FINALIZE = "internal:index/shard/recovery/finalize"; public static final String WAIT_CLUSTERSTATE = "internal:index/shard/recovery/wait_clusterstate"; public static final String HANDOFF_PRIMARY_CONTEXT = "internal:index/shard/recovery/handoff_primary_context"; @@ -113,10 +112,8 @@ public PeerRecoveryTargetService(Settings settings, ThreadPool threadPool, Trans FileChunkTransportRequestHandler()); transportService.registerRequestHandler(Actions.CLEAN_FILES, RecoveryCleanFilesRequest::new, ThreadPool.Names.GENERIC, new CleanFilesRequestHandler()); - transportService.registerRequestHandler(Actions.OPEN_FILE_BASED_ENGINE, ThreadPool.Names.GENERIC, - RecoveryOpenFileBasedEngineRequest::new, new OpenFileBasedEngineRequestHandler()); - transportService.registerRequestHandler(Actions.OPEN_SEQUENCE_BASED_ENGINE, ThreadPool.Names.GENERIC, - RecoveryOpenSeqBasedEngineRequest::new, new OpenSequenceBasedEngineRequestHandler()); + transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, ThreadPool.Names.GENERIC, + RecoveryPrepareForTranslogOperationsRequest::new, new PrepareForTranslogOperationsRequestHandler()); transportService.registerRequestHandler(Actions.TRANSLOG_OPS, RecoveryTranslogOperationsRequest::new, ThreadPool.Names.GENERIC, new TranslogOperationsRequestHandler()); transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new @@ -390,25 +387,13 @@ public interface RecoveryListener { void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure); } - class OpenFileBasedEngineRequestHandler implements TransportRequestHandler { + class PrepareForTranslogOperationsRequestHandler implements TransportRequestHandler { @Override - public void messageReceived(RecoveryOpenFileBasedEngineRequest request, TransportChannel channel) throws Exception { + public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception { try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId() )) { - recoveryRef.target().openFileBasedEngine(request.totalTranslogOps()); - } - channel.sendResponse(TransportResponse.Empty.INSTANCE); - } - } - - class OpenSequenceBasedEngineRequestHandler implements TransportRequestHandler { - - @Override - public void messageReceived(RecoveryOpenSeqBasedEngineRequest request, TransportChannel channel) throws Exception { - try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId() - )) { - recoveryRef.target().openSequencedBasedEngine(request.totalTranslogOps()); + recoveryRef.target().prepareForTranslogOperations(request.deleteLocalTranslog(), request.totalTranslogOps()); } channel.sendResponse(TransportResponse.Empty.INSTANCE); } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryOpenSeqBasedEngineRequest.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryOpenSeqBasedEngineRequest.java deleted file mode 100644 index 2a3276c9c3209..0000000000000 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryOpenSeqBasedEngineRequest.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.indices.recovery; - -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.transport.TransportRequest; - -import java.io.IOException; - -final class RecoveryOpenSeqBasedEngineRequest extends TransportRequest { - final long recoveryId; - final ShardId shardId; - final int totalTranslogOps; - - RecoveryOpenSeqBasedEngineRequest(long recoveryId, ShardId shardId, int totalTranslogOps) { - this.recoveryId = recoveryId; - this.shardId = shardId; - this.totalTranslogOps = totalTranslogOps; - } - - RecoveryOpenSeqBasedEngineRequest(StreamInput in) throws IOException { - super(in); - recoveryId = in.readLong(); - shardId = ShardId.readShardId(in); - totalTranslogOps = in.readVInt(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeLong(recoveryId); - shardId.writeTo(out); - out.writeVInt(totalTranslogOps); - } - - public long recoveryId() { - return this.recoveryId; - } - - public ShardId shardId() { - return shardId; - } - - public int totalTranslogOps() { - return totalTranslogOps; - } -} diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryOpenFileBasedEngineRequest.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java similarity index 67% rename from core/src/main/java/org/elasticsearch/indices/recovery/RecoveryOpenFileBasedEngineRequest.java rename to core/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java index 6c5672bf110f9..ae8c7472f89b4 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryOpenFileBasedEngineRequest.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java @@ -28,25 +28,33 @@ import java.io.IOException; -final class RecoveryOpenFileBasedEngineRequest extends TransportRequest { - final long recoveryId; - final ShardId shardId; - final int totalTranslogOps; +class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest { - RecoveryOpenFileBasedEngineRequest(StreamInput in) throws IOException { - super(in); + private final long recoveryId; + private final ShardId shardId; + private final int totalTranslogOps; + private final boolean deleteLocalTranslog; + + RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps, boolean deleteLocalTranslog) { + this.recoveryId = recoveryId; + this.shardId = shardId; + this.totalTranslogOps = totalTranslogOps; + this.deleteLocalTranslog = deleteLocalTranslog; + } + + RecoveryPrepareForTranslogOperationsRequest(StreamInput in) throws IOException { + super.readFrom(in); recoveryId = in.readLong(); shardId = ShardId.readShardId(in); totalTranslogOps = in.readVInt(); if (in.getVersion().before(Version.V_6_0_0_alpha1)) { in.readLong(); // maxUnsafeAutoIdTimestamp } - } - - RecoveryOpenFileBasedEngineRequest(long recoveryId, ShardId shardId, int totalTranslogOps) { - this.recoveryId = recoveryId; - this.shardId = shardId; - this.totalTranslogOps = totalTranslogOps; + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + deleteLocalTranslog = in.readBoolean(); + } else { + deleteLocalTranslog = true; + } } public long recoveryId() { @@ -61,6 +69,13 @@ public int totalTranslogOps() { return totalTranslogOps; } + /** + * Whether or not the recover target should delete its local translog + */ + boolean deleteLocalTranslog() { + return deleteLocalTranslog; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -70,5 +85,8 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().before(Version.V_6_0_0_alpha1)) { out.writeLong(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); // maxUnsafeAutoIdTimestamp } + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + out.writeBoolean(deleteLocalTranslog); + } } } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 937a72ea61556..df673569d0228 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -188,7 +188,9 @@ public RecoveryResponse recoverToTarget() throws IOException { runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId())); try { - openEngineOnTarget(isSequenceNumberBasedRecoveryPossible, translog.estimateTotalOperationsFromMinSeq(startingSeqNo)); + // For a sequence based recovery, the target can keep its local translog + prepareTargetForTranslog(isSequenceNumberBasedRecoveryPossible == false, + translog.estimateTotalOperationsFromMinSeq(startingSeqNo)); } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e); } @@ -421,17 +423,13 @@ public void phase1(final IndexCommit snapshot, final Supplier translogO } } - void openEngineOnTarget(final boolean sequencedBasedRecovery, final int totalTranslogOps) throws IOException { + void prepareTargetForTranslog(final boolean deleteLocalTranslog, final int totalTranslogOps) throws IOException { StopWatch stopWatch = new StopWatch().start(); logger.trace("recovery [phase1]: prepare remote engine for translog"); final long startEngineStart = stopWatch.totalTime().millis(); // Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables // garbage collection (not the JVM's GC!) of tombstone deletes. - if (sequencedBasedRecovery) { - cancellableThreads.executeIO(() -> recoveryTarget.openSequencedBasedEngine(totalTranslogOps)); - } else { - cancellableThreads.executeIO(() -> recoveryTarget.openFileBasedEngine(totalTranslogOps)); - } + cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(deleteLocalTranslog, totalTranslogOps)); stopWatch.stop(); response.startTime = stopWatch.totalTime().millis() - startEngineStart; diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 34c54144e15a8..943146acc9e3c 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -362,15 +362,13 @@ private void ensureRefCount() { /*** Implementation of {@link RecoveryTargetHandler } */ @Override - public void openFileBasedEngine(int totalTranslogOps) throws IOException { + public void prepareForTranslogOperations(boolean deleteLocalTranslog, int totalTranslogOps) throws IOException { state().getTranslog().totalOperations(totalTranslogOps); - indexShard().openIndexAndCreateTranslog(false, SequenceNumbers.UNASSIGNED_SEQ_NO); - } - - @Override - public void openSequencedBasedEngine(int totalTranslogOps) throws IOException { - state().getTranslog().totalOperations(totalTranslogOps); - indexShard().openIndexAndSkipTranslogRecovery(); + if (deleteLocalTranslog) { + indexShard().openIndexAndCreateTranslog(false, SequenceNumbers.UNASSIGNED_SEQ_NO); + } else { + indexShard().openIndexAndSkipTranslogRecovery(); + } } @Override diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index 6b5eea37c0ab3..2f1f62ec5ed71 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -29,21 +29,14 @@ public interface RecoveryTargetHandler { - /** - * Opens an engine and truncates the existing translog on the target. - * After this method, the target should be ready to receive translog operations. - * - * @param totalTranslogOps total translog operations expected to be sent - */ - void openFileBasedEngine(int totalTranslogOps) throws IOException; /** - * Opens an engine on the target with its own local commit point and keeps the existing translog on the target. - * After this method, the target should be ready to receive translog operations. + * Prepares the target to receive translog operations, after all file have been copied * - * @param totalTranslogOps total translog operations expected to be sent + * @param deleteLocalTranslog whether or not to delete the local translog on the target + * @param totalTranslogOps total translog operations expected to be sent */ - void openSequencedBasedEngine(int totalTranslogOps) throws IOException; + void prepareForTranslogOperations(boolean deleteLocalTranslog, int totalTranslogOps) throws IOException; /** * The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index e69f138f8fbf8..6214878e9a501 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -21,7 +21,6 @@ import org.apache.lucene.store.RateLimiter; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.index.seqno.GlobalCheckpointTracker; @@ -77,25 +76,13 @@ public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportSe } @Override - public void openFileBasedEngine(int totalTranslogOps) throws IOException { - transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.OPEN_FILE_BASED_ENGINE, - new RecoveryOpenFileBasedEngineRequest(recoveryId, shardId, totalTranslogOps), + public void prepareForTranslogOperations(boolean deleteLocalTranslog, int totalTranslogOps) throws IOException { + transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG, + new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, deleteLocalTranslog), TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } - @Override - public void openSequencedBasedEngine(int totalTranslogOps) throws IOException { - if (targetNode.getVersion().before(Version.V_7_0_0_alpha1)) { - openFileBasedEngine(totalTranslogOps); - } else { - transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.OPEN_SEQUENCE_BASED_ENGINE, - new RecoveryOpenSeqBasedEngineRequest(recoveryId, shardId, totalTranslogOps), - TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), - EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); - } - } - @Override public void finalizeRecovery(final long globalCheckpoint) { transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FINALIZE, diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 3edad2635d1d0..fd9861299dd5a 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -242,6 +242,7 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { oldPrimary.flush(new FlushRequest(index.getName())); } } + shards.promoteReplicaToPrimary(newPrimary); // check that local checkpoint of new primary is properly tracked after primary promotion diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index cb8401baeb44c..92b6e87778443 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2172,20 +2172,13 @@ public void testShardActiveDuringPeerRecovery() throws IOException { new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> { }) { @Override - public void openFileBasedEngine(int totalTranslogOps) throws IOException { - super.openFileBasedEngine(totalTranslogOps); + public void prepareForTranslogOperations(boolean deleteLocalTranslog, int totalTranslogOps) throws IOException { + super.prepareForTranslogOperations(deleteLocalTranslog, totalTranslogOps); // Shard is still inactive since we haven't started recovering yet assertFalse(replica.isActive()); } - @Override - public void openSequencedBasedEngine(int totalTranslogOps) throws IOException { - super.openSequencedBasedEngine(totalTranslogOps); - // Shard is still inactive since we haven't started recovering yet - assertFalse(replica.isActive()); - } - @Override public long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException { final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps); @@ -2228,14 +2221,8 @@ public void testRefreshListenersDuringPeerRecovery() throws IOException { }) { // we're only checking that listeners are called when the engine is open, before there is no point @Override - public void openFileBasedEngine(int totalTranslogOps) throws IOException { - super.openFileBasedEngine(totalTranslogOps); - assertListenerCalled.accept(replica); - } - - @Override - public void openSequencedBasedEngine(int totalTranslogOps) throws IOException { - super.openSequencedBasedEngine(totalTranslogOps); + public void prepareForTranslogOperations(boolean deleteLocalTranslog, int totalTranslogOps) throws IOException { + super.prepareForTranslogOperations(deleteLocalTranslog, totalTranslogOps); assertListenerCalled.accept(replica); } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index b1ed09954a47f..cf1449fecd6a5 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -585,7 +585,7 @@ public void testDisconnectsWhileRecovering() throws Exception { PeerRecoveryTargetService.Actions.FILE_CHUNK, PeerRecoveryTargetService.Actions.CLEAN_FILES, //RecoveryTarget.Actions.TRANSLOG_OPS, <-- may not be sent if already flushed - PeerRecoveryTargetService.Actions.OPEN_SEQUENCE_BASED_ENGINE, + PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG, PeerRecoveryTargetService.Actions.FINALIZE }; final String recoveryActionToBlock = randomFrom(recoveryActions); diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 4ff87c8aa90fa..e22b32305db0c 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -423,7 +423,7 @@ public void phase1(final IndexCommit snapshot, final Supplier translogO } @Override - void openEngineOnTarget(boolean sequencedBasedRecovery, int totalTranslogOps) throws IOException { + void prepareTargetForTranslog(final boolean deleteLocalTranslog, final int totalTranslogOps) throws IOException { prepareTargetForTranslogCalled.set(true); } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index 8c84e076642d7..0f716cfc601e8 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -19,13 +19,10 @@ package org.elasticsearch.indices.recovery; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.NoMergePolicy; import org.elasticsearch.action.admin.indices.flush.FlushRequest; -import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.UUIDs; @@ -40,7 +37,6 @@ import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase; import org.elasticsearch.index.replication.RecoveryDuringReplicationTests; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.translog.SnapshotMatchers; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; From c9152ea19398b5f44bbdeaa78e928eee9f5915d1 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 11 Jan 2018 09:15:42 -0500 Subject: [PATCH 3/7] test: restore extra check --- .../PeerRecoveryTargetServiceTests.java | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index ea26b2212cf93..31521e33f21b6 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -41,20 +41,24 @@ public void testGetStartingSeqNo() throws Exception { { for (int i = 0; i < initDocs; i++) { indexDoc(replica, "doc", Integer.toString(i)); - replica.updateGlobalCheckpointOnReplica(i, "test"); + if (randomBoolean()) { + flushShard(replica); + } } flushShard(replica); + replica.updateGlobalCheckpointOnReplica(initDocs - 1, "test"); + replica.getTranslog().sync(); final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null); assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(initDocs)); recoveryTarget.decRef(); } - // Last commit is not good - use the previous commit + // Global checkpoint does not advance, last commit is not good - use the previous commit + final int moreDocs = randomIntBetween(1, 10); { - int moreDocs = randomIntBetween(1, 10); for (int i = 0; i < moreDocs; i++) { indexDoc(replica, "doc", Long.toString(i)); - if (rarely()) { - getEngine(replica).getLocalCheckpointTracker().generateSeqNo(); // Create a gap in seqno. + if (randomBoolean()) { + flushShard(replica); } } flushShard(replica); @@ -62,6 +66,14 @@ public void testGetStartingSeqNo() throws Exception { assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(initDocs)); recoveryTarget.decRef(); } + // Advances the global checkpoint, a safe commit also advances + { + replica.updateGlobalCheckpointOnReplica(initDocs + moreDocs - 1, "test"); + replica.getTranslog().sync(); + final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null); + assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(initDocs + moreDocs)); + recoveryTarget.decRef(); + } } finally { closeShards(replica); } From 2ff5354f9822373c9fd68f67e6d229be95c362b4 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 11 Jan 2018 09:53:08 -0500 Subject: [PATCH 4/7] goodDocs -> initDocs --- .../org/elasticsearch/indices/recovery/RecoveryTests.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index 0f716cfc601e8..45198a3de7047 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -247,9 +247,9 @@ public void testSequenceBasedRecoveryKeepsTranslog() throws Exception { try (ReplicationGroup shards = createGroup(1)) { shards.startAll(); final IndexShard replica = shards.getReplicas().get(0); - final int goodDocs = scaledRandomIntBetween(0, 20); + final int initDocs = scaledRandomIntBetween(0, 20); int uncommittedDocs = 0; - for (int i = 0; i < goodDocs; i++) { + for (int i = 0; i < initDocs; i++) { shards.indexDocs(1); uncommittedDocs++; if (randomBoolean()) { @@ -269,7 +269,7 @@ public void testSequenceBasedRecoveryKeepsTranslog() throws Exception { shards.recoverReplica(newReplica); try (Translog.Snapshot snapshot = newReplica.getTranslog().newSnapshot()) { - assertThat("Sequence based recovery should keep existing translog", snapshot, SnapshotMatchers.size(goodDocs + moreDocs)); + assertThat("Sequence based recovery should keep existing translog", snapshot, SnapshotMatchers.size(initDocs + moreDocs)); } assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(uncommittedDocs + moreDocs)); assertThat(newReplica.recoveryState().getIndex().fileDetails(), empty()); From 1eea7b1fee2f10459c0ec9d7f664a8e843512d66 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 11 Jan 2018 13:26:52 -0500 Subject: [PATCH 5/7] Rename createNewTranslog --- .../indices/recovery/RecoverySourceHandler.java | 11 +++++------ .../indices/recovery/RecoveryTarget.java | 4 ++-- .../indices/recovery/RecoveryTargetHandler.java | 5 ++--- .../indices/recovery/RemoteRecoveryTargetHandler.java | 4 ++-- .../elasticsearch/index/shard/IndexShardTests.java | 9 ++++----- .../indices/recovery/RecoverySourceHandlerTests.java | 2 +- 6 files changed, 16 insertions(+), 19 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index e98ae031cb5ef..3ee9b953757c3 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -150,9 +150,9 @@ public RecoveryResponse recoverToTarget() throws IOException { final long startingSeqNo; final long requiredSeqNoRangeStart; - final boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && + final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && isTargetSameHistory() && isTranslogReadyForSequenceNumberBasedRecovery(); - if (isSequenceNumberBasedRecoveryPossible) { + if (isSequenceNumberBasedRecovery) { logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo()); startingSeqNo = request.startingSeqNo(); requiredSeqNoRangeStart = startingSeqNo; @@ -189,8 +189,7 @@ public RecoveryResponse recoverToTarget() throws IOException { try { // For a sequence based recovery, the target can keep its local translog - prepareTargetForTranslog(isSequenceNumberBasedRecoveryPossible == false, - translog.estimateTotalOperationsFromMinSeq(startingSeqNo)); + prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, translog.estimateTotalOperationsFromMinSeq(startingSeqNo)); } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e); } @@ -423,13 +422,13 @@ public void phase1(final IndexCommit snapshot, final Supplier translogO } } - void prepareTargetForTranslog(final boolean deleteLocalTranslog, final int totalTranslogOps) throws IOException { + void prepareTargetForTranslog(final boolean createNewTranslog, final int totalTranslogOps) throws IOException { StopWatch stopWatch = new StopWatch().start(); logger.trace("recovery [phase1]: prepare remote engine for translog"); final long startEngineStart = stopWatch.totalTime().millis(); // Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables // garbage collection (not the JVM's GC!) of tombstone deletes. - cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(deleteLocalTranslog, totalTranslogOps)); + cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(createNewTranslog, totalTranslogOps)); stopWatch.stop(); response.startTime = stopWatch.totalTime().millis() - startEngineStart; diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 943146acc9e3c..9adadef13cef3 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -362,9 +362,9 @@ private void ensureRefCount() { /*** Implementation of {@link RecoveryTargetHandler } */ @Override - public void prepareForTranslogOperations(boolean deleteLocalTranslog, int totalTranslogOps) throws IOException { + public void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException { state().getTranslog().totalOperations(totalTranslogOps); - if (deleteLocalTranslog) { + if (createNewTranslog) { indexShard().openIndexAndCreateTranslog(false, SequenceNumbers.UNASSIGNED_SEQ_NO); } else { indexShard().openIndexAndSkipTranslogRecovery(); diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index 2f1f62ec5ed71..736d602044656 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -32,11 +32,10 @@ public interface RecoveryTargetHandler { /** * Prepares the target to receive translog operations, after all file have been copied - * - * @param deleteLocalTranslog whether or not to delete the local translog on the target + * @param createNewTranslog whether or not to delete the local translog on the target * @param totalTranslogOps total translog operations expected to be sent */ - void prepareForTranslogOperations(boolean deleteLocalTranslog, int totalTranslogOps) throws IOException; + void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException; /** * The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index 6214878e9a501..4ea2be0e72659 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -76,9 +76,9 @@ public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportSe } @Override - public void prepareForTranslogOperations(boolean deleteLocalTranslog, int totalTranslogOps) throws IOException { + public void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException { transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG, - new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, deleteLocalTranslog), + new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, createNewTranslog), TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 2756ed67c77c2..cd75c7a08fbc3 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -22,7 +22,6 @@ import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; -import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.TermQuery; @@ -2137,8 +2136,8 @@ public void testShardActiveDuringPeerRecovery() throws IOException { new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> { }) { @Override - public void prepareForTranslogOperations(boolean deleteLocalTranslog, int totalTranslogOps) throws IOException { - super.prepareForTranslogOperations(deleteLocalTranslog, totalTranslogOps); + public void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException { + super.prepareForTranslogOperations(createNewTranslog, totalTranslogOps); // Shard is still inactive since we haven't started recovering yet assertFalse(replica.isActive()); @@ -2186,8 +2185,8 @@ public void testRefreshListenersDuringPeerRecovery() throws IOException { }) { // we're only checking that listeners are called when the engine is open, before there is no point @Override - public void prepareForTranslogOperations(boolean deleteLocalTranslog, int totalTranslogOps) throws IOException { - super.prepareForTranslogOperations(deleteLocalTranslog, totalTranslogOps); + public void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException { + super.prepareForTranslogOperations(createNewTranslog, totalTranslogOps); assertListenerCalled.accept(replica); } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index e0de37af5d64d..7ab6925ce57b9 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -423,7 +423,7 @@ public void phase1(final IndexCommit snapshot, final Supplier translogO } @Override - void prepareTargetForTranslog(final boolean deleteLocalTranslog, final int totalTranslogOps) throws IOException { + void prepareTargetForTranslog(final boolean createNewTranslog, final int totalTranslogOps) throws IOException { prepareTargetForTranslogCalled.set(true); } From 9b25413a7364a6543b4148e5972ebac476c88404 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 11 Jan 2018 13:31:12 -0500 Subject: [PATCH 6/7] feedbacks --- .../org/elasticsearch/indices/recovery/RecoveryTarget.java | 1 + .../index/replication/RecoveryDuringReplicationTests.java | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 9adadef13cef3..fd26ebb13435e 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -365,6 +365,7 @@ private void ensureRefCount() { public void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException { state().getTranslog().totalOperations(totalTranslogOps); if (createNewTranslog) { + // TODO: take the local checkpoint from store as global checkpoint, once we know it's safe indexShard().openIndexAndCreateTranslog(false, SequenceNumbers.UNASSIGNED_SEQ_NO); } else { indexShard().openIndexAndSkipTranslogRecovery(); diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index fd9861299dd5a..881eb16d619d0 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -257,7 +257,7 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { // As a replica keeps a safe commit, the file-based recovery only happens if the required translog // for the sequence based recovery are not fully retained and extra documents were added to the primary. - boolean expectSeqNoRecovery = (moreDocs == 0 || frequently()); + boolean expectSeqNoRecovery = (moreDocs == 0 || randomBoolean()); int uncommittedOpsOnPrimary = 0; if (expectSeqNoRecovery == false) { IndexMetaData.Builder builder = IndexMetaData.builder(newPrimary.indexSettings().getIndexMetaData()); @@ -270,6 +270,7 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { shards.syncGlobalCheckpoint(); newPrimary.flush(new FlushRequest()); uncommittedOpsOnPrimary = shards.indexDocs(randomIntBetween(0, 10)); + totalDocs += uncommittedOpsOnPrimary; } if (randomBoolean()) { From 3ef8bf56947c8f5506ef552a35b7780d9a921e1d Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 12 Jan 2018 16:43:51 -0500 Subject: [PATCH 7/7] update the todo note --- .../java/org/elasticsearch/indices/recovery/RecoveryTarget.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index fd26ebb13435e..1bbcb9efa9644 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -365,7 +365,7 @@ private void ensureRefCount() { public void prepareForTranslogOperations(boolean createNewTranslog, int totalTranslogOps) throws IOException { state().getTranslog().totalOperations(totalTranslogOps); if (createNewTranslog) { - // TODO: take the local checkpoint from store as global checkpoint, once we know it's safe + // TODO: Assigns the global checkpoint to the max_seqno of the safe commit if the index version >= 6.2 indexShard().openIndexAndCreateTranslog(false, SequenceNumbers.UNASSIGNED_SEQ_NO); } else { indexShard().openIndexAndSkipTranslogRecovery();