Skip to content

Commit d055324

Browse files
author
Andrey Ershov
committed
All Translog inner closes should happen after tragedy exception is set (#32674)
We faced with the nasty race condition. See #32526 InternalEngine.failOnTragic method has thrown AssertionError. If you carefully look at if branches in this method, you will spot that its only possible, if either Lucene IndexWriterhas closed from inside or Translog, has closed from inside, but tragedy exception is not set. For now, let us concentrate on the Translog class. We found out that there are two methods in Translog - namely rollGeneration and trimOperations that are closing Translog in case of Exception without tragedy exception being set. This commit fixes these 2 methods. To fix it, we pull tragedyException from TranslogWriter up-to Translog class, because in these 2 methods IndexWriter could be innocent, but still Translog needs to be closed. Also, tragedyException is wrapped with TragicExceptionHolder to reuse CAS/addSuppresed functionality in Translog and TranslogWriter. Also to protect us in the future and make sure close method is never called from inside Translog special assertion examining stack trace is added. Since we're still targeting Java 8 for runtime - no StackWalker API is used in the implementation. In the stack-trace checking method, we're considering inner caller not only Translog methods but Translog child classes methods as well. It does mean that Translog is meant for extending it, but it's needed to be able to test this method. Closes #32526 (cherry picked from commit 0749b18)
1 parent 491033f commit d055324

File tree

5 files changed

+137
-36
lines changed

5 files changed

+137
-36
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.index.translog;
21+
22+
import java.util.concurrent.atomic.AtomicReference;
23+
24+
public class TragicExceptionHolder {
25+
private final AtomicReference<Exception> tragedy = new AtomicReference<>();
26+
27+
/**
28+
* Sets the tragic exception or if the tragic exception is already set adds passed exception as suppressed exception
29+
* @param ex tragic exception to set
30+
*/
31+
public void setTragicException(Exception ex) {
32+
assert ex != null;
33+
if (tragedy.compareAndSet(null, ex) == false) {
34+
if (tragedy.get() != ex) { // to ensure there is no self-suppression
35+
tragedy.get().addSuppressed(ex);
36+
}
37+
}
38+
}
39+
40+
public Exception get() {
41+
return tragedy.get();
42+
}
43+
}

server/src/main/java/org/elasticsearch/index/translog/Translog.java

+33-9
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import java.util.function.LongSupplier;
6969
import java.util.regex.Matcher;
7070
import java.util.regex.Pattern;
71+
import java.util.stream.Collectors;
7172
import java.util.stream.Stream;
7273

7374
/**
@@ -119,6 +120,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
119120
private final Path location;
120121
private TranslogWriter current;
121122

123+
protected final TragicExceptionHolder tragedy = new TragicExceptionHolder();
122124
private final AtomicBoolean closed = new AtomicBoolean();
123125
private final TranslogConfig config;
124126
private final LongSupplier globalCheckpointSupplier;
@@ -312,8 +314,28 @@ public boolean isOpen() {
312314
return closed.get() == false;
313315
}
314316

317+
private static boolean calledFromOutsideOrViaTragedyClose() {
318+
List<StackTraceElement> frames = Stream.of(Thread.currentThread().getStackTrace()).
319+
skip(3). //skip getStackTrace, current method and close method frames
320+
limit(10). //limit depth of analysis to 10 frames, it should be enough to catch closing with, e.g. IOUtils
321+
filter(f ->
322+
{
323+
try {
324+
return Translog.class.isAssignableFrom(Class.forName(f.getClassName()));
325+
} catch (Exception ignored) {
326+
return false;
327+
}
328+
}
329+
). //find all inner callers including Translog subclasses
330+
collect(Collectors.toList());
331+
//the list of inner callers should be either empty or should contain closeOnTragicEvent method
332+
return frames.isEmpty() || frames.stream().anyMatch(f -> f.getMethodName().equals("closeOnTragicEvent"));
333+
}
334+
315335
@Override
316336
public void close() throws IOException {
337+
assert calledFromOutsideOrViaTragedyClose() :
338+
"Translog.close method is called from inside Translog, but not via closeOnTragicEvent method";
317339
if (closed.compareAndSet(false, true)) {
318340
try (ReleasableLock lock = writeLock.acquire()) {
319341
try {
@@ -464,7 +486,7 @@ TranslogWriter createWriter(long fileGeneration, long initialMinTranslogGen, lon
464486
getChannelFactory(),
465487
config.getBufferSize(),
466488
initialMinTranslogGen, initialGlobalCheckpoint,
467-
globalCheckpointSupplier, this::getMinFileGeneration, primaryTermSupplier.getAsLong());
489+
globalCheckpointSupplier, this::getMinFileGeneration, primaryTermSupplier.getAsLong(), tragedy);
468490
} catch (final IOException e) {
469491
throw new TranslogException(shardId, "failed to create new translog file", e);
470492
}
@@ -728,7 +750,8 @@ public void trimOperations(long belowTerm, long aboveSeqNo) throws IOException {
728750
}
729751
} catch (IOException e) {
730752
IOUtils.closeWhileHandlingException(newReaders);
731-
close();
753+
tragedy.setTragicException(e);
754+
closeOnTragicEvent(e);
732755
throw e;
733756
}
734757

@@ -781,10 +804,10 @@ public boolean ensureSynced(Stream<Location> locations) throws IOException {
781804
*
782805
* @param ex if an exception occurs closing the translog, it will be suppressed into the provided exception
783806
*/
784-
private void closeOnTragicEvent(final Exception ex) {
807+
protected void closeOnTragicEvent(final Exception ex) {
785808
// we can not hold a read lock here because closing will attempt to obtain a write lock and that would result in self-deadlock
786809
assert readLock.isHeldByCurrentThread() == false : Thread.currentThread().getName();
787-
if (current.getTragicException() != null) {
810+
if (tragedy.get() != null) {
788811
try {
789812
close();
790813
} catch (final AlreadyClosedException inner) {
@@ -1608,7 +1631,8 @@ public void rollGeneration() throws IOException {
16081631
current = createWriter(current.getGeneration() + 1);
16091632
logger.trace("current translog set to [{}]", current.getGeneration());
16101633
} catch (final Exception e) {
1611-
IOUtils.closeWhileHandlingException(this); // tragic event
1634+
tragedy.setTragicException(e);
1635+
closeOnTragicEvent(e);
16121636
throw e;
16131637
}
16141638
}
@@ -1721,7 +1745,7 @@ long getFirstOperationPosition() { // for testing
17211745

17221746
private void ensureOpen() {
17231747
if (closed.get()) {
1724-
throw new AlreadyClosedException("translog is already closed", current.getTragicException());
1748+
throw new AlreadyClosedException("translog is already closed", tragedy.get());
17251749
}
17261750
}
17271751

@@ -1735,7 +1759,7 @@ ChannelFactory getChannelFactory() {
17351759
* Otherwise (no tragic exception has occurred) it returns null.
17361760
*/
17371761
public Exception getTragicException() {
1738-
return current.getTragicException();
1762+
return tragedy.get();
17391763
}
17401764

17411765
/** Reads and returns the current checkpoint */
@@ -1818,8 +1842,8 @@ static String createEmptyTranslog(Path location, long initialGlobalCheckpoint, S
18181842
final String translogUUID = UUIDs.randomBase64UUID();
18191843
TranslogWriter writer = TranslogWriter.create(shardId, translogUUID, 1, location.resolve(getFilename(1)), channelFactory,
18201844
new ByteSizeValue(10), 1, initialGlobalCheckpoint,
1821-
() -> { throw new UnsupportedOperationException(); }, () -> { throw new UnsupportedOperationException(); }, primaryTerm
1822-
);
1845+
() -> { throw new UnsupportedOperationException(); }, () -> { throw new UnsupportedOperationException(); }, primaryTerm,
1846+
new TragicExceptionHolder());
18231847
writer.close();
18241848
return translogUUID;
18251849
}

server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java

+12-23
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
5252
/* the number of translog operations written to this file */
5353
private volatile int operationCounter;
5454
/* if we hit an exception that we can't recover from we assign it to this var and ship it with every AlreadyClosedException we throw */
55-
private volatile Exception tragedy;
55+
private final TragicExceptionHolder tragedy;
5656
/* A buffered outputstream what writes to the writers channel */
5757
private final OutputStream outputStream;
5858
/* the total offset of this file including the bytes written to the file as well as into the buffer */
@@ -77,7 +77,10 @@ private TranslogWriter(
7777
final FileChannel channel,
7878
final Path path,
7979
final ByteSizeValue bufferSize,
80-
final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier, TranslogHeader header) throws IOException {
80+
final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier, TranslogHeader header,
81+
TragicExceptionHolder tragedy)
82+
throws
83+
IOException {
8184
super(initialCheckpoint.generation, channel, path, header);
8285
assert initialCheckpoint.offset == channel.position() :
8386
"initial checkpoint offset [" + initialCheckpoint.offset + "] is different than current channel position ["
@@ -95,12 +98,13 @@ private TranslogWriter(
9598
assert initialCheckpoint.trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : initialCheckpoint.trimmedAboveSeqNo;
9699
this.globalCheckpointSupplier = globalCheckpointSupplier;
97100
this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null;
101+
this.tragedy = tragedy;
98102
}
99103

100104
public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, ChannelFactory channelFactory,
101105
ByteSizeValue bufferSize, final long initialMinTranslogGen, long initialGlobalCheckpoint,
102106
final LongSupplier globalCheckpointSupplier, final LongSupplier minTranslogGenerationSupplier,
103-
final long primaryTerm)
107+
final long primaryTerm, TragicExceptionHolder tragedy)
104108
throws IOException {
105109
final FileChannel channel = channelFactory.open(file);
106110
try {
@@ -121,7 +125,7 @@ public static TranslogWriter create(ShardId shardId, String translogUUID, long f
121125
writerGlobalCheckpointSupplier = globalCheckpointSupplier;
122126
}
123127
return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize,
124-
writerGlobalCheckpointSupplier, minTranslogGenerationSupplier, header);
128+
writerGlobalCheckpointSupplier, minTranslogGenerationSupplier, header, tragedy);
125129
} catch (Exception exception) {
126130
// if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that
127131
// file exists we remove it. We only apply this logic to the checkpoint.generation+1 any other file with a higher generation is an error condition
@@ -130,24 +134,8 @@ public static TranslogWriter create(ShardId shardId, String translogUUID, long f
130134
}
131135
}
132136

133-
/**
134-
* If this {@code TranslogWriter} was closed as a side-effect of a tragic exception,
135-
* e.g. disk full while flushing a new segment, this returns the root cause exception.
136-
* Otherwise (no tragic exception has occurred) it returns null.
137-
*/
138-
public Exception getTragicException() {
139-
return tragedy;
140-
}
141-
142137
private synchronized void closeWithTragicEvent(final Exception ex) {
143-
assert ex != null;
144-
if (tragedy == null) {
145-
tragedy = ex;
146-
} else if (tragedy != ex) {
147-
// it should be safe to call closeWithTragicEvents on multiple layers without
148-
// worrying about self suppression.
149-
tragedy.addSuppressed(ex);
150-
}
138+
tragedy.setTragicException(ex);
151139
try {
152140
close();
153141
} catch (final IOException | RuntimeException e) {
@@ -312,7 +300,8 @@ public TranslogReader closeIntoReader() throws IOException {
312300
if (closed.compareAndSet(false, true)) {
313301
return new TranslogReader(getLastSyncedCheckpoint(), channel, path, header);
314302
} else {
315-
throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "]", tragedy);
303+
throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "]",
304+
tragedy.get());
316305
}
317306
}
318307
}
@@ -422,7 +411,7 @@ Checkpoint getLastSyncedCheckpoint() {
422411

423412
protected final void ensureOpen() {
424413
if (isClosed()) {
425-
throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed", tragedy);
414+
throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed", tragedy.get());
426415
}
427416
}
428417

server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ private Tuple<List<TranslogReader>, TranslogWriter> createReadersAndWriter(final
171171
}
172172
writer = TranslogWriter.create(new ShardId("index", "uuid", 0), translogUUID, gen,
173173
tempDir.resolve(Translog.getFilename(gen)), FileChannel::open, TranslogConfig.DEFAULT_BUFFER_SIZE, 1L, 1L, () -> 1L,
174-
() -> 1L, randomNonNegativeLong());
174+
() -> 1L, randomNonNegativeLong(), new TragicExceptionHolder());
175175
writer = Mockito.spy(writer);
176176
Mockito.doReturn(now - (numberOfReaders - gen + 1) * 1000).when(writer).getLastModifiedTime();
177177

server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java

+48-3
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@
3030
import org.apache.lucene.store.AlreadyClosedException;
3131
import org.apache.lucene.store.ByteArrayDataOutput;
3232
import org.apache.lucene.store.MockDirectoryWrapper;
33+
import org.elasticsearch.Version;
3334
import org.elasticsearch.core.internal.io.IOUtils;
3435
import org.apache.lucene.util.LineFileDocs;
3536
import org.apache.lucene.util.LuceneTestCase;
36-
import org.elasticsearch.Version;
37+
import org.elasticsearch.Assertions;
3738
import org.elasticsearch.cluster.metadata.IndexMetaData;
3839
import org.elasticsearch.common.Randomness;
3940
import org.elasticsearch.common.Strings;
@@ -108,6 +109,7 @@
108109
import java.util.concurrent.atomic.AtomicInteger;
109110
import java.util.concurrent.atomic.AtomicLong;
110111
import java.util.concurrent.atomic.AtomicReference;
112+
import java.util.function.LongSupplier;
111113
import java.util.stream.Collectors;
112114
import java.util.stream.IntStream;
113115
import java.util.stream.LongStream;
@@ -1657,7 +1659,7 @@ public void testRandomExceptionsOnTrimOperations( ) throws Exception {
16571659
}
16581660

16591661
assertThat(expectedException, is(not(nullValue())));
1660-
1662+
assertThat(failableTLog.getTragicException(), equalTo(expectedException));
16611663
assertThat(fileChannels, is(not(empty())));
16621664
assertThat("all file channels have to be closed",
16631665
fileChannels.stream().filter(f -> f.isOpen()).findFirst().isPresent(), is(false));
@@ -2508,11 +2510,13 @@ public void testWithRandomException() throws IOException {
25082510
syncedDocs.addAll(unsynced);
25092511
unsynced.clear();
25102512
} catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) {
2511-
// fair enough
2513+
assertEquals(failableTLog.getTragicException(), ex);
25122514
} catch (IOException ex) {
25132515
assertEquals(ex.getMessage(), "__FAKE__ no space left on device");
2516+
assertEquals(failableTLog.getTragicException(), ex);
25142517
} catch (RuntimeException ex) {
25152518
assertEquals(ex.getMessage(), "simulated");
2519+
assertEquals(failableTLog.getTragicException(), ex);
25162520
} finally {
25172521
Checkpoint checkpoint = Translog.readCheckpoint(config.getTranslogPath());
25182522
if (checkpoint.numOps == unsynced.size() + syncedDocs.size()) {
@@ -2951,6 +2955,47 @@ public void testCloseSnapshotTwice() throws Exception {
29512955
}
29522956
}
29532957

2958+
// close method should never be called directly from Translog (the only exception is closeOnTragicEvent)
2959+
public void testTranslogCloseInvariant() throws IOException {
2960+
assumeTrue("test only works with assertions enabled", Assertions.ENABLED);
2961+
class MisbehavingTranslog extends Translog {
2962+
MisbehavingTranslog(TranslogConfig config, String translogUUID, TranslogDeletionPolicy deletionPolicy, LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier) throws IOException {
2963+
super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier);
2964+
}
2965+
2966+
void callCloseDirectly() throws IOException {
2967+
close();
2968+
}
2969+
2970+
void callCloseUsingIOUtilsWithExceptionHandling() {
2971+
IOUtils.closeWhileHandlingException(this);
2972+
}
2973+
2974+
void callCloseUsingIOUtils() throws IOException {
2975+
IOUtils.close(this);
2976+
}
2977+
2978+
void callCloseOnTragicEvent() {
2979+
Exception e = new Exception("test tragic exception");
2980+
tragedy.setTragicException(e);
2981+
closeOnTragicEvent(e);
2982+
}
2983+
}
2984+
2985+
2986+
globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
2987+
Path path = createTempDir();
2988+
final TranslogConfig translogConfig = getTranslogConfig(path);
2989+
final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings());
2990+
final String translogUUID = Translog.createEmptyTranslog(path, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get());
2991+
MisbehavingTranslog misbehavingTranslog = new MisbehavingTranslog(translogConfig, translogUUID, deletionPolicy, () -> globalCheckpoint.get(), primaryTerm::get);
2992+
2993+
expectThrows(AssertionError.class, () -> misbehavingTranslog.callCloseDirectly());
2994+
expectThrows(AssertionError.class, () -> misbehavingTranslog.callCloseUsingIOUtils());
2995+
expectThrows(AssertionError.class, () -> misbehavingTranslog.callCloseUsingIOUtilsWithExceptionHandling());
2996+
misbehavingTranslog.callCloseOnTragicEvent();
2997+
}
2998+
29542999
static class SortedSnapshot implements Translog.Snapshot {
29553000
private final Translog.Snapshot snapshot;
29563001
private List<Translog.Operation> operations = null;

0 commit comments

Comments
 (0)