diff --git a/services/kinesis/src/it/java/software/amazon/awssdk/services/kinesis/SubscribeToShardIntegrationTest.java b/services/kinesis/src/it/java/software/amazon/awssdk/services/kinesis/SubscribeToShardIntegrationTest.java index f0775794f588..301cf901c077 100644 --- a/services/kinesis/src/it/java/software/amazon/awssdk/services/kinesis/SubscribeToShardIntegrationTest.java +++ b/services/kinesis/src/it/java/software/amazon/awssdk/services/kinesis/SubscribeToShardIntegrationTest.java @@ -43,6 +43,7 @@ import software.amazon.awssdk.services.kinesis.model.ConsumerStatus; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.services.kinesis.model.ResourceInUseException; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.StreamStatus; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; @@ -290,8 +291,22 @@ private static void waitForStreamToBeActive() { Waiter.run(() -> asyncClient.describeStream(r -> r.streamName(streamName)).join()) .until(b -> b.streamDescription().streamStatus().equals(StreamStatus.ACTIVE)) .orFailAfter(Duration.ofMinutes(5)); - } + // Additional verification to ensure stream is fully operational + Waiter.run(() -> { + try { + asyncClient.listShards(r -> r.streamName(streamName)).join(); + return true; + } catch (Exception e) { + if (e.getCause() instanceof ResourceInUseException) { + return false; + } + throw e; + } + }) + .until(Boolean::booleanValue) + .orFailAfter(Duration.ofMinutes(1)); + } /** * Puts a random record to the stream.