Skip to content

Commit 0749b18

Browse files
authored
All Translog inner closes should happen after tragedy exception is set (#32674)
All Translog inner closes should happen after tragedy exception is set (#32674) We faced with the nasty race condition. See #32526 InternalEngine.failOnTragic method has thrown AssertionError. If you carefully look at if branches in this method, you will spot that its only possible, if either Lucene IndexWriterhas closed from inside or Translog, has closed from inside, but tragedy exception is not set. For now, let us concentrate on the Translog class. We found out that there are two methods in Translog - namely rollGeneration and trimOperations that are closing Translog in case of Exception without tragedy exception being set. This commit fixes these 2 methods. To fix it, we pull tragedyException from TranslogWriter up-to Translog class, because in these 2 methods IndexWriter could be innocent, but still Translog needs to be closed. Also, tragedyException is wrapped with TragicExceptionHolder to reuse CAS/addSuppresed functionality in Translog and TranslogWriter. Also to protect us in the future and make sure close method is never called from inside Translog special assertion examining stack trace is added. Since we're still targeting Java 8 for runtime - no StackWalker API is used in the implementation. In the stack-trace checking method, we're considering inner caller not only Translog methods but Translog child classes methods as well. It does mean that Translog is meant for extending it, but it's needed to be able to test this method. Closes #32526
1 parent 34295fa commit 0749b18

File tree

5 files changed

+136
-35
lines changed

5 files changed

+136
-35
lines changed
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.index.translog;
21+
22+
import java.util.concurrent.atomic.AtomicReference;
23+
24+
public class TragicExceptionHolder {
25+
private final AtomicReference<Exception> tragedy = new AtomicReference<>();
26+
27+
/**
28+
* Sets the tragic exception or if the tragic exception is already set adds passed exception as suppressed exception
29+
* @param ex tragic exception to set
30+
*/
31+
public void setTragicException(Exception ex) {
32+
assert ex != null;
33+
if (tragedy.compareAndSet(null, ex) == false) {
34+
if (tragedy.get() != ex) { // to ensure there is no self-suppression
35+
tragedy.get().addSuppressed(ex);
36+
}
37+
}
38+
}
39+
40+
public Exception get() {
41+
return tragedy.get();
42+
}
43+
}

server/src/main/java/org/elasticsearch/index/translog/Translog.java

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import java.util.function.LongSupplier;
6767
import java.util.regex.Matcher;
6868
import java.util.regex.Pattern;
69+
import java.util.stream.Collectors;
6970
import java.util.stream.Stream;
7071

7172
/**
@@ -117,6 +118,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
117118
private final Path location;
118119
private TranslogWriter current;
119120

121+
protected final TragicExceptionHolder tragedy = new TragicExceptionHolder();
120122
private final AtomicBoolean closed = new AtomicBoolean();
121123
private final TranslogConfig config;
122124
private final LongSupplier globalCheckpointSupplier;
@@ -310,8 +312,28 @@ public boolean isOpen() {
310312
return closed.get() == false;
311313
}
312314

315+
private static boolean calledFromOutsideOrViaTragedyClose() {
316+
List<StackTraceElement> frames = Stream.of(Thread.currentThread().getStackTrace()).
317+
skip(3). //skip getStackTrace, current method and close method frames
318+
limit(10). //limit depth of analysis to 10 frames, it should be enough to catch closing with, e.g. IOUtils
319+
filter(f ->
320+
{
321+
try {
322+
return Translog.class.isAssignableFrom(Class.forName(f.getClassName()));
323+
} catch (Exception ignored) {
324+
return false;
325+
}
326+
}
327+
). //find all inner callers including Translog subclasses
328+
collect(Collectors.toList());
329+
//the list of inner callers should be either empty or should contain closeOnTragicEvent method
330+
return frames.isEmpty() || frames.stream().anyMatch(f -> f.getMethodName().equals("closeOnTragicEvent"));
331+
}
332+
313333
@Override
314334
public void close() throws IOException {
335+
assert calledFromOutsideOrViaTragedyClose() :
336+
"Translog.close method is called from inside Translog, but not via closeOnTragicEvent method";
315337
if (closed.compareAndSet(false, true)) {
316338
try (ReleasableLock lock = writeLock.acquire()) {
317339
try {
@@ -462,7 +484,7 @@ TranslogWriter createWriter(long fileGeneration, long initialMinTranslogGen, lon
462484
getChannelFactory(),
463485
config.getBufferSize(),
464486
initialMinTranslogGen, initialGlobalCheckpoint,
465-
globalCheckpointSupplier, this::getMinFileGeneration, primaryTermSupplier.getAsLong());
487+
globalCheckpointSupplier, this::getMinFileGeneration, primaryTermSupplier.getAsLong(), tragedy);
466488
} catch (final IOException e) {
467489
throw new TranslogException(shardId, "failed to create new translog file", e);
468490
}
@@ -726,7 +748,8 @@ public void trimOperations(long belowTerm, long aboveSeqNo) throws IOException {
726748
}
727749
} catch (IOException e) {
728750
IOUtils.closeWhileHandlingException(newReaders);
729-
close();
751+
tragedy.setTragicException(e);
752+
closeOnTragicEvent(e);
730753
throw e;
731754
}
732755

@@ -779,10 +802,10 @@ public boolean ensureSynced(Stream<Location> locations) throws IOException {
779802
*
780803
* @param ex if an exception occurs closing the translog, it will be suppressed into the provided exception
781804
*/
782-
private void closeOnTragicEvent(final Exception ex) {
805+
protected void closeOnTragicEvent(final Exception ex) {
783806
// we can not hold a read lock here because closing will attempt to obtain a write lock and that would result in self-deadlock
784807
assert readLock.isHeldByCurrentThread() == false : Thread.currentThread().getName();
785-
if (current.getTragicException() != null) {
808+
if (tragedy.get() != null) {
786809
try {
787810
close();
788811
} catch (final AlreadyClosedException inner) {
@@ -1556,7 +1579,8 @@ public void rollGeneration() throws IOException {
15561579
current = createWriter(current.getGeneration() + 1);
15571580
logger.trace("current translog set to [{}]", current.getGeneration());
15581581
} catch (final Exception e) {
1559-
IOUtils.closeWhileHandlingException(this); // tragic event
1582+
tragedy.setTragicException(e);
1583+
closeOnTragicEvent(e);
15601584
throw e;
15611585
}
15621586
}
@@ -1669,7 +1693,7 @@ long getFirstOperationPosition() { // for testing
16691693

16701694
private void ensureOpen() {
16711695
if (closed.get()) {
1672-
throw new AlreadyClosedException("translog is already closed", current.getTragicException());
1696+
throw new AlreadyClosedException("translog is already closed", tragedy.get());
16731697
}
16741698
}
16751699

@@ -1683,7 +1707,7 @@ ChannelFactory getChannelFactory() {
16831707
* Otherwise (no tragic exception has occurred) it returns null.
16841708
*/
16851709
public Exception getTragicException() {
1686-
return current.getTragicException();
1710+
return tragedy.get();
16871711
}
16881712

16891713
/** Reads and returns the current checkpoint */
@@ -1766,8 +1790,8 @@ static String createEmptyTranslog(Path location, long initialGlobalCheckpoint, S
17661790
final String translogUUID = UUIDs.randomBase64UUID();
17671791
TranslogWriter writer = TranslogWriter.create(shardId, translogUUID, 1, location.resolve(getFilename(1)), channelFactory,
17681792
new ByteSizeValue(10), 1, initialGlobalCheckpoint,
1769-
() -> { throw new UnsupportedOperationException(); }, () -> { throw new UnsupportedOperationException(); }, primaryTerm
1770-
);
1793+
() -> { throw new UnsupportedOperationException(); }, () -> { throw new UnsupportedOperationException(); }, primaryTerm,
1794+
new TragicExceptionHolder());
17711795
writer.close();
17721796
return translogUUID;
17731797
}

server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
5151
/* the number of translog operations written to this file */
5252
private volatile int operationCounter;
5353
/* if we hit an exception that we can't recover from we assign it to this var and ship it with every AlreadyClosedException we throw */
54-
private volatile Exception tragedy;
54+
private final TragicExceptionHolder tragedy;
5555
/* A buffered outputstream what writes to the writers channel */
5656
private final OutputStream outputStream;
5757
/* the total offset of this file including the bytes written to the file as well as into the buffer */
@@ -76,7 +76,10 @@ private TranslogWriter(
7676
final FileChannel channel,
7777
final Path path,
7878
final ByteSizeValue bufferSize,
79-
final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier, TranslogHeader header) throws IOException {
79+
final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier, TranslogHeader header,
80+
TragicExceptionHolder tragedy)
81+
throws
82+
IOException {
8083
super(initialCheckpoint.generation, channel, path, header);
8184
assert initialCheckpoint.offset == channel.position() :
8285
"initial checkpoint offset [" + initialCheckpoint.offset + "] is different than current channel position ["
@@ -94,12 +97,13 @@ private TranslogWriter(
9497
assert initialCheckpoint.trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : initialCheckpoint.trimmedAboveSeqNo;
9598
this.globalCheckpointSupplier = globalCheckpointSupplier;
9699
this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null;
100+
this.tragedy = tragedy;
97101
}
98102

99103
public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, ChannelFactory channelFactory,
100104
ByteSizeValue bufferSize, final long initialMinTranslogGen, long initialGlobalCheckpoint,
101105
final LongSupplier globalCheckpointSupplier, final LongSupplier minTranslogGenerationSupplier,
102-
final long primaryTerm)
106+
final long primaryTerm, TragicExceptionHolder tragedy)
103107
throws IOException {
104108
final FileChannel channel = channelFactory.open(file);
105109
try {
@@ -120,7 +124,7 @@ public static TranslogWriter create(ShardId shardId, String translogUUID, long f
120124
writerGlobalCheckpointSupplier = globalCheckpointSupplier;
121125
}
122126
return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize,
123-
writerGlobalCheckpointSupplier, minTranslogGenerationSupplier, header);
127+
writerGlobalCheckpointSupplier, minTranslogGenerationSupplier, header, tragedy);
124128
} catch (Exception exception) {
125129
// if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that
126130
// file exists we remove it. We only apply this logic to the checkpoint.generation+1 any other file with a higher generation is an error condition
@@ -129,24 +133,8 @@ public static TranslogWriter create(ShardId shardId, String translogUUID, long f
129133
}
130134
}
131135

132-
/**
133-
* If this {@code TranslogWriter} was closed as a side-effect of a tragic exception,
134-
* e.g. disk full while flushing a new segment, this returns the root cause exception.
135-
* Otherwise (no tragic exception has occurred) it returns null.
136-
*/
137-
public Exception getTragicException() {
138-
return tragedy;
139-
}
140-
141136
private synchronized void closeWithTragicEvent(final Exception ex) {
142-
assert ex != null;
143-
if (tragedy == null) {
144-
tragedy = ex;
145-
} else if (tragedy != ex) {
146-
// it should be safe to call closeWithTragicEvents on multiple layers without
147-
// worrying about self suppression.
148-
tragedy.addSuppressed(ex);
149-
}
137+
tragedy.setTragicException(ex);
150138
try {
151139
close();
152140
} catch (final IOException | RuntimeException e) {
@@ -296,7 +284,8 @@ public TranslogReader closeIntoReader() throws IOException {
296284
if (closed.compareAndSet(false, true)) {
297285
return new TranslogReader(getLastSyncedCheckpoint(), channel, path, header);
298286
} else {
299-
throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "]", tragedy);
287+
throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "]",
288+
tragedy.get());
300289
}
301290
}
302291
}
@@ -406,7 +395,7 @@ Checkpoint getLastSyncedCheckpoint() {
406395

407396
protected final void ensureOpen() {
408397
if (isClosed()) {
409-
throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed", tragedy);
398+
throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed", tragedy.get());
410399
}
411400
}
412401

server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ private Tuple<List<TranslogReader>, TranslogWriter> createReadersAndWriter(final
171171
}
172172
writer = TranslogWriter.create(new ShardId("index", "uuid", 0), translogUUID, gen,
173173
tempDir.resolve(Translog.getFilename(gen)), FileChannel::open, TranslogConfig.DEFAULT_BUFFER_SIZE, 1L, 1L, () -> 1L,
174-
() -> 1L, randomNonNegativeLong());
174+
() -> 1L, randomNonNegativeLong(), new TragicExceptionHolder());
175175
writer = Mockito.spy(writer);
176176
Mockito.doReturn(now - (numberOfReaders - gen + 1) * 1000).when(writer).getLastModifiedTime();
177177

server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.lucene.store.MockDirectoryWrapper;
3434
import org.apache.lucene.util.LineFileDocs;
3535
import org.apache.lucene.util.LuceneTestCase;
36+
import org.elasticsearch.Assertions;
3637
import org.elasticsearch.cluster.metadata.IndexMetaData;
3738
import org.elasticsearch.common.Randomness;
3839
import org.elasticsearch.common.Strings;
@@ -108,6 +109,7 @@
108109
import java.util.concurrent.atomic.AtomicInteger;
109110
import java.util.concurrent.atomic.AtomicLong;
110111
import java.util.concurrent.atomic.AtomicReference;
112+
import java.util.function.LongSupplier;
111113
import java.util.stream.Collectors;
112114
import java.util.stream.IntStream;
113115
import java.util.stream.LongStream;
@@ -1655,7 +1657,7 @@ public void testRandomExceptionsOnTrimOperations( ) throws Exception {
16551657
}
16561658

16571659
assertThat(expectedException, is(not(nullValue())));
1658-
1660+
assertThat(failableTLog.getTragicException(), equalTo(expectedException));
16591661
assertThat(fileChannels, is(not(empty())));
16601662
assertThat("all file channels have to be closed",
16611663
fileChannels.stream().filter(f -> f.isOpen()).findFirst().isPresent(), is(false));
@@ -2505,11 +2507,13 @@ public void testWithRandomException() throws IOException {
25052507
syncedDocs.addAll(unsynced);
25062508
unsynced.clear();
25072509
} catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) {
2508-
// fair enough
2510+
assertEquals(failableTLog.getTragicException(), ex);
25092511
} catch (IOException ex) {
25102512
assertEquals(ex.getMessage(), "__FAKE__ no space left on device");
2513+
assertEquals(failableTLog.getTragicException(), ex);
25112514
} catch (RuntimeException ex) {
25122515
assertEquals(ex.getMessage(), "simulated");
2516+
assertEquals(failableTLog.getTragicException(), ex);
25132517
} finally {
25142518
Checkpoint checkpoint = Translog.readCheckpoint(config.getTranslogPath());
25152519
if (checkpoint.numOps == unsynced.size() + syncedDocs.size()) {
@@ -2931,6 +2935,47 @@ public void testCloseSnapshotTwice() throws Exception {
29312935
}
29322936
}
29332937

2938+
// close method should never be called directly from Translog (the only exception is closeOnTragicEvent)
2939+
public void testTranslogCloseInvariant() throws IOException {
2940+
assumeTrue("test only works with assertions enabled", Assertions.ENABLED);
2941+
class MisbehavingTranslog extends Translog {
2942+
MisbehavingTranslog(TranslogConfig config, String translogUUID, TranslogDeletionPolicy deletionPolicy, LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier) throws IOException {
2943+
super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier);
2944+
}
2945+
2946+
void callCloseDirectly() throws IOException {
2947+
close();
2948+
}
2949+
2950+
void callCloseUsingIOUtilsWithExceptionHandling() {
2951+
IOUtils.closeWhileHandlingException(this);
2952+
}
2953+
2954+
void callCloseUsingIOUtils() throws IOException {
2955+
IOUtils.close(this);
2956+
}
2957+
2958+
void callCloseOnTragicEvent() {
2959+
Exception e = new Exception("test tragic exception");
2960+
tragedy.setTragicException(e);
2961+
closeOnTragicEvent(e);
2962+
}
2963+
}
2964+
2965+
2966+
globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
2967+
Path path = createTempDir();
2968+
final TranslogConfig translogConfig = getTranslogConfig(path);
2969+
final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings());
2970+
final String translogUUID = Translog.createEmptyTranslog(path, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get());
2971+
MisbehavingTranslog misbehavingTranslog = new MisbehavingTranslog(translogConfig, translogUUID, deletionPolicy, () -> globalCheckpoint.get(), primaryTerm::get);
2972+
2973+
expectThrows(AssertionError.class, () -> misbehavingTranslog.callCloseDirectly());
2974+
expectThrows(AssertionError.class, () -> misbehavingTranslog.callCloseUsingIOUtils());
2975+
expectThrows(AssertionError.class, () -> misbehavingTranslog.callCloseUsingIOUtilsWithExceptionHandling());
2976+
misbehavingTranslog.callCloseOnTragicEvent();
2977+
}
2978+
29342979
static class SortedSnapshot implements Translog.Snapshot {
29352980
private final Translog.Snapshot snapshot;
29362981
private List<Translog.Operation> operations = null;

0 commit comments

Comments
 (0)