Skip to content

Write next cluster state fully on all failures #73631

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ static class LucenePersistedState implements PersistedState {

// The close method can be called concurrently with the other PersistedState methods, so this class has extra protection in place.
private final AtomicReference<PersistedClusterStateService.Writer> persistenceWriter = new AtomicReference<>();
boolean writeNextStateFully;
private boolean writeNextStateFully;

LucenePersistedState(PersistedClusterStateService persistedClusterStateService, long currentTerm, ClusterState lastAcceptedState)
throws IOException {
Expand Down Expand Up @@ -439,13 +439,15 @@ public void setCurrentTerm(long currentTerm) {
try {
if (writeNextStateFully) {
getWriterSafe().writeFullStateAndCommit(currentTerm, lastAcceptedState);
writeNextStateFully = false;
} else {
writeNextStateFully = true; // in case of failure; this flag is cleared on success
getWriterSafe().writeIncrementalTermUpdateAndCommit(currentTerm, lastAcceptedState.version());
}
} catch (Exception e) {
handleExceptionOnWrite(e);
} catch (IOException e) {
throw new ElasticsearchException(e);
}

writeNextStateFully = false;
this.currentTerm = currentTerm;
}

Expand All @@ -454,8 +456,8 @@ public void setLastAcceptedState(ClusterState clusterState) {
try {
if (writeNextStateFully) {
getWriterSafe().writeFullStateAndCommit(currentTerm, clusterState);
writeNextStateFully = false;
} else {
writeNextStateFully = true; // in case of failure; this flag is cleared on success
if (clusterState.term() != lastAcceptedState.term()) {
assert clusterState.term() > lastAcceptedState.term() : clusterState.term() + " vs " + lastAcceptedState.term();
// In a new currentTerm, we cannot compare the persisted metadata's lastAcceptedVersion to those in the new state,
Expand All @@ -466,10 +468,11 @@ public void setLastAcceptedState(ClusterState clusterState) {
getWriterSafe().writeIncrementalStateAndCommit(currentTerm, lastAcceptedState, clusterState);
}
}
} catch (Exception e) {
handleExceptionOnWrite(e);
} catch (IOException e) {
throw new ElasticsearchException(e);
}

writeNextStateFully = false;
lastAcceptedState = clusterState;
}

Expand All @@ -496,11 +499,6 @@ private PersistedClusterStateService.Writer getWriterSafe() {
}
}

/**
 * Records that a write failed and rethrows the failure as an unchecked exception.
 *
 * <p>The {@code writeNextStateFully} flag is raised before anything else so that the next call to
 * {@code setCurrentTerm} or {@code setLastAcceptedState} takes the full-write branch rather than
 * the incremental one.
 *
 * @param e the exception caught while writing; always rethrown after conversion, so this method
 *          never returns normally
 */
private void handleExceptionOnWrite(Exception e) {
    // Raise the flag first: it must be set no matter what happens during conversion.
    writeNextStateFully = true;
    final RuntimeException unchecked = ExceptionsHelper.convertToRuntime(e);
    throw unchecked;
}

@Override
public void close() throws IOException {
IOUtils.close(persistenceWriter.getAndSet(null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.test.NodeRoles.nonMasterNode;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -296,6 +298,8 @@ public void testStatePersistedOnLoad() throws IOException {
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
final ClusterState state = createClusterState(randomNonNegativeLong(),
Metadata.builder().clusterUUID(randomAlphaOfLength(10)).build());

//noinspection EmptyTryBlock
try (GatewayMetaState.LucenePersistedState ignored = new GatewayMetaState.LucenePersistedState(
persistedClusterStateService, 42L, state)) {

Expand Down Expand Up @@ -519,10 +523,76 @@ Directory createDirectory(Path path) {
}
}

/**
 * Exercises {@code LucenePersistedState} when persisting a newly accepted state fails with a
 * {@link TestError} (an {@link Error}, not an {@link Exception}).
 *
 * <p>Steps:
 * <ol>
 *   <li>Build a persisted state on top of a mocked {@code BigArrays} whose {@code newByteArray}
 *       can be switched to throw.</li>
 *   <li>Make {@code setLastAcceptedState} fail, then switch the failure off and bump the term.</li>
 *   <li>Check that the in-memory view still reports the original state with the new term.</li>
 *   <li>Reopen the data path with a fresh node environment and check that what is on disk matches:
 *       the bumped term together with the original cluster state.</li>
 * </ol>
 */
public void testStatePersistenceWithFatalError() throws IOException {
    // Switch that decides whether byte-array allocation through the mock blows up.
    final AtomicBoolean failAllocations = new AtomicBoolean();
    final BigArrays delegateBigArrays = getBigArrays();
    final BigArrays faultyBigArrays = mock(BigArrays.class);
    when(faultyBigArrays.newByteArray(anyLong())).thenAnswer(invocation -> {
        if (failAllocations.get()) {
            throw new TestError();
        }
        final Long requestedSize = (Long) invocation.getArguments()[0];
        return delegateBigArrays.newByteArray(requestedSize);
    });

    final PersistedClusterStateService faultyService = new PersistedClusterStateService(
        nodeEnvironment,
        xContentRegistry(),
        faultyBigArrays,
        new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
        () -> 0L);
    final ClusterState initialState = createClusterState(
        randomNonNegativeLong(),
        Metadata.builder().clusterUUID(randomAlphaOfLength(10)).build());
    final long initialTerm = 42L;
    final long bumpedTerm = initialTerm + 1;

    try (GatewayMetaState.LucenePersistedState persistedState =
             new GatewayMetaState.LucenePersistedState(faultyService, initialTerm, initialState)) {

        // Phase 1: accepting a new state in the same term must fail with the injected error.
        failAllocations.set(true);
        final CoordinationMetadata sameTermCoordination = CoordinationMetadata.builder().term(initialTerm).build();
        final ClusterState rejectedState = createClusterState(
            randomNonNegativeLong(),
            Metadata.builder()
                .clusterUUID(randomAlphaOfLength(10))
                .coordinationMetadata(sameTermCoordination)
                .build());
        expectThrows(TestError.class, () -> persistedState.setLastAcceptedState(rejectedState));

        // Phase 2: with the fault removed, a term bump must succeed.
        failAllocations.set(false);
        persistedState.setCurrentTerm(bumpedTerm);

        // The failed update must not have replaced the last-accepted state.
        assertEquals(initialState, persistedState.getLastAcceptedState());
        assertEquals(bumpedTerm, persistedState.getCurrentTerm());
    }

    nodeEnvironment.close();

    // Phase 3: reload from the same data path and verify what actually reached the disk.
    final Path dataPath = nodeEnvironment.nodeDataPath();
    final Settings reloadSettings = Settings.builder()
        .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath())
        .put(Environment.PATH_DATA_SETTING.getKey(), dataPath.toString())
        .build();
    try (NodeEnvironment reloadedEnvironment =
             new NodeEnvironment(reloadSettings, TestEnvironment.newEnvironment(reloadSettings))) {
        final PersistedClusterStateService reloadedService = new PersistedClusterStateService(
            reloadedEnvironment,
            xContentRegistry(),
            getBigArrays(),
            new ClusterSettings(reloadSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
            () -> 0L);
        final PersistedClusterStateService.OnDiskState onDiskState = reloadedService.loadOnDiskState();
        assertFalse(onDiskState.empty());
        assertThat(onDiskState.currentTerm, equalTo(bumpedTerm));

        final ClusterState stateFromDisk = ClusterState.builder(ClusterName.DEFAULT)
            .version(onDiskState.lastAcceptedVersion)
            .metadata(onDiskState.metadata)
            .build();
        assertClusterStateEqual(initialState, stateFromDisk);
    }
}

/**
 * Picks the {@link BigArrays} implementation for a test run.
 *
 * @return {@link BigArrays#NON_RECYCLING_INSTANCE} when {@code usually()} returns true, otherwise
 *         a new {@link MockBigArrays} built on a {@link MockPageCacheRecycler} with empty settings
 *         and a {@link NoneCircuitBreakerService}
 */
private static BigArrays getBigArrays() {
    if (usually()) {
        return BigArrays.NON_RECYCLING_INSTANCE;
    }
    return new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
}

/**
 * Failure type injected by the tests in this class.
 *
 * <p>It extends {@link Error} rather than {@link Exception}, so a {@code catch (Exception e)}
 * handler does not intercept it.
 */
private static final class TestError extends Error {
    /** Fixed detail message carried by every instance. */
    private static final String MESSAGE = "test error";

    TestError() {
        super(MESSAGE);
    }
}

}