Skip to content

Replica starts peer recovery with safe commit #28181

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jan 13, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -1513,6 +1513,11 @@ public interface Warmer {
*/
public abstract Engine recoverFromTranslog() throws IOException;

/**
* Do not replay translog operations, but make the engine be ready.
*/
public abstract void skipTranslogRecovery();

/**
* Returns <code>true</code> iff this engine is currently recovering from translog.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,15 @@ public InternalEngine recoverFromTranslog() throws IOException {
return this;
}

@Override
public void skipTranslogRecovery() {
if (openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
throw new IllegalStateException("Can't skip translog recovery with open mode: " + openMode);
}
assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
pendingTranslogRecovery.set(false); // we are good - now we can commit
}

private IndexCommit getStartingCommitPoint() throws IOException {
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
Expand Down
16 changes: 13 additions & 3 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1303,9 +1303,20 @@ public void openIndexAndCreateTranslog(boolean forceNewHistoryUUID, long globalC
* opens the engine on top of the existing lucene engine and translog.
* Operations from the translog will be replayed to bring lucene up to date.
**/
public void openIndexAndTranslog() throws IOException {
public void openIndexAndRecoveryFromTranslog() throws IOException {
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.EXISTING_STORE;
innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, false);
getEngine().recoverFromTranslog();
}

/**
* Opens the engine on top of the existing lucene engine and translog.
* The translog is kept but its operations won't be replayed.
*/
public void openIndexAndSkipTranslogRecovery() throws IOException {
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.PEER;
innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, false);
getEngine().skipTranslogRecovery();
}

private void innerOpenEngineAndTranslog(final EngineConfig.OpenMode openMode, final boolean forceNewHistoryUUID) throws IOException {
Expand Down Expand Up @@ -1338,13 +1349,12 @@ private void innerOpenEngineAndTranslog(final EngineConfig.OpenMode openMode, fi
globalCheckpointTracker.updateGlobalCheckpointOnReplica(Translog.readGlobalCheckpoint(translogConfig.getTranslogPath()),
"read from translog checkpoint");
}
Engine newEngine = createNewEngine(config);
createNewEngine(config);
verifyNotClosed();
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
// We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
// we still give sync'd flush a chance to run:
active.set(true);
newEngine.recoverFromTranslog();
}
assertSequenceNumbersInCommit();
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
logger.debug("failed to list file details", e);
}
if (indexShouldExists) {
indexShard.openIndexAndTranslog();
indexShard.openIndexAndRecoveryFromTranslog();
indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm());
} else {
indexShard.createIndexAndTranslog();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.ElasticsearchException;
Expand All @@ -39,6 +41,7 @@
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.CombinedDeletionPolicy;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.seqno.SequenceNumbers;
Expand All @@ -60,6 +63,7 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -78,7 +82,8 @@ public static class Actions {
public static final String FILE_CHUNK = "internal:index/shard/recovery/file_chunk";
public static final String CLEAN_FILES = "internal:index/shard/recovery/clean_files";
public static final String TRANSLOG_OPS = "internal:index/shard/recovery/translog_ops";
public static final String PREPARE_TRANSLOG = "internal:index/shard/recovery/prepare_translog";
public static final String OPEN_FILE_BASED_ENGINE = "internal:index/shard/recovery/prepare_translog";
public static final String OPEN_SEQUENCE_BASED_ENGINE = "internal:index/shard/recovery/open_seq_based_engine";
public static final String FINALIZE = "internal:index/shard/recovery/finalize";
public static final String WAIT_CLUSTERSTATE = "internal:index/shard/recovery/wait_clusterstate";
public static final String HANDOFF_PRIMARY_CONTEXT = "internal:index/shard/recovery/handoff_primary_context";
Expand Down Expand Up @@ -108,8 +113,10 @@ public PeerRecoveryTargetService(Settings settings, ThreadPool threadPool, Trans
FileChunkTransportRequestHandler());
transportService.registerRequestHandler(Actions.CLEAN_FILES, RecoveryCleanFilesRequest::new, ThreadPool.Names.GENERIC, new
CleanFilesRequestHandler());
transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, RecoveryPrepareForTranslogOperationsRequest::new, ThreadPool
.Names.GENERIC, new PrepareForTranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.OPEN_FILE_BASED_ENGINE, ThreadPool.Names.GENERIC,
RecoveryOpenFileBasedEngineRequest::new, new OpenFileBasedEngineRequestHandler());
transportService.registerRequestHandler(Actions.OPEN_SEQUENCE_BASED_ENGINE, ThreadPool.Names.GENERIC,
RecoveryOpenSeqBasedEngineRequest::new, new OpenSequenceBasedEngineRequestHandler());
transportService.registerRequestHandler(Actions.TRANSLOG_OPS, RecoveryTranslogOperationsRequest::new, ThreadPool.Names.GENERIC,
new TranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new
Expand Down Expand Up @@ -353,7 +360,9 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
public static long getStartingSeqNo(final RecoveryTarget recoveryTarget) {
try {
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation());
final SequenceNumbers.CommitInfo seqNoStats = recoveryTarget.store().loadSeqNoInfo(null);
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(recoveryTarget.store().directory());
final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, globalCheckpoint);
final SequenceNumbers.CommitInfo seqNoStats = recoveryTarget.store().loadSeqNoInfo(safeCommit);
if (seqNoStats.maxSeqNo <= globalCheckpoint) {
assert seqNoStats.localCheckpoint <= globalCheckpoint;
/*
Expand Down Expand Up @@ -381,13 +390,25 @@ public interface RecoveryListener {
void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure);
}

class PrepareForTranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryPrepareForTranslogOperationsRequest> {
class OpenFileBasedEngineRequestHandler implements TransportRequestHandler<RecoveryOpenFileBasedEngineRequest> {

@Override
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
public void messageReceived(RecoveryOpenFileBasedEngineRequest request, TransportChannel channel) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
recoveryRef.target().prepareForTranslogOperations(request.totalTranslogOps());
recoveryRef.target().openFileBasedEngine(request.totalTranslogOps());
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}

class OpenSequenceBasedEngineRequestHandler implements TransportRequestHandler<RecoveryOpenSeqBasedEngineRequest> {

@Override
public void messageReceived(RecoveryOpenSeqBasedEngineRequest request, TransportChannel channel) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
recoveryRef.target().openSequencedBasedEngine(request.totalTranslogOps());
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,22 @@

import java.io.IOException;

public class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {
final class RecoveryOpenFileBasedEngineRequest extends TransportRequest {
final long recoveryId;
final ShardId shardId;
final int totalTranslogOps;

private long recoveryId;
private ShardId shardId;
private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;

public RecoveryPrepareForTranslogOperationsRequest() {
RecoveryOpenFileBasedEngineRequest(StreamInput in) throws IOException {
super(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
totalTranslogOps = in.readVInt();
if (in.getVersion().before(Version.V_6_0_0_alpha1)) {
in.readLong(); // maxUnsafeAutoIdTimestamp
}
}

RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps) {
RecoveryOpenFileBasedEngineRequest(long recoveryId, ShardId shardId, int totalTranslogOps) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.totalTranslogOps = totalTranslogOps;
Expand All @@ -55,17 +61,6 @@ public int totalTranslogOps() {
return totalTranslogOps;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
totalTranslogOps = in.readVInt();
if (in.getVersion().before(Version.V_6_0_0_alpha1)) {
in.readLong(); // maxUnsafeAutoIdTimestamp
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.indices.recovery;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.TransportRequest;

import java.io.IOException;

final class RecoveryOpenSeqBasedEngineRequest extends TransportRequest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've slept on this (as promised :)) and I prefer we go back to how you had it with a boolean in RecoveryPrepareForTranslogOperationsRequest. The reason is that I want to do some refactoring to simplify how the engine is created and I expect this to change making the boolean not needed and only use one message. I rather not have to deal with two messages and another layer of BWC. I think we should call the boolean "deleteLocalTranslog"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on not having a separate message here.

final long recoveryId;
final ShardId shardId;
final int totalTranslogOps;

RecoveryOpenSeqBasedEngineRequest(long recoveryId, ShardId shardId, int totalTranslogOps) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.totalTranslogOps = totalTranslogOps;
}

RecoveryOpenSeqBasedEngineRequest(StreamInput in) throws IOException {
super(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
totalTranslogOps = in.readVInt();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(recoveryId);
shardId.writeTo(out);
out.writeVInt(totalTranslogOps);
}

public long recoveryId() {
return this.recoveryId;
}

public ShardId shardId() {
return shardId;
}

public int totalTranslogOps() {
return totalTranslogOps;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()));

try {
prepareTargetForTranslog(translog.estimateTotalOperationsFromMinSeq(startingSeqNo));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can roll back all these naming changes if we we keep the old message (and the boolean)

openEngineOnTarget(isSequenceNumberBasedRecoveryPossible, translog.estimateTotalOperationsFromMinSeq(startingSeqNo));
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
}
Expand Down Expand Up @@ -421,13 +421,17 @@ public void phase1(final IndexCommit snapshot, final Supplier<Integer> translogO
}
}

void prepareTargetForTranslog(final int totalTranslogOps) throws IOException {
void openEngineOnTarget(final boolean sequencedBasedRecovery, final int totalTranslogOps) throws IOException {
StopWatch stopWatch = new StopWatch().start();
logger.trace("recovery [phase1]: prepare remote engine for translog");
final long startEngineStart = stopWatch.totalTime().millis();
// Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
// garbage collection (not the JVM's GC!) of tombstone deletes.
cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps));
if (sequencedBasedRecovery) {
cancellableThreads.executeIO(() -> recoveryTarget.openSequencedBasedEngine(totalTranslogOps));
} else {
cancellableThreads.executeIO(() -> recoveryTarget.openFileBasedEngine(totalTranslogOps));
}
stopWatch.stop();

response.startTime = stopWatch.totalTime().millis() - startEngineStart;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,12 +362,17 @@ private void ensureRefCount() {
/*** Implementation of {@link RecoveryTargetHandler } */

@Override
public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
public void openFileBasedEngine(int totalTranslogOps) throws IOException {
state().getTranslog().totalOperations(totalTranslogOps);
// TODO: take the local checkpoint from store as global checkpoint, once we know it's safe
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the todo is still relevant no?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed it back as a note as it's a valid TODO.

indexShard().openIndexAndCreateTranslog(false, SequenceNumbers.UNASSIGNED_SEQ_NO);
}

@Override
public void openSequencedBasedEngine(int totalTranslogOps) throws IOException {
state().getTranslog().totalOperations(totalTranslogOps);
indexShard().openIndexAndSkipTranslogRecovery();
}

@Override
public void finalizeRecovery(final long globalCheckpoint) throws IOException {
final IndexShard indexShard = indexShard();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,21 @@


public interface RecoveryTargetHandler {
/**
* Opens an engine and truncates the existing translog on the target.
* After this method, the target should be ready to receive translog operations.
*
* @param totalTranslogOps total translog operations expected to be sent
*/
void openFileBasedEngine(int totalTranslogOps) throws IOException;

/**
* Prepares the target to receive translog operations, after all file have been copied
* Opens an engine on the target with its own local commit point and keeps the existing translog on the target.
* After this method, the target should be ready to receive translog operations.
*
* @param totalTranslogOps total translog operations expected to be sent
*/
void prepareForTranslogOperations(int totalTranslogOps) throws IOException;
void openSequencedBasedEngine(int totalTranslogOps) throws IOException;

/**
* The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.seqno.GlobalCheckpointTracker;
Expand Down Expand Up @@ -76,13 +77,25 @@ public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportSe
}

@Override
public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG,
new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps),
public void openFileBasedEngine(int totalTranslogOps) throws IOException {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.OPEN_FILE_BASED_ENGINE,
new RecoveryOpenFileBasedEngineRequest(recoveryId, shardId, totalTranslogOps),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}

@Override
public void openSequencedBasedEngine(int totalTranslogOps) throws IOException {
if (targetNode.getVersion().before(Version.V_7_0_0_alpha1)) {
openFileBasedEngine(totalTranslogOps);
} else {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.OPEN_SEQUENCE_BASED_ENGINE,
new RecoveryOpenSeqBasedEngineRequest(recoveryId, shardId, totalTranslogOps),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
}

@Override
public void finalizeRecovery(final long globalCheckpoint) {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FINALIZE,
Expand Down
Loading