diff --git a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java index da7589d..452fa8a 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java +++ b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java @@ -349,6 +349,7 @@ public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageR ReceiveMessageResult receiveMessageResult = super.receiveMessage(receiveMessageRequest); List messages = receiveMessageResult.getMessages(); + List messagesToIgnore = new ArrayList<>(); for (Message message : messages) { // for each received message check if they are stored in S3. @@ -357,7 +358,22 @@ public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageR String largeMessagePointer = message.getBody(); largeMessagePointer = largeMessagePointer.replace("com.amazon.sqs.javamessaging.MessageS3Pointer", "software.amazon.payloadoffloading.PayloadS3Pointer"); - message.setBody(payloadStore.getOriginalPayload(largeMessagePointer)); + final String originalBody; + try { + originalBody = payloadStore.getOriginalPayload(largeMessagePointer); + } catch (AmazonServiceException e) { + boolean isNoSuchKeyException = ((AmazonServiceException) e.getCause()).getErrorCode().equals("NoSuchKey"); + if (isNoSuchKeyException && clientConfiguration.ignoresPayloadNotFound()) { + deleteMessage(receiveMessageRequest.getQueueUrl(), message.getReceiptHandle()); + LOG.warn("SQS message deleted as it could not be found in S3"); + messagesToIgnore.add(message); + continue; + } + throw e; + } + + // Replace the large message pointer with the original message body + message.setBody(originalBody); // remove the additional attribute before returning the message // to user. @@ -371,6 +387,7 @@ public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageR message.setReceiptHandle(modifiedReceiptHandle); } } + receiveMessageResult.getMessages().removeAll(messagesToIgnore); return receiveMessageResult; } diff --git a/src/main/java/com/amazon/sqs/javamessaging/ExtendedClientConfiguration.java b/src/main/java/com/amazon/sqs/javamessaging/ExtendedClientConfiguration.java index cde665b..a20f561 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/ExtendedClientConfiguration.java +++ b/src/main/java/com/amazon/sqs/javamessaging/ExtendedClientConfiguration.java @@ -31,6 +31,7 @@ public class ExtendedClientConfiguration extends PayloadStorageConfiguration { private boolean cleanupS3Payload = true; private boolean useLegacyReservedAttributeName = true; + private boolean ignorePayloadNotFound = false; public ExtendedClientConfiguration() { super(); @@ -41,6 +42,7 @@ public ExtendedClientConfiguration(ExtendedClientConfiguration other) { super(other); this.cleanupS3Payload = other.doesCleanupS3Payload(); this.useLegacyReservedAttributeName = other.usesLegacyReservedAttributeName(); + this.ignorePayloadNotFound = other.ignoresPayloadNotFound(); } /** @@ -100,6 +102,32 @@ public ExtendedClientConfiguration withLegacyReservedAttributeNameDisabled() { return this; } + /** + * Sets whether or not messages should be removed from Amazon SQS + * when payloads are not found in Amazon S3. + * + * @param ignorePayloadNotFound + * Whether or not messages should be removed from Amazon SQS + * when payloads are not found in Amazon S3. Default: false + */ + public void setIgnorePayloadNotFound(boolean ignorePayloadNotFound) { + this.ignorePayloadNotFound = ignorePayloadNotFound; + } + + /** + * Sets whether or not messages should be removed from Amazon SQS + * when payloads are not found in Amazon S3. + * + * @param ignorePayloadNotFound + * Whether or not messages should be removed from Amazon SQS + * when payloads are not found in Amazon S3. Default: false + * @return the updated ExtendedClientConfiguration object. + */ + public ExtendedClientConfiguration withIgnorePayloadNotFound(boolean ignorePayloadNotFound) { + setIgnorePayloadNotFound(ignorePayloadNotFound); + return this; + } + /** * Checks whether or not clean up large objects in S3 is enabled. * @@ -121,6 +149,17 @@ public boolean usesLegacyReservedAttributeName() { return useLegacyReservedAttributeName; } + /** + * Checks whether or not messages should be removed from Amazon SQS + * when payloads are not found in Amazon S3. + * + * @return True if messages should be removed from Amazon SQS + * when payloads are not found in Amazon S3. Default: false + */ + public boolean ignoresPayloadNotFound() { + return ignorePayloadNotFound; + } + @Override public ExtendedClientConfiguration withAlwaysThroughS3(boolean alwaysThroughS3) { setAlwaysThroughS3(alwaysThroughS3); diff --git a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java index 2ecc086..0975183 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java @@ -19,6 +19,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import com.amazonaws.AmazonServiceException; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.*; import com.amazonaws.services.sqs.AmazonSQS; @@ -33,6 +34,7 @@ import software.amazon.payloadoffloading.PayloadS3Pointer; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.isA; import static org.mockito.Mockito.when; @@ -407,6 +409,55 @@ public void testWhenLargeMessageIsSentThenAttributeWithPayloadSizeIsAdded() { Assert.assertEquals(messageLength, (int) Integer.valueOf(attributes.get(AmazonSQSExtendedClient.LEGACY_RESERVED_ATTRIBUTE_NAME).getStringValue())); } + @Test + public void testWhenIgnorePayloadNotFoundIsSentThenNotFoundKeysInS3AreDeletedInSQS() { + ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration() + .withPayloadSupportEnabled(mockS3, S3_BUCKET_NAME).withIgnorePayloadNotFound(true); + + AmazonServiceException mockException = mock(AmazonServiceException.class); + when(mockException.getErrorCode()).thenReturn("NoSuchKey"); + + Message message = new Message().addMessageAttributesEntry(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, mock(MessageAttributeValue.class)); + String pointer = new PayloadS3Pointer(S3_BUCKET_NAME, "S3Key").toJson(); + message.setBody(pointer); + message.setReceiptHandle("receipt-handle"); + + when(mockSqsBackend.receiveMessage(isA(ReceiveMessageRequest.class))).thenReturn(new ReceiveMessageResult().withMessages(message)); + doThrow(mockException).when(mockS3).getObject(any(GetObjectRequest.class)); + + AmazonSQS sqsExtended = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfiguration)); + ReceiveMessageRequest messageRequest = new ReceiveMessageRequest().withQueueUrl(SQS_QUEUE_URL); + ReceiveMessageResult actualReceiveMessageResult = sqsExtended.receiveMessage(messageRequest); + Assert.assertTrue(actualReceiveMessageResult.getMessages().isEmpty()); + + ArgumentCaptor deleteMessageRequestArgumentCaptor = ArgumentCaptor.forClass(DeleteMessageRequest.class); + verify(mockSqsBackend).deleteMessage(deleteMessageRequestArgumentCaptor.capture()); + Assert.assertEquals(SQS_QUEUE_URL, deleteMessageRequestArgumentCaptor.getValue().getQueueUrl()); + Assert.assertEquals("receipt-handle", deleteMessageRequestArgumentCaptor.getValue().getReceiptHandle()); + } + + @Test + public void testWhenIgnorePayloadNotFoundIsNotSentThenNotFoundKeysInS3AreNotDeletedInSQS() { + AmazonServiceException mockException = mock(AmazonServiceException.class); + when(mockException.getErrorCode()).thenReturn("NoSuchKey"); + + Message message = new Message().addMessageAttributesEntry(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, mock(MessageAttributeValue.class)); + String pointer = new PayloadS3Pointer(S3_BUCKET_NAME, "S3Key").toJson(); + message.setBody(pointer); + + when(mockSqsBackend.receiveMessage(isA(ReceiveMessageRequest.class))).thenReturn(new ReceiveMessageResult().withMessages(message)); + + doThrow(mockException).when(mockS3).getObject(any(GetObjectRequest.class)); + ReceiveMessageRequest messageRequest = new ReceiveMessageRequest(); + + try { + extendedSqsWithDefaultConfig.receiveMessage(messageRequest); + Assert.fail("exception should have been thrown"); + } catch (AmazonServiceException e) { + verify(mockSqsBackend, never()).deleteMessage(any(DeleteMessageRequest.class)); + } + } + @Test public void testDefaultExtendedClientDeletesSmallMessage() { // given