Skip to content

Commit 85db659

Browse files
committed
two versions
1 parent bbdebeb commit 85db659

File tree

4 files changed

+141
-46
lines changed

4 files changed

+141
-46
lines changed
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.index.translog;
21+
22+
import com.carrotsearch.hppc.LongObjectHashMap;
23+
import org.elasticsearch.index.seqno.CountedBitSet;
24+
import org.elasticsearch.index.seqno.SequenceNumbers;
25+
26+
import java.io.Closeable;
27+
import java.io.IOException;
28+
import java.util.Arrays;
29+
30+
/**
31+
* A snapshot composed out of multiple snapshots
32+
*/
33+
final class BackwardMultiSnapshot implements Translog.Snapshot {
34+
35+
private final TranslogSnapshot[] translogs;
36+
private final int totalOperations;
37+
private int overriddenOperations;
38+
private final Closeable onClose;
39+
private int index;
40+
private final SeqNoSet seenSeqNo;
41+
42+
/**
43+
* Creates a new point in time snapshot of the given snapshots. Those snapshots are always iterated in-order.
44+
*/
45+
BackwardMultiSnapshot(TranslogSnapshot[] translogs, Closeable onClose) {
46+
this.translogs = translogs;
47+
this.totalOperations = Arrays.stream(translogs).mapToInt(TranslogSnapshot::totalOperations).sum();
48+
this.overriddenOperations = 0;
49+
this.onClose = onClose;
50+
this.seenSeqNo = new SeqNoSet();
51+
this.index = translogs.length - 1;
52+
}
53+
54+
@Override
55+
public int totalOperations() {
56+
return totalOperations;
57+
}
58+
59+
@Override
60+
public int skippedOperations() {
61+
return Arrays.stream(translogs).mapToInt(TranslogSnapshot::skippedOperations).sum() + overriddenOperations;
62+
}
63+
64+
@Override
65+
public Translog.Operation next() throws IOException {
66+
// TODO: Read translog forward in 9.0+
67+
for (; index >= 0; index--) {
68+
final TranslogSnapshot current = translogs[index];
69+
Translog.Operation op;
70+
while ((op = current.next()) != null) {
71+
if (op.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO || seenSeqNo.getAndSet(op.seqNo()) == false) {
72+
return op;
73+
} else {
74+
overriddenOperations++;
75+
}
76+
}
77+
}
78+
return null;
79+
}
80+
81+
@Override
82+
public void close() throws IOException {
83+
onClose.close();
84+
}
85+
86+
static final class SeqNoSet {
87+
static final short BIT_SET_SIZE = 1024;
88+
private final LongObjectHashMap<CountedBitSet> bitSets = new LongObjectHashMap<>();
89+
90+
/**
91+
* Marks this sequence number and returns {@code true} if it is seen before.
92+
*/
93+
boolean getAndSet(long value) {
94+
assert value >= 0;
95+
final long key = value / BIT_SET_SIZE;
96+
CountedBitSet bitset = bitSets.get(key);
97+
if (bitset == null) {
98+
bitset = new CountedBitSet(BIT_SET_SIZE);
99+
bitSets.put(key, bitset);
100+
}
101+
final int index = Math.toIntExact(value % BIT_SET_SIZE);
102+
final boolean wasOn = bitset.get(index);
103+
bitset.set(index);
104+
return wasOn;
105+
}
106+
}
107+
}

server/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java

Lines changed: 16 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,8 @@
1919

2020
package org.elasticsearch.index.translog;
2121

22-
import com.carrotsearch.hppc.LongObjectHashMap;
23-
import org.elasticsearch.index.seqno.CountedBitSet;
24-
import org.elasticsearch.index.seqno.SequenceNumbers;
22+
import com.carrotsearch.hppc.LongLongHashMap;
23+
import org.elasticsearch.Assertions;
2524

2625
import java.io.Closeable;
2726
import java.io.IOException;
@@ -31,24 +30,20 @@
3130
* A snapshot composed out of multiple snapshots
3231
*/
3332
final class MultiSnapshot implements Translog.Snapshot {
34-
3533
private final TranslogSnapshot[] translogs;
3634
private final int totalOperations;
37-
private int overriddenOperations;
3835
private final Closeable onClose;
39-
private int index;
40-
private final SeqNoSet seenSeqNo;
36+
private int index = 0;
37+
private final LongLongHashMap seqNoToTerm; // for assertion purpose
4138

4239
/**
4340
* Creates a new point in time snapshot of the given snapshots. Those snapshots are always iterated in-order.
4441
*/
4542
MultiSnapshot(TranslogSnapshot[] translogs, Closeable onClose) {
4643
this.translogs = translogs;
4744
this.totalOperations = Arrays.stream(translogs).mapToInt(TranslogSnapshot::totalOperations).sum();
48-
this.overriddenOperations = 0;
4945
this.onClose = onClose;
50-
this.seenSeqNo = new SeqNoSet();
51-
this.index = translogs.length - 1;
46+
this.seqNoToTerm = Assertions.ENABLED ? new LongLongHashMap() : null;
5247
}
5348

5449
@Override
@@ -58,50 +53,31 @@ public int totalOperations() {
5853

5954
@Override
6055
public int skippedOperations() {
61-
return Arrays.stream(translogs).mapToInt(TranslogSnapshot::skippedOperations).sum() + overriddenOperations;
56+
return Arrays.stream(translogs).mapToInt(TranslogSnapshot::skippedOperations).sum();
6257
}
6358

6459
@Override
6560
public Translog.Operation next() throws IOException {
66-
// TODO: Read translog forward in 9.0+
67-
for (; index >= 0; index--) {
61+
for (; index < translogs.length; index ++) {
6862
final TranslogSnapshot current = translogs[index];
6963
Translog.Operation op;
7064
while ((op = current.next()) != null) {
71-
if (op.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO || seenSeqNo.getAndSet(op.seqNo()) == false) {
72-
return op;
73-
} else {
74-
overriddenOperations++;
75-
}
65+
assert assertSameOperation(op);
66+
return op;
7667
}
7768
}
7869
return null;
7970
}
8071

72+
private boolean assertSameOperation(Translog.Operation op) {
73+
final long existingTerm = seqNoToTerm.put(op.seqNo(), op.primaryTerm());
74+
assert existingTerm == 0 || existingTerm == op.primaryTerm() :
75+
"Operation [" + op + "] was associated with a different primary term [" + existingTerm + "]";
76+
return true;
77+
}
78+
8179
@Override
8280
public void close() throws IOException {
8381
onClose.close();
8482
}
85-
86-
static final class SeqNoSet {
87-
static final short BIT_SET_SIZE = 1024;
88-
private final LongObjectHashMap<CountedBitSet> bitSets = new LongObjectHashMap<>();
89-
90-
/**
91-
* Marks this sequence number and returns {@code true} if it is seen before.
92-
*/
93-
boolean getAndSet(long value) {
94-
assert value >= 0;
95-
final long key = value / BIT_SET_SIZE;
96-
CountedBitSet bitset = bitSets.get(key);
97-
if (bitset == null) {
98-
bitset = new CountedBitSet(BIT_SET_SIZE);
99-
bitSets.put(key, bitset);
100-
}
101-
final int index = Math.toIntExact(value % BIT_SET_SIZE);
102-
final boolean wasOn = bitset.get(index);
103-
bitset.set(index);
104-
return wasOn;
105-
}
106-
}
10783
}

server/src/main/java/org/elasticsearch/index/translog/Translog.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.logging.log4j.message.ParameterizedMessage;
2323
import org.apache.lucene.index.Term;
2424
import org.apache.lucene.store.AlreadyClosedException;
25+
import org.elasticsearch.Version;
2526
import org.elasticsearch.common.UUIDs;
2627
import org.elasticsearch.common.bytes.BytesArray;
2728
import org.elasticsearch.common.bytes.BytesReference;
@@ -132,6 +133,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
132133
private final TranslogDeletionPolicy deletionPolicy;
133134
private final LongConsumer persistedSequenceNumberConsumer;
134135

136+
// We can read translog either forward or in backward. Most of the time, we will read translog forward.
137+
// This setting is merely to force exercising the backward implementation more often until we no longer need it.
138+
// TODO: Remove this in 9.0
139+
public static final String FORCE_READING_TRANSLOG_IN_BACKWARD = "index.translog.read_in_backward";
140+
135141
/**
136142
* Creates a new Translog instance. This method will create a new transaction log unless the given {@link TranslogGeneration} is
137143
* {@code null}. If the generation is {@code null} this method is destructive and will delete all files in the translog path given. If
@@ -676,7 +682,13 @@ private Snapshot newMultiSnapshot(TranslogSnapshot[] snapshots) throws IOExcepti
676682
}
677683
boolean success = false;
678684
try {
679-
Snapshot result = new MultiSnapshot(snapshots, onClose);
685+
final Snapshot result;
686+
if (indexSettings.getSettings().getAsBoolean(FORCE_READING_TRANSLOG_IN_BACKWARD, false)
687+
|| indexSettings.getIndexVersionCreated().before(Version.V_8_0_0)) {
688+
result = new BackwardMultiSnapshot(snapshots, onClose);
689+
} else {
690+
result = new MultiSnapshot(snapshots, onClose);
691+
}
680692
success = true;
681693
return result;
682694
} finally {

server/src/test/java/org/elasticsearch/index/translog/MultiSnapshotTests.java renamed to server/src/test/java/org/elasticsearch/index/translog/BackwardMultiSnapshotTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@
3131

3232
import static org.hamcrest.CoreMatchers.equalTo;
3333

34-
public class MultiSnapshotTests extends ESTestCase {
34+
public class BackwardMultiSnapshotTests extends ESTestCase {
3535

3636
public void testTrackSeqNoSimpleRange() throws Exception {
37-
final MultiSnapshot.SeqNoSet bitSet = new MultiSnapshot.SeqNoSet();
37+
final BackwardMultiSnapshot.SeqNoSet bitSet = new BackwardMultiSnapshot.SeqNoSet();
3838
final List<Long> values = LongStream.range(0, 1024).boxed().collect(Collectors.toList());
3939
Randomness.shuffle(values);
4040
for (int i = 0; i < 1023; i++) {
@@ -46,7 +46,7 @@ public void testTrackSeqNoSimpleRange() throws Exception {
4646
}
4747

4848
public void testTrackSeqNoDenseRanges() throws Exception {
49-
final MultiSnapshot.SeqNoSet bitSet = new MultiSnapshot.SeqNoSet();
49+
final BackwardMultiSnapshot.SeqNoSet bitSet = new BackwardMultiSnapshot.SeqNoSet();
5050
final LongSet normalSet = new LongHashSet();
5151
IntStream.range(0, scaledRandomIntBetween(5_000, 10_000)).forEach(i -> {
5252
long seq = between(0, 5000);
@@ -56,7 +56,7 @@ public void testTrackSeqNoDenseRanges() throws Exception {
5656
}
5757

5858
public void testTrackSeqNoSparseRanges() throws Exception {
59-
final MultiSnapshot.SeqNoSet bitSet = new MultiSnapshot.SeqNoSet();
59+
final BackwardMultiSnapshot.SeqNoSet bitSet = new BackwardMultiSnapshot.SeqNoSet();
6060
final LongSet normalSet = new LongHashSet();
6161
IntStream.range(0, scaledRandomIntBetween(5_000, 10_000)).forEach(i -> {
6262
long seq = between(i * 10_000, i * 30_000);
@@ -66,7 +66,7 @@ public void testTrackSeqNoSparseRanges() throws Exception {
6666
}
6767

6868
public void testTrackSeqNoMimicTranslogRanges() throws Exception {
69-
final MultiSnapshot.SeqNoSet bitSet = new MultiSnapshot.SeqNoSet();
69+
final BackwardMultiSnapshot.SeqNoSet bitSet = new BackwardMultiSnapshot.SeqNoSet();
7070
final LongSet normalSet = new LongHashSet();
7171
long currentSeq = between(10_000_000, 1_000_000_000);
7272
final int iterations = scaledRandomIntBetween(100, 2000);

0 commit comments

Comments
 (0)