Skip to content

Commit 4130852

Browse files
committed
Write next cluster state fully on all failures (#73631)
Today we do not set the `LucenePersistedState#writeNextStateFully` flag on all failures, notably on an `OutOfMemoryError`. Since we don't exit immediately on an OOME we may have failed part-way through writing a full state but still proceed with another apparently-incremental write. With this commit we ensure `LucenePersistedState#writeNextStateFully` is only set if the previous write was successful.
1 parent e682ea3 commit 4130852

File tree

2 files changed

+94
-13
lines changed

2 files changed

+94
-13
lines changed

server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,7 @@ static class LucenePersistedState implements PersistedState {
462462

463463
// As the close method can be concurrently called to the other PersistedState methods, this class has extra protection in place.
464464
private final AtomicReference<PersistedClusterStateService.Writer> persistenceWriter = new AtomicReference<>();
465-
boolean writeNextStateFully;
465+
private boolean writeNextStateFully;
466466

467467
LucenePersistedState(PersistedClusterStateService persistedClusterStateService, long currentTerm, ClusterState lastAcceptedState)
468468
throws IOException {
@@ -505,13 +505,15 @@ public void setCurrentTerm(long currentTerm) {
505505
try {
506506
if (writeNextStateFully) {
507507
getWriterSafe().writeFullStateAndCommit(currentTerm, lastAcceptedState);
508-
writeNextStateFully = false;
509508
} else {
509+
writeNextStateFully = true; // in case of failure; this flag is cleared on success
510510
getWriterSafe().writeIncrementalTermUpdateAndCommit(currentTerm, lastAcceptedState.version());
511511
}
512-
} catch (Exception e) {
513-
handleExceptionOnWrite(e);
512+
} catch (IOException e) {
513+
throw new ElasticsearchException(e);
514514
}
515+
516+
writeNextStateFully = false;
515517
this.currentTerm = currentTerm;
516518
}
517519

@@ -520,8 +522,8 @@ public void setLastAcceptedState(ClusterState clusterState) {
520522
try {
521523
if (writeNextStateFully) {
522524
getWriterSafe().writeFullStateAndCommit(currentTerm, clusterState);
523-
writeNextStateFully = false;
524525
} else {
526+
writeNextStateFully = true; // in case of failure; this flag is cleared on success
525527
if (clusterState.term() != lastAcceptedState.term()) {
526528
assert clusterState.term() > lastAcceptedState.term() : clusterState.term() + " vs " + lastAcceptedState.term();
527529
// In a new currentTerm, we cannot compare the persisted metadata's lastAcceptedVersion to those in the new state,
@@ -532,10 +534,11 @@ public void setLastAcceptedState(ClusterState clusterState) {
532534
getWriterSafe().writeIncrementalStateAndCommit(currentTerm, lastAcceptedState, clusterState);
533535
}
534536
}
535-
} catch (Exception e) {
536-
handleExceptionOnWrite(e);
537+
} catch (IOException e) {
538+
throw new ElasticsearchException(e);
537539
}
538540

541+
writeNextStateFully = false;
539542
lastAcceptedState = clusterState;
540543
}
541544

@@ -562,11 +565,6 @@ private PersistedClusterStateService.Writer getWriterSafe() {
562565
}
563566
}
564567

565-
private void handleExceptionOnWrite(Exception e) {
566-
writeNextStateFully = true;
567-
throw ExceptionsHelper.convertToRuntime(e);
568-
}
569-
570568
@Override
571569
public void close() throws IOException {
572570
IOUtils.close(persistenceWriter.getAndSet(null));

server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,14 @@
4949
import java.util.Collections;
5050
import java.util.List;
5151
import java.util.concurrent.TimeUnit;
52+
import java.util.concurrent.atomic.AtomicBoolean;
5253
import java.util.concurrent.atomic.AtomicReference;
5354

5455
import static org.elasticsearch.test.NodeRoles.nonMasterNode;
5556
import static org.hamcrest.Matchers.equalTo;
5657
import static org.hamcrest.Matchers.instanceOf;
5758
import static org.hamcrest.Matchers.not;
59+
import static org.mockito.Matchers.anyLong;
5860
import static org.mockito.Mockito.mock;
5961
import static org.mockito.Mockito.when;
6062

@@ -296,6 +298,8 @@ public void testStatePersistedOnLoad() throws IOException {
296298
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
297299
final ClusterState state = createClusterState(randomNonNegativeLong(),
298300
Metadata.builder().clusterUUID(randomAlphaOfLength(10)).build());
301+
302+
//noinspection EmptyTryBlock
299303
try (GatewayMetaState.LucenePersistedState ignored = new GatewayMetaState.LucenePersistedState(
300304
persistedClusterStateService, 42L, state)) {
301305

@@ -470,7 +474,7 @@ Directory createDirectory(Path path) {
470474
wrapper.setRandomIOExceptionRateOnOpen(ioExceptionRate.get());
471475
}
472476

473-
for (int i = 0; i < randomIntBetween(1, 5); i++) {
477+
for (int i = between(1, 5); 0 <= i; i--) {
474478
if (randomBoolean()) {
475479
final long version = randomNonNegativeLong();
476480
final String indexName = randomAlphaOfLength(10);
@@ -521,10 +525,89 @@ Directory createDirectory(Path path) {
521525
}
522526
}
523527

528+
public void testStatePersistenceWithFatalError() throws IOException {
529+
final AtomicBoolean throwError = new AtomicBoolean();
530+
final BigArrays realBigArrays = getBigArrays();
531+
final BigArrays mockBigArrays = mock(BigArrays.class);
532+
when(mockBigArrays.newByteArray(anyLong())).thenAnswer(invocationOnMock ->
533+
{
534+
if (throwError.get() && randomBoolean()) {
535+
throw new TestError();
536+
}
537+
return realBigArrays.newByteArray((Long) invocationOnMock.getArguments()[0]);
538+
});
539+
540+
final PersistedClusterStateService persistedClusterStateService =
541+
new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), mockBigArrays,
542+
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
543+
ClusterState state = createClusterState(randomNonNegativeLong(),
544+
Metadata.builder().clusterUUID(randomAlphaOfLength(10)).build());
545+
long currentTerm = 42L;
546+
try (GatewayMetaState.LucenePersistedState persistedState = new GatewayMetaState.LucenePersistedState(
547+
persistedClusterStateService, currentTerm, state)) {
548+
549+
throwError.set(false);
550+
551+
for (int i = between(1, 5); 0 <= i; i--) {
552+
if (randomBoolean()) {
553+
final ClusterState newState = createClusterState(
554+
randomNonNegativeLong(),
555+
Metadata.builder()
556+
.clusterUUID(randomAlphaOfLength(10))
557+
.coordinationMetadata(CoordinationMetadata.builder().term(currentTerm).build())
558+
.build());
559+
try {
560+
persistedState.setLastAcceptedState(newState);
561+
state = newState;
562+
} catch (TestError e) {
563+
// ok
564+
}
565+
} else {
566+
final long newTerm = currentTerm + 1;
567+
try {
568+
persistedState.setCurrentTerm(newTerm);
569+
currentTerm = newTerm;
570+
} catch (TestError e) {
571+
// ok
572+
}
573+
}
574+
}
575+
576+
assertEquals(state, persistedState.getLastAcceptedState());
577+
assertEquals(currentTerm, persistedState.getCurrentTerm());
578+
}
579+
580+
nodeEnvironment.close();
581+
582+
for (Path path : nodeEnvironment.nodeDataPaths()) {
583+
Settings settings = Settings.builder()
584+
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath())
585+
.put(Environment.PATH_DATA_SETTING.getKey(), path.getParent().getParent().toString()).build();
586+
try (NodeEnvironment nodeEnvironment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings))) {
587+
final PersistedClusterStateService newPersistedClusterStateService =
588+
new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(),
589+
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
590+
final PersistedClusterStateService.OnDiskState onDiskState = newPersistedClusterStateService.loadBestOnDiskState();
591+
assertFalse(onDiskState.empty());
592+
assertThat(onDiskState.currentTerm, equalTo(currentTerm));
593+
assertClusterStateEqual(state,
594+
ClusterState.builder(ClusterName.DEFAULT)
595+
.version(onDiskState.lastAcceptedVersion)
596+
.metadata(onDiskState.metadata).build());
597+
}
598+
}
599+
}
600+
524601
private static BigArrays getBigArrays() {
525602
return usually()
526603
? BigArrays.NON_RECYCLING_INSTANCE
527604
: new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
528605
}
529606

607+
private static final class TestError extends Error {
608+
TestError() {
609+
super("test error");
610+
}
611+
}
612+
530613
}

0 commit comments

Comments
 (0)