[CI] SnapshotStressTestsIT testRandomActivities failing #108907
Pinging @elastic/es-distributed (Team:Distributed)
If this assertion trips it would be useful to report these details. Relates #108907
Reproduced for me (once) after a few thousand runs. Curious, because after we fixed #105245 I ran this suite in a loop for ages without any failures, so maybe it's something we've introduced since then. I'm trying again with more logging.
Perhaps interestingly, it looks like there were indeed no failures of this test suite from early Feb (when #105245 was merged) until mid-March. (A failure-history chart was attached here.)
Changed to medium based on previous instances. Feel free to relabel. Also please assign yourself if you're working on it, so others know which issues are free to pick.
We must not mark the master for shutdown since this triggers a master failover, and we must keep all the blocks in place until the mark/unmark sequence is complete. Also adds some logging around shutdown metadata manipulation. Found while investigating #108907 although this doesn't fix that issue.
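The constraint in that commit message can be illustrated with a small hedged sketch. Everything below is made up for illustration (this is not the real Elasticsearch node-shutdown API): the point is simply that when the test picks a node to mark for shutdown, it must skip whichever node is currently master, otherwise marking it triggers a master failover mid-test.

```java
import java.util.List;
import java.util.Optional;

// Toy sketch (illustrative names only, not the real shutdown API): choose a
// node to mark for shutdown, never the current master, to avoid triggering a
// master failover in the middle of the test.
public class ShutdownTargetPicker {

    static Optional<String> pickShutdownTarget(List<String> nodes, String currentMaster) {
        // Skip the current master; any other node is a safe shutdown target.
        return nodes.stream().filter(n -> n.equals(currentMaster) == false).findFirst();
    }

    public static void main(String[] args) {
        List<String> nodes = List.of("node-0", "node-1", "node-2");
        // node-0 is master, so the first safe target is node-1.
        System.out.println(pickShutdownTarget(nodes, "node-0")); // prints "Optional[node-1]"
    }
}
```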
Managed to capture another similar failure on a branch with copious logging: testoutput-2024-05-28T04:49:28.436Z.tar.gz
Here's a summary of the affected shard: …
I wonder if the assertion is wrong? But if so, how is it not tripping more often? Still investigating...
I spent quite some time investigating this recently. I think the assertion is valid, and I have a hunch that the problem is here: `server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java`, lines 751 to 754 (at bf88da3).
If we snapshot an index, then delete and re-create it before the snapshot is finalized, then … That said, I've been unable to reproduce this sequence of events in …
Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination)
I spent ages on this last year. It seems to have stopped failing, but not for any particularly good reason AFAICT. Bisection is tricky because it fails so sporadically, but I ran …
Given the 5k+ successes on one commit and the approximately-1-in-400 failures on the previous, the probability is well over 99.99% that this is the change which matters. But it doesn't make any sense: adba420 simply mutes an unrelated test.
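The probability claim above checks out. As a quick sanity check (a back-of-the-envelope calculation, not part of the bisection tooling): if the true per-run failure rate were 1 in 400, the chance that 5000 consecutive runs all pass is (1 - 1/400)^5000, which is on the order of a few in a million, i.e. well under the 0.01% residual implied by ">99.99%".

```java
// Sanity check of the bisection-confidence claim: how likely are 5000
// consecutive passes if the true failure rate were about 1 in 400 per run?
public class BisectionConfidence {

    static double probabilityAllPass(double failureRate, int runs) {
        // Independent runs: P(all pass) = (1 - failureRate)^runs
        return Math.pow(1.0 - failureRate, runs);
    }

    public static void main(String[] args) {
        double p = probabilityAllPass(1.0 / 400.0, 5000);
        // (1 - 1/400)^5000 is roughly 3.7e-6, comfortably below 0.01%,
        // supporting the ">99.99% that this is the change which matters" claim.
        System.out.printf("P(5000 passes | rate 1/400) = %.2e%n", p);
    }
}
```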
Science is hard! Still, great job on analyzing the probability. TIL
Ok I've finally managed to reproduce the tripping assertion. It seems to be incredibly delicate, because I tried hitting this sequence of steps using randomization (for extra coverage and a more general test) and it basically never did so.

Click for diff:

```diff
diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
index 503ca22b3c31..06f16b85c8f0 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
@@ -194,6 +194,7 @@ import org.elasticsearch.transport.TestTransportChannel;
 import org.elasticsearch.transport.TransportInterceptor;
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportRequestHandler;
+import org.elasticsearch.transport.TransportResponse;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.usage.UsageService;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
@@ -1410,6 +1411,176 @@ public class SnapshotResiliencyTests extends ESTestCase {
         safeAwait(testListener); // shouldn't throw
     }
 
+    /**
+     * A device for allowing a test precise control over the order in which shard-snapshot updates are processed by the master. The test
+     * must install the result of {@link #newTransportInterceptor} on the master node and may then call {@link #releaseBlock} as needed to
+     * release blocks on the processing of individual shard snapshot updates.
+     */
+    private static class ShardSnapshotUpdatesSequencer {
+
+        private final Map<String, Map<String, SubscribableListener<Void>>> shardSnapshotUpdatesBlockMap = new HashMap<>();
+
+        private static final SubscribableListener<Void> ALWAYS_PROCEED = SubscribableListener.newSucceeded(null);
+
+        private SubscribableListener<Void> listenerFor(String snapshot, String index) {
+            if ("last-snapshot".equals(snapshot)) {
+                return ALWAYS_PROCEED;
+            }
+
+            return shardSnapshotUpdatesBlockMap
+                //
+                .computeIfAbsent(snapshot, v -> new HashMap<>())
+                .computeIfAbsent(index, v -> new SubscribableListener<>());
+        }
+
+        void releaseBlock(String snapshot, String index) {
+            listenerFor(snapshot, index).onResponse(null);
+        }
+
+        /**
+         * @return a {@link TransportInterceptor} which enforces the sequencing of shard snapshot updates
+         */
+        TransportInterceptor newTransportInterceptor() {
+            return new TransportInterceptor() {
+                @Override
+                public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
+                    String action,
+                    Executor executor,
+                    boolean forceExecution,
+                    TransportRequestHandler<T> actualHandler
+                ) {
+                    if (action.equals(SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME)) {
+                        return (request, channel, task) -> ActionListener.run(
+                            ActionTestUtils.<TransportResponse>assertNoFailureListener(new ChannelActionListener<>(channel)::onResponse),
+                            l -> {
+                                final var updateRequest = asInstanceOf(UpdateIndexShardSnapshotStatusRequest.class, request);
+                                listenerFor(updateRequest.snapshot().getSnapshotId().getName(), updateRequest.shardId().getIndexName()).<
+                                    TransportResponse>andThen(
+                                        ll -> actualHandler.messageReceived(request, new TestTransportChannel(ll), task)
+                                    ).addListener(l);
+                            }
+                        );
+                    } else {
+                        return actualHandler;
+                    }
+                }
+            };
+        }
+    }
+
+    public void testDeleteIndexBetweenSuccessAndFinalization() {
+
+        final var sequencer = new ShardSnapshotUpdatesSequencer();
+
+        setupTestCluster(
+            1,
+            1,
+            node -> node.isMasterNode() ? sequencer.newTransportInterceptor() : TransportService.NOOP_TRANSPORT_INTERCEPTOR
+        );
+
+        final var masterNode = testClusterNodes.randomMasterNodeSafe();
+        final var client = masterNode.client;
+        final var masterClusterService = masterNode.clusterService;
+
+        final var snapshotCount = between(3, 5);
+        final var indices = IntStream.range(0, snapshotCount + 1).mapToObj(i -> "index-" + i).toList();
+        final var repoName = "repo";
+        final var indexToDelete = "index-" + snapshotCount;
+
+        var testListener = SubscribableListener
+
+            // Create the repo and indices
+            .<Void>newForked(stepListener -> {
+                try (var listeners = new RefCountingListener(stepListener)) {
+                    client().admin()
+                        .cluster()
+                        .preparePutRepository(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, repoName)
+                        .setType(FsRepository.TYPE)
+                        .setSettings(Settings.builder().put("location", randomAlphaOfLength(10)))
+                        .execute(listeners.acquire(createRepoResponse -> {}));
+
+                    for (final var index : indices) {
+                        client.admin()
+                            .indices()
+                            .create(
+                                new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL).settings(defaultIndexSettings(1)),
+                                listeners.acquire(createIndexResponse -> {})
+                            );
+                    }
+                }
+            });
+
+        // Start some snapshots such that snapshot-{i} contains index-{i} and index-{snapshotCount} so that we can control the order in
+        // which they finalize by controlling the order in which the shard snapshot updates are processed
+        for (int i = 0; i < snapshotCount; i++) {
+            final var snapshotName = "snapshot-" + i;
+            final var indexName = "index-" + i;
+            testListener = testListener.andThen(
+                stepListener -> client.admin()
+                    .cluster()
+                    .prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repoName, snapshotName)
+                    .setIndices(indexName, indexToDelete)
+                    .setPartial(true)
+                    .execute(stepListener.map(createSnapshotResponse -> null))
+            );
+        }
+
+        testListener = testListener
+
+            // wait for the target index to complete in snapshot-1
+            .andThen(l -> {
+                sequencer.releaseBlock("snapshot-0", indexToDelete);
+                sequencer.releaseBlock("snapshot-1", indexToDelete);
+
+                ClusterServiceUtils.addTemporaryStateListener(
+                    masterClusterService,
+                    clusterState -> SnapshotsInProgress.get(clusterState)
+                        .asStream()
+                        .mapToLong(
+                            e -> e.shards()
+                                .entrySet()
+                                .stream()
+                                .filter(
+                                    e2 -> e2.getKey().getIndexName().equals(indexToDelete)
+                                        && e2.getValue().state() == SnapshotsInProgress.ShardState.SUCCESS
+                                )
+                                .count()
+                        )
+                        .sum() == 2
+                ).addListener(l.map(v -> null));
+            })
+
+            // delete the target index
+            .andThen(l -> client.admin().indices().delete(new DeleteIndexRequest(indexToDelete), l.map(acknowledgedResponse -> null)))
+
+            // wait for snapshot-1 to complete
+            .andThen(l -> {
+                sequencer.releaseBlock("snapshot-1", "index-1");
+                ClusterServiceUtils.addTemporaryStateListener(
+                    masterClusterService,
+                    cs -> SnapshotsInProgress.get(cs).asStream().noneMatch(e -> e.snapshot().getSnapshotId().getName().equals("snapshot-1"))
+                ).addListener(l.map(v -> null));
+            })
+
+            // wait for all the other snapshots to complete
+            .andThen(l -> {
+                for (int i = 0; i < snapshotCount; i++) {
+                    sequencer.releaseBlock("snapshot-" + i, indexToDelete);
+                    sequencer.releaseBlock("snapshot-" + i, "index-" + i);
+                }
+                ClusterServiceUtils.addTemporaryStateListener(masterClusterService, cs -> SnapshotsInProgress.get(cs).isEmpty())
+                    .addListener(l.map(v -> null));
+            });
+
+        deterministicTaskQueue.runAllRunnableTasks();
+        assertTrue(
+            "executed all runnable tasks but test steps are still incomplete: "
+                + Strings.toString(SnapshotsInProgress.get(masterClusterService.state()), true, true),
+            testListener.isDone()
+        );
+        safeAwait(testListener); // shouldn't throw
+    }
+
     @TestLogging(reason = "testing logging at INFO level", value = "org.elasticsearch.snapshots.SnapshotsService:INFO")
     public void testFullSnapshotUnassignedShards() {
         setupTestCluster(1, 0); // no data nodes, we want unassigned shards
```
The comment above is close to right, except it doesn't need us to recreate the index for it to be a problem. It's just that if the index is deleted before the snapshot finalization then we omit its shard generation from …
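The mechanism described above can be sketched with a hedged toy model. None of the names below are the real Elasticsearch `ShardGenerations` API; this is only an illustration of the shape of the bug: if finalization records generations only for indices that still exist, a concurrently deleted index's generation is silently dropped, and a later consumer that still references that index finds nothing where the assertion expects a value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Toy model (illustrative only, not the real ShardGenerations API):
// finalization copies generations only for indices that still exist,
// silently dropping the generation of a concurrently deleted index.
public class ShardGenerationToyModel {

    static Map<String, String> finalizeGenerations(Map<String, String> inProgress, Set<String> liveIndices) {
        Map<String, String> finalized = new HashMap<>();
        for (var e : inProgress.entrySet()) {
            if (liveIndices.contains(e.getKey())) { // deleted index is skipped here
                finalized.put(e.getKey(), e.getValue());
            }
        }
        return finalized;
    }

    public static void main(String[] args) {
        // Both shard snapshots SUCCEEDED before finalization...
        Map<String, String> inProgress = Map.of("index-1", "gen-7", "index-to-delete", "gen-3");
        // ...but "index-to-delete" was deleted between shard success and finalization.
        Map<String, String> finalized = finalizeGenerations(inProgress, Set.of("index-1"));
        // A later consumer that still references the deleted index finds no
        // generation, the kind of inconsistency the tripping assertion guards against.
        System.out.println(finalized.containsKey("index-to-delete")); // prints "false"
    }
}
```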
Unfortunately that doesn't work if the earlier snapshot operation is a clone, because clones are unaffected by index deletes. Instead, the best fix looks to be to finalize these shards properly even if failed, rather than simply bailing out here: `server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java`, lines 753 to 756 (at aac1409).
Then we need to handle elsewhere that the shards belong to an index that was concurrently deleted, so that we don't regress on #50234 - we still have to avoid trying to store the …

More concretely, consider the following sequence of states for a problematic shard (where columns are separate snapshots, rows are cluster state versions, and each cell shows the …):

(state table omitted)

The last row is the bad one: we're finalizing snapshot …
Build scan:
https://gradle-enterprise.elastic.co/s/vgv2xu3qpq5qk/tests/:server:internalClusterTest/org.elasticsearch.snapshots.SnapshotStressTestsIT/testRandomActivities

Reproduction line: …

Applicable branches: main

Reproduces locally?: Didn't try

Failure history: Failure dashboard for org.elasticsearch.snapshots.SnapshotStressTestsIT#testRandomActivities

Failure excerpt: …