Skip to content

Commit f1abcf1

Browse files
authored
Write next cluster state fully on all failures (#73631)
Today we do not set the `LucenePersistedState#writeNextStateFully` flag on all failures, notably on an `OutOfMemoryError`. Since we don't exit immediately on an OOME we may have failed part-way through writing a full state but still proceed with another apparently-incremental write. With this commit we ensure `LucenePersistedState#writeNextStateFully` is only cleared if the previous write was successful.
1 parent 27dfc58 commit f1abcf1

File tree

2 files changed

+93
-13
lines changed

2 files changed

+93
-13
lines changed

server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ static class LucenePersistedState implements PersistedState {
396396

397397
// As the close method can be called concurrently with the other PersistedState methods, this class has extra protection in place.
398398
private final AtomicReference<PersistedClusterStateService.Writer> persistenceWriter = new AtomicReference<>();
399-
boolean writeNextStateFully;
399+
private boolean writeNextStateFully;
400400

401401
LucenePersistedState(PersistedClusterStateService persistedClusterStateService, long currentTerm, ClusterState lastAcceptedState)
402402
throws IOException {
@@ -439,13 +439,15 @@ public void setCurrentTerm(long currentTerm) {
439439
try {
440440
if (writeNextStateFully) {
441441
getWriterSafe().writeFullStateAndCommit(currentTerm, lastAcceptedState);
442-
writeNextStateFully = false;
443442
} else {
443+
writeNextStateFully = true; // in case of failure; this flag is cleared on success
444444
getWriterSafe().writeIncrementalTermUpdateAndCommit(currentTerm, lastAcceptedState.version());
445445
}
446-
} catch (Exception e) {
447-
handleExceptionOnWrite(e);
446+
} catch (IOException e) {
447+
throw new ElasticsearchException(e);
448448
}
449+
450+
writeNextStateFully = false;
449451
this.currentTerm = currentTerm;
450452
}
451453

@@ -454,8 +456,8 @@ public void setLastAcceptedState(ClusterState clusterState) {
454456
try {
455457
if (writeNextStateFully) {
456458
getWriterSafe().writeFullStateAndCommit(currentTerm, clusterState);
457-
writeNextStateFully = false;
458459
} else {
460+
writeNextStateFully = true; // in case of failure; this flag is cleared on success
459461
if (clusterState.term() != lastAcceptedState.term()) {
460462
assert clusterState.term() > lastAcceptedState.term() : clusterState.term() + " vs " + lastAcceptedState.term();
461463
// In a new currentTerm, we cannot compare the persisted metadata's lastAcceptedVersion to those in the new state,
@@ -466,10 +468,11 @@ public void setLastAcceptedState(ClusterState clusterState) {
466468
getWriterSafe().writeIncrementalStateAndCommit(currentTerm, lastAcceptedState, clusterState);
467469
}
468470
}
469-
} catch (Exception e) {
470-
handleExceptionOnWrite(e);
471+
} catch (IOException e) {
472+
throw new ElasticsearchException(e);
471473
}
472474

475+
writeNextStateFully = false;
473476
lastAcceptedState = clusterState;
474477
}
475478

@@ -496,11 +499,6 @@ private PersistedClusterStateService.Writer getWriterSafe() {
496499
}
497500
}
498501

499-
private void handleExceptionOnWrite(Exception e) {
500-
writeNextStateFully = true;
501-
throw ExceptionsHelper.convertToRuntime(e);
502-
}
503-
504502
@Override
505503
public void close() throws IOException {
506504
IOUtils.close(persistenceWriter.getAndSet(null));

server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,14 @@
4949
import java.util.Collections;
5050
import java.util.List;
5151
import java.util.concurrent.TimeUnit;
52+
import java.util.concurrent.atomic.AtomicBoolean;
5253
import java.util.concurrent.atomic.AtomicReference;
5354

5455
import static org.elasticsearch.test.NodeRoles.nonMasterNode;
5556
import static org.hamcrest.Matchers.equalTo;
5657
import static org.hamcrest.Matchers.instanceOf;
5758
import static org.hamcrest.Matchers.not;
59+
import static org.mockito.Matchers.anyLong;
5860
import static org.mockito.Mockito.mock;
5961
import static org.mockito.Mockito.when;
6062

@@ -296,6 +298,8 @@ public void testStatePersistedOnLoad() throws IOException {
296298
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
297299
final ClusterState state = createClusterState(randomNonNegativeLong(),
298300
Metadata.builder().clusterUUID(randomAlphaOfLength(10)).build());
301+
302+
//noinspection EmptyTryBlock
299303
try (GatewayMetaState.LucenePersistedState ignored = new GatewayMetaState.LucenePersistedState(
300304
persistedClusterStateService, 42L, state)) {
301305

@@ -469,7 +473,7 @@ Directory createDirectory(Path path) {
469473
wrapper.setRandomIOExceptionRateOnOpen(ioExceptionRate.get());
470474
}
471475

472-
for (int i = 0; i < randomIntBetween(1, 5); i++) {
476+
for (int i = between(1, 5); 0 <= i; i--) {
473477
if (randomBoolean()) {
474478
final long version = randomNonNegativeLong();
475479
final String indexName = randomAlphaOfLength(10);
@@ -519,10 +523,88 @@ Directory createDirectory(Path path) {
519523
}
520524
}
521525

526+
public void testStatePersistenceWithFatalError() throws IOException {
527+
final AtomicBoolean throwError = new AtomicBoolean();
528+
final BigArrays realBigArrays = getBigArrays();
529+
final BigArrays mockBigArrays = mock(BigArrays.class);
530+
when(mockBigArrays.newByteArray(anyLong())).thenAnswer(invocationOnMock ->
531+
{
532+
if (throwError.get() && randomBoolean()) {
533+
throw new TestError();
534+
}
535+
return realBigArrays.newByteArray((Long) invocationOnMock.getArguments()[0]);
536+
});
537+
538+
final PersistedClusterStateService persistedClusterStateService =
539+
new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), mockBigArrays,
540+
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
541+
ClusterState state = createClusterState(randomNonNegativeLong(),
542+
Metadata.builder().clusterUUID(randomAlphaOfLength(10)).build());
543+
long currentTerm = 42L;
544+
try (GatewayMetaState.LucenePersistedState persistedState = new GatewayMetaState.LucenePersistedState(
545+
persistedClusterStateService, currentTerm, state)) {
546+
547+
throwError.set(true);
548+
549+
for (int i = between(1, 5); 0 <= i; i--) {
550+
if (randomBoolean()) {
551+
final ClusterState newState = createClusterState(
552+
randomNonNegativeLong(),
553+
Metadata.builder()
554+
.clusterUUID(randomAlphaOfLength(10))
555+
.coordinationMetadata(CoordinationMetadata.builder().term(currentTerm).build())
556+
.build());
557+
try {
558+
persistedState.setLastAcceptedState(newState);
559+
state = newState;
560+
} catch (TestError e) {
561+
// ok
562+
}
563+
} else {
564+
final long newTerm = currentTerm + 1;
565+
try {
566+
persistedState.setCurrentTerm(newTerm);
567+
currentTerm = newTerm;
568+
} catch (TestError e) {
569+
// ok
570+
}
571+
}
572+
}
573+
574+
assertEquals(state, persistedState.getLastAcceptedState());
575+
assertEquals(currentTerm, persistedState.getCurrentTerm());
576+
}
577+
578+
nodeEnvironment.close();
579+
580+
Path path = nodeEnvironment.nodeDataPath();
581+
Settings settings = Settings.builder()
582+
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath())
583+
.put(Environment.PATH_DATA_SETTING.getKey(), path.toString()).build();
584+
try (NodeEnvironment nodeEnvironment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings))) {
585+
final PersistedClusterStateService newPersistedClusterStateService =
586+
new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(),
587+
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
588+
final PersistedClusterStateService.OnDiskState onDiskState = newPersistedClusterStateService.loadOnDiskState();
589+
assertFalse(onDiskState.empty());
590+
assertThat(onDiskState.currentTerm, equalTo(currentTerm));
591+
assertClusterStateEqual(state,
592+
ClusterState.builder(ClusterName.DEFAULT)
593+
.version(onDiskState.lastAcceptedVersion)
594+
.metadata(onDiskState.metadata).build());
595+
}
596+
}
597+
522598
private static BigArrays getBigArrays() {
523599
return usually()
524600
? BigArrays.NON_RECYCLING_INSTANCE
525601
: new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
526602
}
527603

604+
private static final class TestError extends Error {
605+
TestError() {
606+
super("test error");
607+
}
608+
}
609+
528610
}

0 commit comments

Comments
 (0)