Skip to content
This repository was archived by the owner on Sep 24, 2019. It is now read-only.

Commit 2a9c58c

Browse files
author
Vladimir Dolzhenko
committed
Allow to trim all ops above a certain seq# with a term lower than X - fixes on PR elastic#3
Relates to elastic#10708
1 parent 92addd1 commit 2a9c58c

File tree

3 files changed

+78
-38
lines changed

3 files changed

+78
-38
lines changed

server/src/main/java/org/elasticsearch/index/translog/Translog.java

+9-20
Original file line numberDiff line numberDiff line change
@@ -706,27 +706,16 @@ public void trim(long belowTerm, long aboveSeqNo) throws IOException {
706706

707707
// update all existed ones (if it is necessary) as checkpoint and reader are immutable
708708
final List<TranslogReader> newReaders = new ArrayList<>(readers.size());
709-
for (TranslogReader reader : readers) {
710-
final Checkpoint checkpoint = reader.getCheckpoint();
711-
final Path checkpointFile = location.resolve(getCommitCheckpointFileName(checkpoint.generation));
712-
713-
final TranslogReader newReader;
714-
if (reader.getPrimaryTerm() < belowTerm
715-
&& checkpoint.maxSeqNo != SequenceNumbers.NO_OPS_PERFORMED
716-
&& (aboveSeqNo < checkpoint.trimmedAboveSeqNo || checkpoint.trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO)) {
717-
final Checkpoint newCheckpoint = new Checkpoint(checkpoint.offset, checkpoint.numOps,
718-
checkpoint.generation, checkpoint.minSeqNo, checkpoint.maxSeqNo,
719-
checkpoint.globalCheckpoint, checkpoint.minTranslogGeneration, aboveSeqNo);
720-
Checkpoint.write(FileChannel::open, checkpointFile, newCheckpoint, StandardOpenOption.WRITE);
721-
722-
IOUtils.fsync(checkpointFile, false);
723-
IOUtils.fsync(checkpointFile.getParent(), true);
724-
725-
newReader = reader.withNewCheckpoint(newCheckpoint);
726-
} else {
727-
newReader = reader;
709+
try {
710+
for (TranslogReader reader : readers) {
711+
final TranslogReader newReader = reader.closeIntoTrimmedReader(belowTerm, aboveSeqNo, getChannelFactory());
712+
newReaders.add(newReader);
728713
}
729-
newReaders.add(newReader);
714+
} catch (IOException e){
715+
IOUtils.closeWhileHandlingException(newReaders);
716+
IOUtils.closeWhileHandlingException(current);
717+
IOUtils.closeWhileHandlingException(readers);
718+
throw e;
730719
}
731720

732721
this.readers.clear();

server/src/main/java/org/elasticsearch/index/translog/TranslogReader.java

+26-18
Original file line numberDiff line numberDiff line change
@@ -21,23 +21,28 @@
2121

2222
import org.apache.lucene.store.AlreadyClosedException;
2323
import org.elasticsearch.common.io.Channels;
24+
import org.elasticsearch.core.internal.io.IOUtils;
25+
import org.elasticsearch.index.seqno.SequenceNumbers;
2426

2527
import java.io.Closeable;
2628
import java.io.EOFException;
2729
import java.io.IOException;
2830
import java.nio.ByteBuffer;
2931
import java.nio.channels.FileChannel;
3032
import java.nio.file.Path;
33+
import java.nio.file.StandardOpenOption;
3134
import java.util.concurrent.atomic.AtomicBoolean;
3235

36+
import static org.elasticsearch.index.translog.Translog.getCommitCheckpointFileName;
37+
3338
/**
3439
* an immutable translog filereader
3540
*/
3641
public class TranslogReader extends BaseTranslogReader implements Closeable {
3742
protected final long length;
3843
private final int totalOperations;
3944
private final Checkpoint checkpoint;
40-
protected final AtomicBoolean closed;
45+
protected final AtomicBoolean closed = new AtomicBoolean(false);
4146

4247
/**
4348
* Create a translog writer against the specified translog file channel.
@@ -48,24 +53,10 @@ public class TranslogReader extends BaseTranslogReader implements Closeable {
4853
* @param header the header of the translog file
4954
*/
5055
TranslogReader(final Checkpoint checkpoint, final FileChannel channel, final Path path, final TranslogHeader header) {
51-
this(checkpoint, channel, path, header, new AtomicBoolean(false));
52-
}
53-
54-
/**
55-
* Create a translog writer against the specified translog file channel.
56-
*
57-
* @param checkpoint the translog checkpoint
58-
* @param channel the translog file channel to open a translog reader against
59-
* @param path the path to the translog
60-
* @param header the header of the translog file
61-
*
62-
*/
63-
TranslogReader(final Checkpoint checkpoint, final FileChannel channel, final Path path, final TranslogHeader header, AtomicBoolean closed) {
6456
super(checkpoint.generation, channel, path, header);
6557
this.length = checkpoint.offset;
6658
this.totalOperations = checkpoint.numOps;
6759
this.checkpoint = checkpoint;
68-
this.closed = closed;
6960
}
7061

7162
/**
@@ -85,10 +76,27 @@ public static TranslogReader open(
8576
}
8677

8778
/**
88-
* Create a new reader with new checkoint that shares resources with current one
79+
* Closes current reader and creates new one with new checkoint and same file channel
8980
*/
90-
TranslogReader withNewCheckpoint(final Checkpoint newCheckpoint){
91-
return new TranslogReader(newCheckpoint, channel, path, header, closed);
81+
TranslogReader closeIntoTrimmedReader(long belowTerm, long aboveSeqNo, ChannelFactory channelFactory) throws IOException {
82+
if (!closed.get()
83+
&& getPrimaryTerm() < belowTerm
84+
&& checkpoint.maxSeqNo != SequenceNumbers.NO_OPS_PERFORMED
85+
&& (aboveSeqNo < checkpoint.trimmedAboveSeqNo || checkpoint.trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO)) {
86+
final Path checkpointFile = path.getParent().resolve(getCommitCheckpointFileName(checkpoint.generation));
87+
final Checkpoint newCheckpoint = new Checkpoint(checkpoint.offset, checkpoint.numOps,
88+
checkpoint.generation, checkpoint.minSeqNo, checkpoint.maxSeqNo,
89+
checkpoint.globalCheckpoint, checkpoint.minTranslogGeneration, aboveSeqNo);
90+
Checkpoint.write(channelFactory, checkpointFile, newCheckpoint, StandardOpenOption.WRITE);
91+
92+
IOUtils.fsync(checkpointFile, false);
93+
IOUtils.fsync(checkpointFile.getParent(), true);
94+
95+
closed.set(true);
96+
97+
return new TranslogReader(newCheckpoint, channel, path, header);
98+
}
99+
return this;
92100
}
93101

94102
public long sizeInBytes() {

server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java

+43
Original file line numberDiff line numberDiff line change
@@ -1545,6 +1545,49 @@ public void testTrimmedSnapshot() throws Exception {
15451545
assertThat(expectedSeqNo, hasSize(0));
15461546
}
15471547

1548+
public void testExceptionOnTrimAboveSeqNo( ) throws Exception {
1549+
Path tempDir = createTempDir();
1550+
final FailSwitch fail = new FailSwitch();
1551+
fail.failNever();
1552+
TranslogConfig config = getTranslogConfig(tempDir);
1553+
final Translog failableTLog = getFailableTranslog(fail, config, randomBoolean(), false, null, createTranslogDeletionPolicy());
1554+
1555+
List<Translog.Index> operations = new ArrayList<>();
1556+
int translogOperations = randomIntBetween(10, 100);
1557+
int maxTrimmedSeqNo = translogOperations - randomIntBetween(4, 8);
1558+
1559+
for (int op = 0; op < translogOperations; op++) {
1560+
String ascii = randomAlphaOfLengthBetween(1, 50);
1561+
Translog.Index operation = new Translog.Index("test", "" + op, op,
1562+
primaryTerm.get(), ascii.getBytes("UTF-8"));
1563+
operations.add(operation);
1564+
}
1565+
// shuffle a bit - move several first items to the end
1566+
for(int i = 0, len = randomIntBetween(5, 10); i < len; i++){
1567+
operations.add(operations.remove(0));
1568+
}
1569+
1570+
for (Translog.Index operation : operations) {
1571+
failableTLog.add(operation);
1572+
}
1573+
1574+
failableTLog.rollGeneration();
1575+
fail.failAlways();
1576+
try {
1577+
failableTLog.trim(primaryTerm.get() + 1, maxTrimmedSeqNo);
1578+
fail();
1579+
} catch (MockDirectoryWrapper.FakeIOException ex) {
1580+
// all is fine
1581+
}
1582+
1583+
try {
1584+
failableTLog.newSnapshot();
1585+
fail();
1586+
} catch (AlreadyClosedException e){
1587+
assertThat(e.getMessage(), is("translog [" + failableTLog.currentFileGeneration() + "] is already closed"));
1588+
}
1589+
}
1590+
15481591
public void testLocationHashCodeEquals() throws IOException {
15491592
List<Translog.Location> locations = new ArrayList<>();
15501593
List<Translog.Location> locations2 = new ArrayList<>();

0 commit comments

Comments
 (0)