Skip to content

Trim local translog in peer recovery #44756

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Aug 3, 2019
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -1099,7 +1099,7 @@ public void trimTranslog() {
/**
* Rolls the translog generation and cleans up what is no longer needed.
* Delegates to {@link Engine#rollTranslogGeneration()} on the engine returned by {@code getEngine()}.
*/
private void rollTranslogGeneration() {
public void rollTranslogGeneration() {
final Engine engine = getEngine();
engine.rollTranslogGeneration();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public int skippedOperations() {

@Override
public Translog.Operation next() throws IOException {
// TODO: Read translog forward in 9.0+
for (; index >= 0; index--) {
final TranslogSnapshot current = translogs[index];
Translog.Operation op;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ class FinalizeRecoveryRequestHandler implements TransportRequestHandler<Recovery
/**
* Handles a finalize-recovery request: resolves the ongoing recovery named by the request and asks its
* recovery target to finalize, answering on the channel once the target has completed.
*
* @param request carries the recovery id, shard id, global checkpoint and trim-above sequence number
* @param channel the channel on which the response (or failure) is sent back
* @param task the task associated with this request (not used in this method)
*/
public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel, Task task) throws Exception {
// The recovery reference is looked up by recovery id and shard id; try-with-resources closes it when this block exits.
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
final ActionListener<TransportResponse> listener = new ChannelActionListener<>(channel, Actions.FINALIZE, request);
recoveryRef.target().finalizeRecovery(request.globalCheckpoint(),
recoveryRef.target().finalizeRecovery(request.globalCheckpoint(), request.trimAboveSeqNo(),
// The target completes with no value; map that to an empty transport response for the sender.
ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,39 @@

package org.elasticsearch.indices.recovery;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.TransportRequest;

import java.io.IOException;

public class RecoveryFinalizeRecoveryRequest extends TransportRequest {
final class RecoveryFinalizeRecoveryRequest extends TransportRequest {

private long recoveryId;
private ShardId shardId;
private long globalCheckpoint;
private final long recoveryId;
private final ShardId shardId;
private final long globalCheckpoint;
private final long trimAboveSeqNo;

/**
* Reads a request from the wire in the order: recovery id, shard id, global checkpoint and, when the
* stream version is on or after 8.0.0, the trim-above sequence number.
*
* @param in the stream to read from
* @throws IOException if reading from the stream fails
*/
public RecoveryFinalizeRecoveryRequest(StreamInput in) throws IOException {
RecoveryFinalizeRecoveryRequest(StreamInput in) throws IOException {
super(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
globalCheckpoint = in.readZLong();
// The field is only present on streams at version 8.0.0 or later (writeTo applies the same version check).
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
trimAboveSeqNo = in.readZLong();
} else {
// Older streams do not carry the value; use the "unassigned" marker instead.
trimAboveSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
}

/**
* Creates a finalize-recovery request from its individual values.
*
* @param recoveryId the id of the recovery being finalized
* @param shardId the shard being recovered
* @param globalCheckpoint the global checkpoint to pass to the recovery target
* @param trimAboveSeqNo the sequence number above which the recovery target should erase its existing translog
*/
RecoveryFinalizeRecoveryRequest(final long recoveryId, final ShardId shardId, final long globalCheckpoint) {
RecoveryFinalizeRecoveryRequest(final long recoveryId, final ShardId shardId, final long globalCheckpoint, final long trimAboveSeqNo) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.globalCheckpoint = globalCheckpoint;
this.trimAboveSeqNo = trimAboveSeqNo;
}

public long recoveryId() {
Expand All @@ -57,12 +66,19 @@ public long globalCheckpoint() {
return globalCheckpoint;
}

/**
 * Returns the sequence number above which the recovery target should erase its existing translog.
 * The value is {@code SequenceNumbers.UNASSIGNED_SEQ_NO} when the request was read from a stream
 * older than 8.0.0, which does not carry this field.
 *
 * @return the trim-above sequence number held by this request
 */
public long trimAboveSeqNo() {
    return this.trimAboveSeqNo;
}

/**
 * Writes this request to the wire in the order: recovery id, shard id, global checkpoint and, only
 * when the output stream version is on or after 8.0.0, the trim-above sequence number.
 *
 * @param out the stream to write to
 * @throws IOException if writing to the stream fails
 */
@Override
public void writeTo(StreamOutput out) throws IOException {
    super.writeTo(out);
    out.writeLong(this.recoveryId);
    this.shardId.writeTo(out);
    out.writeZLong(this.globalCheckpoint);
    // Mirror of the reading constructor: the extra field is only exchanged from 8.0.0 onwards.
    final boolean streamCarriesTrimSeqNo = out.getVersion().onOrAfter(Version.V_8_0_0);
    if (streamCarriesTrimSeqNo) {
        out.writeZLong(this.trimAboveSeqNo);
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,9 @@ && isTargetSameHistory()

}, onFailure);

sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, finalizeStep), onFailure);
// Recovery target can trim all operations >= startingSeqNo as we have sent all these operations in the phase 2
final long trimAboveSeqNo = startingSeqNo - 1;
sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, trimAboveSeqNo, finalizeStep), onFailure);

finalizeStep.whenComplete(r -> {
final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
Expand Down Expand Up @@ -747,7 +749,7 @@ private void sendBatch(
}
}

void finalizeRecovery(final long targetLocalCheckpoint, final ActionListener<Void> listener) throws IOException {
void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener) throws IOException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
Expand All @@ -764,7 +766,7 @@ void finalizeRecovery(final long targetLocalCheckpoint, final ActionListener<Voi
shardId + " marking " + request.targetAllocationId() + " as in sync", shard, cancellableThreads, logger);
final long globalCheckpoint = shard.getLastKnownGlobalCheckpoint(); // this global checkpoint is persisted in finalizeRecovery
final StepListener<Void> finalizeListener = new StepListener<>();
cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint, finalizeListener));
cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, finalizeListener));
finalizeListener.whenComplete(r -> {
runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads, logger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,16 +289,24 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Vo
}

@Override
public void finalizeRecovery(final long globalCheckpoint, ActionListener<Void> listener) {
public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSeqNo, ActionListener<Void> listener) {
ActionListener.completeWith(listener, () -> {
final IndexShard indexShard = indexShard();
indexShard.updateGlobalCheckpointOnReplica(globalCheckpoint, "finalizing recovery");
// Persist the global checkpoint.
indexShard.sync();
indexShard.persistRetentionLeases();
if (hasUncommittedOperations()) {
indexShard.flush(new FlushRequest().force(true).waitIfOngoing(true));
}
if (trimAboveSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
// We should erase all translog operations above trimAboveSeqNo as we have received either the same or a newer copy
// from the recovery source in phase2. Rolling a new translog generation is not strictly required here for we won't
// trim the current generation. It's merely to satisfy the assumption that the current generation does not have any
// operation that would be trimmed (see TranslogWriter#assertNoSeqAbove). This assumption does not hold for peer
// recovery because we could have received operations above startingSeqNo from the previous primary terms.
indexShard.rollTranslogGeneration();
indexShard.trimOperationOfPreviousPrimaryTerms(trimAboveSeqNo);
}
indexShard.finalizeRecovery();
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ public interface RecoveryTargetHandler {
* the global checkpoint.
*
* @param globalCheckpoint the global checkpoint on the recovery source
* @param trimAboveSeqNo The recovery target should erase its existing translog above this sequence number
* from the previous primary terms.
* @param listener the listener which will be notified when this method is completed
*/
void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener);
void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener);

/**
* Handoff the primary context between the relocation source and the relocation target.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Vo
}

@Override
public void finalizeRecovery(final long globalCheckpoint, final ActionListener<Void> listener) {
public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSeqNo, final ActionListener<Void> listener) {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FINALIZE,
new RecoveryFinalizeRecoveryRequest(recoveryId, shardId, globalCheckpoint),
new RecoveryFinalizeRecoveryRequest(recoveryId, shardId, globalCheckpoint, trimAboveSeqNo),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(),
new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> null),
in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@
import org.elasticsearch.index.store.FsDirectoryFactory;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.SnapshotMatchers;
import org.elasticsearch.index.translog.TestTranslog;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
Expand Down Expand Up @@ -184,6 +185,7 @@
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
Expand Down Expand Up @@ -5946,4 +5948,34 @@ public long addDocument(Iterable<? extends IndexableField> doc) throws IOExcepti
assertNotNull(engine.failedEngine.get());
}
}

/**
 * Trimming the translog on primary promotion and during peer recovery relies on one fact: operations
 * with either REPLICA or PEER_RECOVERY origin are added to the translog even though they already exist
 * in the engine (i.e. hasProcessed() == true). If those already-processed operations ever stop being
 * added to the translog, the consequences of trimming the translog in these two places must be studied
 * carefully.
 */
public void testAlwaysRecordReplicaOrPeerRecoveryOperationsToTranslog() throws Exception {
    final List<Engine.Operation> history =
        generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean());
    applyOperations(engine, history);
    final Set<Long> expectedSeqNos = history.stream()
        .map(Engine.Operation::seqNo)
        .collect(Collectors.toSet());

    // After the first pass the translog must hold exactly the applied operations.
    try (Translog.Snapshot afterFirstApply = getTranslog(engine).newSnapshot()) {
        assertThat(afterFirstApply.totalOperations(), equalTo(history.size()));
        final Set<Long> recordedSeqNos = TestTranslog.drainSnapshot(afterFirstApply, false).stream()
            .map(Translog.Operation::seqNo)
            .collect(Collectors.toSet());
        assertThat(recordedSeqNos, equalTo(expectedSeqNos));
    }

    // Pick a new primary term (at least the current one), roll the generation and trim above NO_OPS_PERFORMED.
    final long pickedTerm = randomLongBetween(primaryTerm.get(), Long.MAX_VALUE);
    primaryTerm.set(pickedTerm);
    engine.rollTranslogGeneration();
    engine.trimOperationsFromTranslog(primaryTerm.get(), NO_OPS_PERFORMED);

    // The snapshot still reports the original total, but yields no operation.
    try (Translog.Snapshot afterTrim = getTranslog(engine).newSnapshot()) {
        assertThat(afterTrim.totalOperations(), equalTo(history.size()));
        assertNull(afterTrim.next());
    }

    // Applying the same operations again must record them in the translog a second time.
    applyOperations(engine, history);
    try (Translog.Snapshot afterReapply = getTranslog(engine).newSnapshot()) {
        assertThat(afterReapply.totalOperations(), equalTo(history.size() * 2));
        final Set<Long> readableSeqNos = TestTranslog.drainSnapshot(afterReapply, false).stream()
            .map(Translog.Operation::seqNo)
            .collect(Collectors.toSet());
        assertThat(readableSeqNos, equalTo(expectedSeqNos));
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -504,9 +504,9 @@ protected EngineFactory getEngineFactory(ShardRouting routing) {
recoveryStart.countDown();
return new RecoveryTarget(indexShard, node, recoveryListener) {
// Test hook: flags that finalization has been reached, then delegates to the real implementation.
@Override
public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
public void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener) {
// Set the flag before delegating so it is already true while the superclass finalizes.
recoveryDone.set(true);
super.finalizeRecovery(globalCheckpoint, listener);
super.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, listener);
}
};
});
Expand Down Expand Up @@ -868,13 +868,13 @@ public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.Metada
}

// Test hook: gives the test a chance to block at the TRANSLOG and FINALIZE stages before delegating
// to the real implementation. NOTE(review): the exact semantics of hasBlocked()/blockIfNeeded() are
// defined outside this view - confirm there.
@Override
public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
public void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener) {
if (hasBlocked() == false) {
// it may be that no ops have been transferred, block now
blockIfNeeded(RecoveryState.Stage.TRANSLOG);
}
blockIfNeeded(RecoveryState.Stage.FINALIZE);
super.finalizeRecovery(globalCheckpoint, listener);
super.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, listener);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1064,7 +1064,7 @@ public void onFailure(Exception e) {
onFailureException.get(), hasToString(containsString("operation primary term [" + oldPrimaryTerm + "] is too old")));
}

closeShards(indexShard);
closeShard(indexShard, false); // skip asserting translog and Lucene as we rolled back Lucene but did not execute resync
}

public void testAcquireReplicaPermitAdvanceMaxSeqNoOfUpdates() throws Exception {
Expand Down Expand Up @@ -2760,8 +2760,8 @@ public void indexTranslogOperations(
}

@Override
public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
super.finalizeRecovery(globalCheckpoint,
public void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener) {
super.finalizeRecovery(globalCheckpoint, trimAboveSeqNo,
ActionListener.wrap(
r -> {
assertListenerCalled.accept(replica);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.indices.recovery;

import org.apache.lucene.analysis.TokenStream;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
Expand All @@ -33,6 +34,7 @@
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
Expand Down Expand Up @@ -75,6 +77,7 @@
import org.elasticsearch.indices.analysis.AnalysisModule;
import org.elasticsearch.indices.flush.SyncedFlushUtil;
import org.elasticsearch.indices.recovery.RecoveryState.Stage;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.node.RecoverySettingsChunkSizePlugin;
import org.elasticsearch.plugins.AnalysisPlugin;
import org.elasticsearch.plugins.Plugin;
Expand Down Expand Up @@ -117,6 +120,8 @@

import static java.util.Collections.singletonMap;
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED;
import static org.elasticsearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
Expand All @@ -125,6 +130,7 @@
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isOneOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;

Expand Down Expand Up @@ -1326,4 +1332,55 @@ public void testAllocateEmptyPrimaryResetsGlobalCheckpoint() throws Exception {
assertThat(shardStats.getSeqNoStats().getGlobalCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
}
}
/**
* Integration test: documents are indexed concurrently while the node holding the primary has a random
* fraction of its outgoing "indices:data/write/bulk[s][r]" requests failed and its shard-failure requests
* rejected. Once the first shard-failure request has been intercepted, the send rules are cleared, that node
* is restarted, and the index is expected to become green again.
*/
public void testPeerRecoveryTrimsLocalTranslog() throws Exception {
// One node via startNode() plus two data-only nodes; the index (1 shard, 1 replica) is restricted to the data-only nodes.
internalCluster().startNode();
List<String> dataNodes = internalCluster().startDataOnlyNodes(2);
String indexName = "test-index";
createIndex(indexName, Settings.builder()
.put("index.number_of_shards", 1).put("index.number_of_replicas", 1)
.put("index.routing.allocation.include._name", String.join(",", dataNodes)).build());
ensureGreen(indexName);
// Find the node currently hosting the primary of shard 0 and take its transport service so outgoing requests can be intercepted.
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
DiscoveryNode nodeWithOldPrimary = clusterState.nodes().get(clusterState.routingTable()
.index(indexName).shard(0).primaryShard().currentNodeId());
MockTransportService transportService = (MockTransportService) internalCluster()
.getInstance(TransportService.class, nodeWithOldPrimary.getName());
// Released by the send behaviour below when the first shard-failure request is seen.
CountDownLatch readyToRestartNode = new CountDownLatch(1);
// Tells the indexing threads to stop.
AtomicBoolean stopped = new AtomicBoolean();
transportService.addSendBehavior((connection, requestId, action, request, options) -> {
// Randomly fail a small fraction of these bulk requests (presumably the replica-side bulk action - confirm).
if (action.equals("indices:data/write/bulk[s][r]") && randomInt(100) < 5) {
throw new NodeClosedException(nodeWithOldPrimary);
}
// prevent the primary from marking the replica as stale so the replica can get promoted.
if (action.equals("internal:cluster/shard/failure")) {
stopped.set(true);
readyToRestartNode.countDown();
throw new NodeClosedException(nodeWithOldPrimary);
}
// Everything else is sent unchanged.
connection.sendRequest(requestId, action, request, options);
});
Thread[] indexers = new Thread[randomIntBetween(1, 8)];
for (int i = 0; i < indexers.length; i++) {
indexers[i] = new Thread(() -> {
// Keep indexing single-field documents until the send behaviour sets the stop flag.
while (stopped.get() == false) {
try {
IndexResponse response = client().prepareIndex(indexName, "_doc")
.setSource(Map.of("f" + randomIntBetween(1, 10), randomNonNegativeLong()), XContentType.JSON).get();
assertThat(response.getResult(), isOneOf(CREATED, UPDATED));
} catch (ElasticsearchException ignored) {
// Deliberately ignored: the loop simply retries (presumably failures are expected while requests are being failed - confirm).
}
}
});
}
// All threads are created first and only then started.
for (Thread indexer : indexers) {
indexer.start();
}
// Wait for the first intercepted shard-failure request, remove the send rules, then restart the node that held the primary.
readyToRestartNode.await();
transportService.clearAllRules();
internalCluster().restartNode(nodeWithOldPrimary.getName(), new InternalTestCluster.RestartCallback());
for (Thread indexer : indexers) {
indexer.join();
}
// The only explicit check at the end: the index returns to green after the restart.
ensureGreen(indexName);
}
}
Loading