Commit acc7637

Die with dignity while merging
If an out of memory error is thrown while merging, today we quietly rewrap it into a merge exception and the out of memory error is lost. Instead, we need to rethrow out of memory errors, and in fact any fatal error here, and let those go uncaught so that the node is torn down. This commit causes this to be the case.
1 parent 0635778 commit acc7637
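For context, a minimal standalone sketch of the distinction this commit relies on (hypothetical names, not code from the repository): a java.lang.Error is not an Exception, so rethrowing it escapes catch (Exception e) blocks and ordinary wrapping, and it surfaces at the thread's uncaught exception handler, where the node can be torn down.

public final class DieWithDignitySketch {

    // Rethrow fatal errors instead of wrapping them; mirrors the intent of
    // the maybeDie method added in the diff below (names here are illustrative).
    static void maybeDie(final Throwable t) {
        if (t instanceof Error) {
            throw (Error) t;
        }
    }

    public static void main(final String[] args) {
        try {
            maybeDie(new OutOfMemoryError("simulated"));
        } catch (final Exception e) {
            // never reached: an Error is not an Exception, so it cannot be
            // quietly rewrapped here and lost
            System.err.println("wrapped: " + e);
        }
        // the OutOfMemoryError propagates out of main and reaches the default
        // uncaught exception handler, which in a server process can halt the JVM
    }
}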

File tree

9 files changed: +604 -374 lines changed

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 16 additions & 13 deletions
@@ -1586,16 +1586,9 @@ private boolean failOnTragicEvent(AlreadyClosedException ex) {
         // we need to fail the engine. it might have already been failed before
         // but we are double-checking it's failed and closed
         if (indexWriter.isOpen() == false && indexWriter.getTragicException() != null) {
-            if (indexWriter.getTragicException() instanceof Error) {
-                try {
-                    logger.error("tragic event in index writer", ex);
-                } finally {
-                    throw (Error) indexWriter.getTragicException();
-                }
-            } else {
-                failEngine("already closed by tragic event on the index writer", (Exception) indexWriter.getTragicException());
-                engineFailed = true;
-            }
+            maybeDie("tragic event in index writer", indexWriter.getTragicException());
+            failEngine("already closed by tragic event on the index writer", (Exception) indexWriter.getTragicException());
+            engineFailed = true;
         } else if (translog.isOpen() == false && translog.getTragicException() != null) {
             failEngine("already closed by tragic event on the translog", translog.getTragicException());
             engineFailed = true;
@@ -1916,7 +1909,6 @@ protected void doRun() throws Exception {
 
         @Override
         protected void handleMergeException(final Directory dir, final Throwable exc) {
-            logger.error("failed to merge", exc);
             engineConfig.getThreadPool().generic().execute(new AbstractRunnable() {
                 @Override
                 public void onFailure(Exception e) {
@@ -1925,13 +1917,24 @@ public void onFailure(Exception e) {
 
                 @Override
                 protected void doRun() throws Exception {
-                    MergePolicy.MergeException e = new MergePolicy.MergeException(exc, dir);
-                    failEngine("merge failed", e);
+                    maybeDie("fatal merge error", exc);
+                    logger.error("failed to merge", exc);
+                    failEngine("merge failed", new MergePolicy.MergeException(exc, dir));
                 }
             });
         }
     }
 
+    void maybeDie(final String maybeMessage, final Throwable maybeFatal) {
+        if (maybeFatal instanceof Error) {
+            try {
+                logger.error(maybeMessage, maybeFatal);
+            } finally {
+                throw (Error) maybeFatal;
+            }
+        }
+    }
+
     /**
      * Commits the specified index writer.
      *
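One subtlety in maybeDie above: the throw sits in a finally block so the fatal error is rethrown even if the logging call itself fails, for example with a second OutOfMemoryError while building the log message. A standalone sketch of the idiom (hypothetical names; System.err stands in for the engine's logger):

    static void rethrowIfFatal(final String message, final Throwable maybeFatal) {
        if (maybeFatal instanceof Error) {
            try {
                System.err.println(message + ": " + maybeFatal);
            } finally {
                // runs whether or not the logging above threw, so the original
                // fatal error always propagates
                throw (Error) maybeFatal;
            }
        }
    }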

core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@
 import java.util.Collections;
 import java.util.List;
 
-import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy;
+import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;

core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 2 additions & 346 deletions
Large diffs are not rendered by default.

core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java

Lines changed: 0 additions & 12 deletions
@@ -43,18 +43,6 @@
 
 public class TranslogDeletionPolicyTests extends ESTestCase {
 
-    public static TranslogDeletionPolicy createTranslogDeletionPolicy() {
-        return new TranslogDeletionPolicy(
-            IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(),
-            IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getDefault(Settings.EMPTY).getMillis()
-        );
-    }
-
-    public static TranslogDeletionPolicy createTranslogDeletionPolicy(IndexSettings indexSettings) {
-        return new TranslogDeletionPolicy(indexSettings.getTranslogRetentionSize().getBytes(),
-            indexSettings.getTranslogRetentionAge().getMillis());
-    }
-
     public void testNoRetention() throws IOException {
         long now = System.currentTimeMillis();
         Tuple<List<TranslogReader>, TranslogWriter> readersAndWriter = createReadersAndWriter(now);
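The two helpers deleted here are evidently the ones now imported from TranslogDeletionPolicies in CombinedDeletionPolicyTests, TranslogTests, and RecoveryTests; that new file is among the nine changed but is not rendered in this view. A presumed reconstruction of its shape, assembled from the deleted lines above (the actual file may differ in details):

// Presumed shape of the new TranslogDeletionPolicies utility class,
// reconstructed from the helpers removed from TranslogDeletionPolicyTests.
package org.elasticsearch.index.translog;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;

public class TranslogDeletionPolicies {

    public static TranslogDeletionPolicy createTranslogDeletionPolicy() {
        return new TranslogDeletionPolicy(
            IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(),
            IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getDefault(Settings.EMPTY).getMillis()
        );
    }

    public static TranslogDeletionPolicy createTranslogDeletionPolicy(IndexSettings indexSettings) {
        return new TranslogDeletionPolicy(indexSettings.getTranslogRetentionSize().getBytes(),
            indexSettings.getTranslogRetentionAge().getMillis());
    }

}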

core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java

Lines changed: 1 addition & 1 deletion
@@ -107,7 +107,7 @@
 
 import static com.carrotsearch.randomizedtesting.RandomizedTest.randomLongBetween;
 import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
-import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy;
+import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;

core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java

Lines changed: 1 addition & 1 deletion
@@ -44,7 +44,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 
-import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy;
+import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
qa/evil-tests/src/test/java/org/elasticsearch/index/engine/EvilInternalEngineTests.java

Lines changed: 103 additions & 0 deletions
@@ -0,0 +1,103 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.engine;
+
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.SegmentCommitInfo;
+import org.elasticsearch.index.mapper.ParsedDocument;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasToString;
+import static org.hamcrest.Matchers.instanceOf;
+
+public class EvilInternalEngineTests extends EngineTestCase {
+
+    public void testOutOfMemoryErrorWhileMergingIsRethrownAndIsUncaught() throws IOException, InterruptedException {
+        engine.close();
+        final AtomicReference<Throwable> maybeDie = new AtomicReference<>();
+        final CountDownLatch latch = new CountDownLatch(1);
+        final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler();
+        try {
+            /*
+             * We want to test that the out of memory error thrown from the merge goes uncaught; this gives us confidence that an out of
+             * memory error thrown while merging will lead to the node being torn down.
+             */
+            Thread.setDefaultUncaughtExceptionHandler((t, e) -> {
+                maybeDie.set(e);
+                latch.countDown();
+            });
+            final AtomicReference<List<SegmentCommitInfo>> segmentsReference = new AtomicReference<>();
+
+            try (Engine e = createEngine(
+                    defaultSettings,
+                    store,
+                    primaryTranslogDir,
+                    newMergePolicy(),
+                    (directory, iwc) -> new IndexWriter(directory, iwc) {
+                        @Override
+                        public void merge(MergePolicy.OneMerge merge) throws IOException {
+                            throw new OutOfMemoryError("640k ought to be enough for anybody");
+                        }
+
+                        @Override
+                        public synchronized MergePolicy.OneMerge getNextMerge() {
+                            /*
+                             * This will be called when we flush when we will not be ready to return the segments. After the segments are on
+                             * disk, we can only return them from here once or the merge scheduler will be stuck in a loop repeatedly
+                             * peeling off the same segments to schedule for merging.
+                             */
+                            if (segmentsReference.get() == null) {
+                                return super.getNextMerge();
+                            } else {
+                                final List<SegmentCommitInfo> segments = segmentsReference.getAndSet(null);
+                                return new MergePolicy.OneMerge(segments);
+                            }
+                        }
+                    },
+                    null)) {
+                // force segments to exist on disk
+                final ParsedDocument doc1 = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null);
+                e.index(indexForDoc(doc1));
+                e.flush();
+                final List<SegmentCommitInfo> segments =
+                        StreamSupport.stream(e.getLastCommittedSegmentInfos().spliterator(), false).collect(Collectors.toList());
+                segmentsReference.set(segments);
+                // trigger a background merge that will be managed by the concurrent merge scheduler
+                e.forceMerge(randomBoolean(), 0, false, false, false);
+                latch.await();
+                assertNotNull(maybeDie.get());
+                assertThat(maybeDie.get(), instanceOf(OutOfMemoryError.class));
+                assertThat(maybeDie.get(), hasToString(containsString("640k ought to be enough for anybody")));
+            }
+        } finally {
+            Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler);
+        }
+    }
+
+}
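The test works by swapping in a default uncaught exception handler and asserting that the OutOfMemoryError thrown on the merge thread reaches it. A standalone sketch of that mechanism (hypothetical class, not from the repository): an Error thrown on a background thread that nothing catches is routed to the thread's default uncaught exception handler, and in a real node the installed handler can halt the JVM.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public final class UncaughtMergeErrorDemo {

    public static void main(final String[] args) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicReference<Throwable> seen = new AtomicReference<>();
        // record what would normally tear the node down
        Thread.setDefaultUncaughtExceptionHandler((t, e) -> {
            seen.set(e);
            latch.countDown();
        });
        // stand-in for a merge thread that hits a fatal error nothing catches
        final Thread merger = new Thread(() -> {
            throw new OutOfMemoryError("simulated");
        });
        merger.start();
        latch.await();
        // prints: uncaught: java.lang.OutOfMemoryError: simulated
        System.out.println("uncaught: " + seen.get());
    }
}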
