diff --git a/docs/changelog/101184.yaml b/docs/changelog/101184.yaml new file mode 100644 index 0000000000000..ac2f5f3ee8af1 --- /dev/null +++ b/docs/changelog/101184.yaml @@ -0,0 +1,6 @@ +pr: 101184 +summary: More robust timeout for repo analysis +area: Snapshot/Restore +type: bug +issues: + - 101182 diff --git a/x-pack/plugin/snapshot-repo-test-kit/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/10_analyze.yml b/x-pack/plugin/snapshot-repo-test-kit/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/10_analyze.yml index 6223ca8443b0e..648eb3766fffb 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/10_analyze.yml +++ b/x-pack/plugin/snapshot-repo-test-kit/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/10_analyze.yml @@ -176,4 +176,5 @@ setup: - match: { status: 500 } - match: { error.type: repository_verification_exception } - match: { error.reason: "/.*test_repo_slow..analysis.failed.*/" } - - match: { error.root_cause.0.type: receive_timeout_transport_exception } + - match: { error.root_cause.0.type: repository_verification_exception } + - match: { error.root_cause.0.reason: "/.*test_repo_slow..analysis.timed.out.after..1s.*/" } diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java index e5ba4c2c6930b..84e27cc26b77b 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.env.Environment; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.Plugin; @@ -61,6 +62,7 @@ import java.util.stream.Collectors; import static org.hamcrest.Matchers.anEmptyMap; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; @@ -347,8 +349,26 @@ public BytesReference onCompareAndExchange(BytesRegister register, BytesReferenc } } - private RepositoryAnalyzeAction.Response analyseRepository(RepositoryAnalyzeAction.Request request) { - return client().execute(RepositoryAnalyzeAction.INSTANCE, request).actionGet(30L, TimeUnit.SECONDS); + public void testTimesOutSpinningRegisterAnalysis() { + final RepositoryAnalyzeAction.Request request = new RepositoryAnalyzeAction.Request("test-repo"); + request.timeout(TimeValue.timeValueMillis(between(1, 1000))); + + blobStore.setDisruption(new Disruption() { + @Override + public boolean compareAndExchangeReturnsWitness() { + return false; + } + }); + final var exception = expectThrows(RepositoryVerificationException.class, () -> analyseRepository(request)); + assertThat(exception.getMessage(), containsString("analysis failed")); + assertThat( + asInstanceOf(RepositoryVerificationException.class, exception.getCause()).getMessage(), + containsString("analysis timed out") + ); + } + + private void analyseRepository(RepositoryAnalyzeAction.Request request) { + client().execute(RepositoryAnalyzeAction.INSTANCE, request).actionGet(30L, TimeUnit.SECONDS); } private static void assertPurpose(OperationPurpose purpose) { @@ -464,6 +484,10 @@ default boolean createBlobOnAbort() { return false; } + default boolean compareAndExchangeReturnsWitness() { + return true; + } + default BytesReference onCompareAndExchange(BytesRegister register, BytesReference expected, BytesReference updated) { return register.compareAndExchange(expected, updated); } @@ -637,8 +661,12 @@ public void compareAndExchangeRegister( ActionListener listener ) { assertPurpose(purpose); - final var register = registers.computeIfAbsent(key, ignored -> new BytesRegister()); - listener.onResponse(OptionalBytesReference.of(disruption.onCompareAndExchange(register, expected, updated))); + if (disruption.compareAndExchangeReturnsWitness()) { + final var register = registers.computeIfAbsent(key, ignored -> new BytesRegister()); + listener.onResponse(OptionalBytesReference.of(disruption.onCompareAndExchange(register, expected, updated))); + } else { + listener.onResponse(OptionalBytesReference.MISSING); + } } } diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalyzeAction.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalyzeAction.java index 79de8bd7b0248..c49f14ed597fd 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalyzeAction.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalyzeAction.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.TransportVersions; import org.elasticsearch.Version; @@ -22,6 +23,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.RefCountingRunnable; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -364,6 +366,7 @@ public static class AsyncAction { private final DiscoveryNodes discoveryNodes; private final LongSupplier currentTimeMillisSupplier; private final ActionListener listener; + private final SubscribableListener cancellationListener; private final long timeoutTimeMillis; // choose the blob path nondeterministically to avoid clashes, assuming that the actual path doesn't matter for reproduction @@ -394,15 +397,24 @@ public AsyncAction( this.discoveryNodes = discoveryNodes; this.currentTimeMillisSupplier = currentTimeMillisSupplier; this.timeoutTimeMillis = currentTimeMillisSupplier.getAsLong() + request.getTimeout().millis(); - this.listener = listener; + + this.cancellationListener = new SubscribableListener<>(); + this.listener = ActionListener.runBefore(listener, () -> cancellationListener.onResponse(null)); responses = new ArrayList<>(request.blobCount); } - private void fail(Exception e) { + private boolean setFirstFailure(Exception e) { if (failure.compareAndSet(null, e)) { transportService.getTaskManager().cancelTaskAndDescendants(task, "task failed", false, ActionListener.noop()); + return true; } else { + return false; + } + } + + private void fail(Exception e) { + if (setFirstFailure(e) == false) { if (innerFailures.tryAcquire()) { final Throwable cause = ExceptionsHelper.unwrapCause(e); if (cause instanceof TaskCancelledException || cause instanceof ReceiveTimeoutTransportException) { @@ -424,24 +436,34 @@ private boolean isRunning() { } if (task.isCancelled()) { - failure.compareAndSet(null, new RepositoryVerificationException(request.repositoryName, "verification cancelled")); + setFirstFailure(new RepositoryVerificationException(request.repositoryName, "verification cancelled")); // if this CAS failed then we're failing for some other reason, nbd; also if the task is cancelled then its descendants are // also cancelled, so no further action is needed either way. return false; } - if (timeoutTimeMillis < currentTimeMillisSupplier.getAsLong()) { - if (failure.compareAndSet( - null, - new RepositoryVerificationException(request.repositoryName, "analysis timed out after [" + request.getTimeout() + "]") - )) { - transportService.getTaskManager().cancelTaskAndDescendants(task, "timed out", false, ActionListener.noop()); - } - // if this CAS failed then we're already failing for some other reason, nbd - return false; + return true; + } + + private class CheckForCancelListener implements ActionListener { + @Override + public void onResponse(Void unused) { + // task complete, nothing to do } - return true; + @Override + public void onFailure(Exception e) { + assert e instanceof ElasticsearchTimeoutException : e; + if (isRunning()) { + // if this CAS fails then we're already failing for some other reason, nbd + setFirstFailure( + new RepositoryVerificationException( + request.repositoryName, + "analysis timed out after [" + request.getTimeout() + "]" + ) + ); + } + } } public void run() { @@ -450,6 +472,9 @@ public void run() { logger.info("running analysis of repository [{}] using path [{}]", request.getRepositoryName(), blobPath); + cancellationListener.addTimeout(request.getTimeout(), repository.threadPool(), EsExecutors.DIRECT_EXECUTOR_SERVICE); + cancellationListener.addListener(new CheckForCancelListener()); + final Random random = new Random(request.getSeed()); final List nodes = getSnapshotNodes(discoveryNodes); @@ -536,7 +561,7 @@ private void runBlobAnalysis(Releasable ref, final BlobAnalyzeAction.Request req BlobAnalyzeAction.NAME, request, task, - TransportRequestOptions.timeout(TimeValue.timeValueMillis(timeoutTimeMillis - currentTimeMillisSupplier.getAsLong())), + TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(ActionListener.releaseAfter(new ActionListener<>() { @Override public void onResponse(BlobAnalyzeAction.Response response) {