Skip to content

Commit 14b5daa

Browse files
Fix Snapshot Completion Listener Lost on Master Failover (elastic#54286) (elastic#54330)
* Fix Snapshot Completion Listener Lost on Master Failover. If the master fails over (or we run into any other exception) while removing the snapshot from the cluster state (CS), we must still resolve all the completion listeners for the snapshot.
1 parent 8126ad0 commit 14b5daa

File tree

2 files changed

+123
-12
lines changed

2 files changed

+123
-12
lines changed

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1104,7 +1104,8 @@ public void onFailure(final Exception e) {
11041104
// will try ending this snapshot again
11051105
logger.debug(() -> new ParameterizedMessage(
11061106
"[{}] failed to update cluster state during snapshot finalization", snapshot), e);
1107-
endingSnapshots.remove(snapshot);
1107+
failSnapshotCompletionListeners(snapshot,
1108+
new SnapshotException(snapshot, "Failed to update cluster state during snapshot finalization", e));
11081109
} else {
11091110
logger.warn(() -> new ParameterizedMessage("[{}] failed to finalize snapshot", snapshot), e);
11101111
removeSnapshotFromClusterState(snapshot, null, e);
@@ -1158,42 +1159,56 @@ public ClusterState execute(ClusterState currentState) {
11581159
@Override
11591160
public void onFailure(String source, Exception e) {
11601161
logger.warn(() -> new ParameterizedMessage("[{}] failed to remove snapshot metadata", snapshot), e);
1161-
endingSnapshots.remove(snapshot);
1162+
failSnapshotCompletionListeners(
1163+
snapshot, new SnapshotException(snapshot, "Failed to remove snapshot from cluster state", e));
11621164
if (listener != null) {
11631165
listener.onFailure(e);
11641166
}
11651167
}
11661168

11671169
@Override
11681170
public void onNoLongerMaster(String source) {
1169-
endingSnapshots.remove(snapshot);
1171+
failSnapshotCompletionListeners(
1172+
snapshot, ExceptionsHelper.useOrSuppress(failure, new SnapshotException(snapshot, "no longer master")));
11701173
if (listener != null) {
11711174
listener.onNoLongerMaster();
11721175
}
11731176
}
11741177

11751178
@Override
11761179
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
1177-
final List<ActionListener<SnapshotInfo>> completionListeners = snapshotCompletionListeners.remove(snapshot);
1178-
if (completionListeners != null) {
1179-
try {
1180-
if (snapshotInfo == null) {
1181-
ActionListener.onFailure(completionListeners, failure);
1182-
} else {
1180+
if (snapshotInfo == null) {
1181+
failSnapshotCompletionListeners(snapshot, failure);
1182+
} else {
1183+
final List<ActionListener<SnapshotInfo>> completionListeners = snapshotCompletionListeners.remove(snapshot);
1184+
if (completionListeners != null) {
1185+
try {
11831186
ActionListener.onResponse(completionListeners, snapshotInfo);
1187+
} catch (Exception e) {
1188+
logger.warn("Failed to notify listeners", e);
11841189
}
1185-
} catch (Exception e) {
1186-
logger.warn("Failed to notify listeners", e);
11871190
}
1191+
endingSnapshots.remove(snapshot);
11881192
}
1189-
endingSnapshots.remove(snapshot);
11901193
if (listener != null) {
11911194
listener.onResponse(snapshotInfo);
11921195
}
11931196
}
11941197
});
11951198
}
11961199

1200+
private void failSnapshotCompletionListeners(Snapshot snapshot, Exception e) {
1201+
final List<ActionListener<SnapshotInfo>> completionListeners = snapshotCompletionListeners.remove(snapshot);
1202+
if (completionListeners != null) {
1203+
try {
1204+
ActionListener.onFailure(completionListeners, e);
1205+
} catch (Exception ex) {
1206+
logger.warn("Failed to notify listeners", ex);
1207+
}
1208+
}
1209+
endingSnapshots.remove(snapshot);
1210+
}
1211+
11971212
/**
11981213
* Deletes a snapshot from the repository, looking up the {@link Snapshot} reference before deleting.
11991214
* If the snapshot is still running cancels the snapshot first and then deletes it from the repository.

server/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.elasticsearch.discovery;
2020

21+
import org.elasticsearch.ExceptionsHelper;
2122
import org.elasticsearch.action.ActionFuture;
2223
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
2324
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
@@ -27,11 +28,14 @@
2728
import org.elasticsearch.cluster.ClusterStateListener;
2829
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
2930
import org.elasticsearch.cluster.SnapshotsInProgress;
31+
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
32+
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
3033
import org.elasticsearch.cluster.service.ClusterService;
3134
import org.elasticsearch.common.settings.Settings;
3235
import org.elasticsearch.common.unit.ByteSizeUnit;
3336
import org.elasticsearch.plugins.Plugin;
3437
import org.elasticsearch.snapshots.ConcurrentSnapshotExecutionException;
38+
import org.elasticsearch.snapshots.SnapshotException;
3539
import org.elasticsearch.snapshots.SnapshotInfo;
3640
import org.elasticsearch.snapshots.SnapshotMissingException;
3741
import org.elasticsearch.snapshots.SnapshotState;
@@ -49,6 +53,9 @@
4953
import java.util.concurrent.TimeUnit;
5054

5155
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
56+
import static org.hamcrest.Matchers.either;
57+
import static org.hamcrest.Matchers.endsWith;
58+
import static org.hamcrest.Matchers.is;
5259

5360
/**
5461
* Tests snapshot operations during disruptions.
@@ -156,6 +163,95 @@ public void clusterChanged(ClusterChangedEvent event) {
156163
assertAllSnapshotsCompleted();
157164
}
158165

166+
public void testDisruptionAfterFinalization() throws Exception {
167+
final String idxName = "test";
168+
final List<String> allMasterEligibleNodes = internalCluster().startMasterOnlyNodes(3);
169+
final String dataNode = internalCluster().startDataOnlyNode();
170+
ensureStableCluster(4);
171+
172+
createRandomIndex(idxName);
173+
174+
logger.info("--> creating repository");
175+
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
176+
.setType("fs").setSettings(Settings.builder()
177+
.put("location", randomRepoPath())
178+
.put("compress", randomBoolean())
179+
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
180+
181+
final String masterNode1 = internalCluster().getMasterName();
182+
Set<String> otherNodes = new HashSet<>(allMasterEligibleNodes);
183+
otherNodes.remove(masterNode1);
184+
otherNodes.add(dataNode);
185+
186+
NetworkDisruption networkDisruption =
187+
new NetworkDisruption(new NetworkDisruption.TwoPartitions(Collections.singleton(masterNode1), otherNodes),
188+
new NetworkDisruption.NetworkUnresponsive());
189+
internalCluster().setDisruptionScheme(networkDisruption);
190+
191+
ClusterService clusterService = internalCluster().clusterService(masterNode1);
192+
CountDownLatch disruptionStarted = new CountDownLatch(1);
193+
clusterService.addListener(new ClusterStateListener() {
194+
@Override
195+
public void clusterChanged(ClusterChangedEvent event) {
196+
SnapshotsInProgress snapshots = event.state().custom(SnapshotsInProgress.TYPE);
197+
if (snapshots != null && snapshots.entries().size() > 0) {
198+
final SnapshotsInProgress.Entry snapshotEntry = snapshots.entries().get(0);
199+
if (snapshotEntry.state() == SnapshotsInProgress.State.SUCCESS) {
200+
final RepositoriesMetaData repoMeta =
201+
event.state().metaData().custom(RepositoriesMetaData.TYPE);
202+
final RepositoryMetaData metaData = repoMeta.repository("test-repo");
203+
if (metaData.generation() == metaData.pendingGeneration()
204+
&& metaData.generation() > snapshotEntry.repositoryStateId()) {
205+
logger.info("--> starting disruption");
206+
networkDisruption.startDisrupting();
207+
clusterService.removeListener(this);
208+
disruptionStarted.countDown();
209+
}
210+
}
211+
}
212+
}
213+
});
214+
215+
final String snapshot = "test-snap";
216+
217+
logger.info("--> starting snapshot");
218+
ActionFuture<CreateSnapshotResponse> future = client(masterNode1).admin().cluster()
219+
.prepareCreateSnapshot("test-repo", snapshot).setWaitForCompletion(true)
220+
.setIndices(idxName).execute();
221+
222+
logger.info("--> waiting for disruption to start");
223+
assertTrue(disruptionStarted.await(1, TimeUnit.MINUTES));
224+
225+
assertAllSnapshotsCompleted();
226+
227+
logger.info("--> verify that snapshot was successful or no longer exist");
228+
assertBusy(() -> {
229+
try {
230+
assertSnapshotExists("test-repo", snapshot);
231+
} catch (SnapshotMissingException exception) {
232+
logger.info("--> done verifying, snapshot doesn't exist");
233+
}
234+
}, 1, TimeUnit.MINUTES);
235+
236+
logger.info("--> stopping disrupting");
237+
networkDisruption.stopDisrupting();
238+
ensureStableCluster(4, masterNode1);
239+
logger.info("--> done");
240+
241+
try {
242+
future.get();
243+
fail("Should have failed because the node disconnected from the cluster during snapshot finalization");
244+
} catch (Exception ex) {
245+
final SnapshotException sne = (SnapshotException) ExceptionsHelper.unwrap(ex, SnapshotException.class);
246+
assertNotNull(sne);
247+
assertThat(
248+
sne.getMessage(), either(endsWith(" Failed to remove snapshot from cluster state")).or(endsWith(" no longer master")));
249+
assertThat(sne.getSnapshotName(), is(snapshot));
250+
}
251+
252+
assertAllSnapshotsCompleted();
253+
}
254+
159255
private void assertAllSnapshotsCompleted() throws Exception {
160256
logger.info("--> wait until the snapshot is done");
161257
assertBusy(() -> {

0 commit comments

Comments
 (0)