Skip to content

Commit a4b4e14

Browse files
authored
Dedup translog operations by reading in reverse (#27268)
Currently, translog operations are read and processed one by one. This may be a problem as stale operations in translogs may suddenly reappear in recoveries. To make sure that stale operations won't be processed, we read the translog files in a reverse order (eg. from the most recent file to the oldest file) and only process an operation if its sequence number was not seen before. Relates to #10708
1 parent 0519fa2 commit a4b4e14

File tree

7 files changed

+454
-31
lines changed

7 files changed

+454
-31
lines changed

core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java

Lines changed: 97 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@
1919

2020
package org.elasticsearch.index.translog;
2121

22+
import com.carrotsearch.hppc.LongHashSet;
23+
import com.carrotsearch.hppc.LongObjectHashMap;
24+
import com.carrotsearch.hppc.LongSet;
25+
import org.apache.lucene.util.FixedBitSet;
26+
import org.elasticsearch.index.seqno.SequenceNumbers;
27+
2228
import java.io.Closeable;
2329
import java.io.IOException;
2430
import java.util.Arrays;
@@ -30,32 +36,44 @@ final class MultiSnapshot implements Translog.Snapshot {
3036

3137
private final TranslogSnapshot[] translogs;
3238
private final int totalOperations;
39+
private int overriddenOperations;
3340
private final Closeable onClose;
3441
private int index;
42+
private final SeqNoSet seenSeqNo;
3543

3644
/**
3745
* Creates a new point in time snapshot of the given snapshots. Those snapshots are always iterated in-order.
3846
*/
3947
MultiSnapshot(TranslogSnapshot[] translogs, Closeable onClose) {
4048
this.translogs = translogs;
41-
totalOperations = Arrays.stream(translogs).mapToInt(TranslogSnapshot::totalOperations).sum();
49+
this.totalOperations = Arrays.stream(translogs).mapToInt(TranslogSnapshot::totalOperations).sum();
50+
this.overriddenOperations = 0;
4251
this.onClose = onClose;
43-
index = 0;
52+
this.seenSeqNo = new SeqNoSet();
53+
this.index = translogs.length - 1;
4454
}
4555

46-
4756
@Override
4857
public int totalOperations() {
4958
return totalOperations;
5059
}
5160

61+
@Override
62+
public int overriddenOperations() {
63+
return overriddenOperations;
64+
}
65+
5266
@Override
5367
public Translog.Operation next() throws IOException {
54-
for (; index < translogs.length; index++) {
68+
for (; index >= 0; index--) {
5569
final TranslogSnapshot current = translogs[index];
56-
Translog.Operation op = current.next();
57-
if (op != null) { // if we are null we move to the next snapshot
58-
return op;
70+
Translog.Operation op;
71+
while ((op = current.next()) != null) {
72+
if (op.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO || seenSeqNo.getAndSet(op.seqNo()) == false) {
73+
return op;
74+
} else {
75+
overriddenOperations++;
76+
}
5977
}
6078
}
6179
return null;
@@ -65,4 +83,76 @@ public Translog.Operation next() throws IOException {
6583
public void close() throws IOException {
6684
onClose.close();
6785
}
86+
87+
/**
88+
* A wrapper of {@link FixedBitSet} but allows to check if all bits are set in O(1).
89+
*/
90+
private static final class CountedBitSet {
91+
private short onBits;
92+
private final FixedBitSet bitset;
93+
94+
CountedBitSet(short numBits) {
95+
assert numBits > 0;
96+
this.onBits = 0;
97+
this.bitset = new FixedBitSet(numBits);
98+
}
99+
100+
boolean getAndSet(int index) {
101+
assert index >= 0;
102+
boolean wasOn = bitset.getAndSet(index);
103+
if (wasOn == false) {
104+
onBits++;
105+
}
106+
return wasOn;
107+
}
108+
109+
boolean hasAllBitsOn() {
110+
return onBits == bitset.length();
111+
}
112+
}
113+
114+
/**
115+
* Sequence numbers from translog are likely to form contiguous ranges,
116+
* thus collapsing a completed bitset into a single entry will reduce memory usage.
117+
*/
118+
static final class SeqNoSet {
119+
static final short BIT_SET_SIZE = 1024;
120+
private final LongSet completedSets = new LongHashSet();
121+
private final LongObjectHashMap<CountedBitSet> ongoingSets = new LongObjectHashMap<>();
122+
123+
/**
124+
* Marks this sequence number and returns <tt>true</tt> if it is seen before.
125+
*/
126+
boolean getAndSet(long value) {
127+
assert value >= 0;
128+
final long key = value / BIT_SET_SIZE;
129+
130+
if (completedSets.contains(key)) {
131+
return true;
132+
}
133+
134+
CountedBitSet bitset = ongoingSets.get(key);
135+
if (bitset == null) {
136+
bitset = new CountedBitSet(BIT_SET_SIZE);
137+
ongoingSets.put(key, bitset);
138+
}
139+
140+
final boolean wasOn = bitset.getAndSet(Math.toIntExact(value % BIT_SET_SIZE));
141+
if (bitset.hasAllBitsOn()) {
142+
ongoingSets.remove(key);
143+
completedSets.add(key);
144+
}
145+
return wasOn;
146+
}
147+
148+
// For testing
149+
long completeSetsSize() {
150+
return completedSets.size();
151+
}
152+
153+
// For testing
154+
long ongoingSetsSize() {
155+
return ongoingSets.size();
156+
}
157+
}
68158
}

core/src/main/java/org/elasticsearch/index/translog/Translog.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -831,10 +831,19 @@ public int hashCode() {
831831
public interface Snapshot extends Closeable {
832832

833833
/**
834-
* The total number of operations in the translog.
834+
* The total estimated number of operations in the snapshot.
835835
*/
836836
int totalOperations();
837837

838+
/**
839+
* The number of operations have been overridden (eg. superseded) in the snapshot so far.
840+
* If two operations have the same sequence number, the operation with a lower term will be overridden by the operation
841+
* with a higher term. Unlike {@link #totalOperations()}, this value is updated each time after {@link #next()}) is called.
842+
*/
843+
default int overriddenOperations() {
844+
return 0;
845+
}
846+
838847
/**
839848
* Returns the next operation in the snapshot or <code>null</code> if we reached the end.
840849
*/

core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import java.util.ArrayList;
6767
import java.util.Comparator;
6868
import java.util.List;
69+
import java.util.Locale;
6970
import java.util.concurrent.atomic.AtomicLong;
7071
import java.util.function.Function;
7172
import java.util.function.Supplier;
@@ -567,8 +568,9 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, final Transl
567568
cancellableThreads.executeIO(sendBatch);
568569
}
569570

570-
assert expectedTotalOps == skippedOps + totalSentOps
571-
: "expected total [" + expectedTotalOps + "], skipped [" + skippedOps + "], total sent [" + totalSentOps + "]";
571+
assert expectedTotalOps == snapshot.overriddenOperations() + skippedOps + totalSentOps
572+
: String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
573+
expectedTotalOps, snapshot.overriddenOperations(), skippedOps, totalSentOps);
572574

573575
logger.trace("sent final batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps);
574576

core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,16 +46,23 @@
4646
import org.hamcrest.Matcher;
4747

4848
import java.io.IOException;
49+
import java.util.ArrayList;
4950
import java.util.Collections;
5051
import java.util.List;
5152
import java.util.Map;
5253
import java.util.concurrent.CountDownLatch;
5354
import java.util.concurrent.Future;
5455

56+
import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder;
5557
import static org.hamcrest.Matchers.anyOf;
5658
import static org.hamcrest.Matchers.containsString;
5759
import static org.hamcrest.Matchers.equalTo;
60+
import static org.hamcrest.Matchers.greaterThan;
61+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
5862
import static org.hamcrest.Matchers.instanceOf;
63+
import static org.hamcrest.Matchers.notNullValue;
64+
import static org.hamcrest.Matchers.nullValue;
65+
import static org.hamcrest.core.Is.is;
5966

6067
public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase {
6168

@@ -299,6 +306,68 @@ public void testRequestFailureReplication() throws Exception {
299306
}
300307
}
301308

309+
public void testSeqNoCollision() throws Exception {
310+
try (ReplicationGroup shards = createGroup(2)) {
311+
shards.startAll();
312+
int initDocs = shards.indexDocs(randomInt(10));
313+
List<IndexShard> replicas = shards.getReplicas();
314+
IndexShard replica1 = replicas.get(0);
315+
IndexShard replica2 = replicas.get(1);
316+
shards.syncGlobalCheckpoint();
317+
318+
logger.info("--> Isolate replica1");
319+
IndexRequest indexDoc1 = new IndexRequest(index.getName(), "type", "d1").source("{}", XContentType.JSON);
320+
BulkShardRequest replicationRequest = indexOnPrimary(indexDoc1, shards.getPrimary());
321+
indexOnReplica(replicationRequest, replica2);
322+
323+
final Translog.Operation op1;
324+
final List<Translog.Operation> initOperations = new ArrayList<>(initDocs);
325+
try (Translog.Snapshot snapshot = replica2.getTranslog().newSnapshot()) {
326+
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
327+
for (int i = 0; i < initDocs; i++) {
328+
Translog.Operation op = snapshot.next();
329+
assertThat(op, is(notNullValue()));
330+
initOperations.add(op);
331+
}
332+
op1 = snapshot.next();
333+
assertThat(op1, notNullValue());
334+
assertThat(snapshot.next(), nullValue());
335+
assertThat(snapshot.overriddenOperations(), equalTo(0));
336+
}
337+
// Make sure that replica2 receives translog ops (eg. op2) from replica1 and overwrites its stale operation (op1).
338+
logger.info("--> Promote replica1 as the primary");
339+
shards.promoteReplicaToPrimary(replica1).get(); // wait until resync completed.
340+
shards.index(new IndexRequest(index.getName(), "type", "d2").source("{}", XContentType.JSON));
341+
final Translog.Operation op2;
342+
try (Translog.Snapshot snapshot = replica2.getTranslog().newSnapshot()) {
343+
assertThat(snapshot.totalOperations(), equalTo(initDocs + 2));
344+
op2 = snapshot.next();
345+
assertThat(op2.seqNo(), equalTo(op1.seqNo()));
346+
assertThat(op2.primaryTerm(), greaterThan(op1.primaryTerm()));
347+
assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations));
348+
assertThat(snapshot.overriddenOperations(), equalTo(1));
349+
}
350+
351+
// Make sure that peer-recovery transfers all but non-overridden operations.
352+
IndexShard replica3 = shards.addReplica();
353+
logger.info("--> Promote replica2 as the primary");
354+
shards.promoteReplicaToPrimary(replica2);
355+
logger.info("--> Recover replica3 from replica2");
356+
recoverReplica(replica3, replica2);
357+
try (Translog.Snapshot snapshot = replica3.getTranslog().newSnapshot()) {
358+
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
359+
assertThat(snapshot.next(), equalTo(op2));
360+
assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations));
361+
assertThat("Peer-recovery should not send overridden operations", snapshot.overriddenOperations(), equalTo(0));
362+
}
363+
// TODO: We should assert the content of shards in the ReplicationGroup.
364+
// Without rollback replicas(current implementation), we don't have the same content across shards:
365+
// - replica1 has {doc1}
366+
// - replica2 has {doc1, doc2}
367+
// - replica3 can have either {doc2} only if operation-based recovery or {doc1, doc2} if file-based recovery
368+
}
369+
}
370+
302371
/** Throws <code>documentFailure</code> on every indexing operation */
303372
static class ThrowingDocumentFailureEngineFactory implements EngineFactory {
304373
final String documentFailureMessage;
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.index.translog;
21+
22+
import com.carrotsearch.hppc.LongHashSet;
23+
import com.carrotsearch.hppc.LongSet;
24+
import org.elasticsearch.common.Randomness;
25+
import org.elasticsearch.test.ESTestCase;
26+
27+
import java.util.List;
28+
import java.util.stream.Collectors;
29+
import java.util.stream.IntStream;
30+
import java.util.stream.LongStream;
31+
32+
import static org.hamcrest.CoreMatchers.equalTo;
33+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
34+
35+
public class MultiSnapshotTests extends ESTestCase {
36+
37+
public void testTrackSeqNoSimpleRange() throws Exception {
38+
final MultiSnapshot.SeqNoSet bitSet = new MultiSnapshot.SeqNoSet();
39+
final List<Long> values = LongStream.range(0, 1024).boxed().collect(Collectors.toList());
40+
Randomness.shuffle(values);
41+
for (int i = 0; i < 1023; i++) {
42+
assertThat(bitSet.getAndSet(values.get(i)), equalTo(false));
43+
assertThat(bitSet.ongoingSetsSize(), equalTo(1L));
44+
assertThat(bitSet.completeSetsSize(), equalTo(0L));
45+
}
46+
47+
assertThat(bitSet.getAndSet(values.get(1023)), equalTo(false));
48+
assertThat(bitSet.ongoingSetsSize(), equalTo(0L));
49+
assertThat(bitSet.completeSetsSize(), equalTo(1L));
50+
51+
assertThat(bitSet.getAndSet(between(0, 1023)), equalTo(true));
52+
assertThat(bitSet.getAndSet(between(1024, Integer.MAX_VALUE)), equalTo(false));
53+
}
54+
55+
public void testTrackSeqNoDenseRanges() throws Exception {
56+
final MultiSnapshot.SeqNoSet bitSet = new MultiSnapshot.SeqNoSet();
57+
final LongSet normalSet = new LongHashSet();
58+
IntStream.range(0, scaledRandomIntBetween(5_000, 10_000)).forEach(i -> {
59+
long seq = between(0, 5000);
60+
boolean existed = normalSet.add(seq) == false;
61+
assertThat("SeqNoSet != Set" + seq, bitSet.getAndSet(seq), equalTo(existed));
62+
assertThat(bitSet.ongoingSetsSize() + bitSet.completeSetsSize(), lessThanOrEqualTo(5L));
63+
});
64+
}
65+
66+
public void testTrackSeqNoSparseRanges() throws Exception {
67+
final MultiSnapshot.SeqNoSet bitSet = new MultiSnapshot.SeqNoSet();
68+
final LongSet normalSet = new LongHashSet();
69+
IntStream.range(0, scaledRandomIntBetween(5_000, 10_000)).forEach(i -> {
70+
long seq = between(i * 10_000, i * 30_000);
71+
boolean existed = normalSet.add(seq) == false;
72+
assertThat("SeqNoSet != Set", bitSet.getAndSet(seq), equalTo(existed));
73+
});
74+
}
75+
76+
public void testTrackSeqNoMimicTranslogRanges() throws Exception {
77+
final MultiSnapshot.SeqNoSet bitSet = new MultiSnapshot.SeqNoSet();
78+
final LongSet normalSet = new LongHashSet();
79+
long currentSeq = between(10_000_000, 1_000_000_000);
80+
final int iterations = scaledRandomIntBetween(100, 2000);
81+
assertThat(bitSet.completeSetsSize(), equalTo(0L));
82+
assertThat(bitSet.ongoingSetsSize(), equalTo(0L));
83+
long totalDocs = 0;
84+
for (long i = 0; i < iterations; i++) {
85+
int batchSize = between(1, 1500);
86+
totalDocs += batchSize;
87+
currentSeq -= batchSize;
88+
List<Long> batch = LongStream.range(currentSeq, currentSeq + batchSize)
89+
.boxed()
90+
.collect(Collectors.toList());
91+
Randomness.shuffle(batch);
92+
batch.forEach(seq -> {
93+
boolean existed = normalSet.add(seq) == false;
94+
assertThat("SeqNoSet != Set", bitSet.getAndSet(seq), equalTo(existed));
95+
assertThat(bitSet.ongoingSetsSize(), lessThanOrEqualTo(4L));
96+
});
97+
assertThat(bitSet.ongoingSetsSize(), lessThanOrEqualTo(2L));
98+
}
99+
assertThat(bitSet.completeSetsSize(), lessThanOrEqualTo(totalDocs / 1024));
100+
assertThat(bitSet.ongoingSetsSize(), lessThanOrEqualTo(2L));
101+
}
102+
}

0 commit comments

Comments
 (0)