Skip to content

ISSUE-30: Support ignoring payload when not found in s3 #33

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageR
ReceiveMessageResult receiveMessageResult = super.receiveMessage(receiveMessageRequest);

List<Message> messages = receiveMessageResult.getMessages();
List<Message> messagesToIgnore = new ArrayList<>();
for (Message message : messages) {

// for each received message check if they are stored in S3.
Expand All @@ -357,7 +358,22 @@ public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageR
String largeMessagePointer = message.getBody();
largeMessagePointer = largeMessagePointer.replace("com.amazon.sqs.javamessaging.MessageS3Pointer", "software.amazon.payloadoffloading.PayloadS3Pointer");

message.setBody(payloadStore.getOriginalPayload(largeMessagePointer));
final String originalBody;
try {
originalBody = payloadStore.getOriginalPayload(largeMessagePointer);
} catch (AmazonServiceException e) {
boolean isNoSuchKeyException = ((AmazonServiceException) e.getCause()).getErrorCode().equals("NoSuchKey");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need casting here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that e.getCause() is Throwable at this point

if (isNoSuchKeyException && clientConfiguration.ignoresPayloadNotFound()) {
deleteMessage(receiveMessageRequest.getQueueUrl(), message.getReceiptHandle());
LOG.warn("SQS message deleted as it could not be found in S3");
messagesToIgnore.add(message);
continue;
}
throw e;
}

// Replace the large message pointer with the original message body
message.setBody(originalBody);

// remove the additional attribute before returning the message
// to user.
Expand All @@ -371,6 +387,7 @@ public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageR
message.setReceiptHandle(modifiedReceiptHandle);
}
}
receiveMessageResult.getMessages().removeAll(messagesToIgnore);
return receiveMessageResult;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class ExtendedClientConfiguration extends PayloadStorageConfiguration {

private boolean cleanupS3Payload = true;
private boolean useLegacyReservedAttributeName = true;
private boolean ignorePayloadNotFound = false;

public ExtendedClientConfiguration() {
super();
Expand All @@ -41,6 +42,7 @@ public ExtendedClientConfiguration(ExtendedClientConfiguration other) {
super(other);
this.cleanupS3Payload = other.doesCleanupS3Payload();
this.useLegacyReservedAttributeName = other.usesLegacyReservedAttributeName();
this.ignorePayloadNotFound = other.ignoresPayloadNotFound();
}

/**
Expand Down Expand Up @@ -100,6 +102,32 @@ public ExtendedClientConfiguration withLegacyReservedAttributeNameDisabled() {
return this;
}

/**
* Sets whether or not messages should be removed from Amazon SQS
* when payloads are not found in Amazon S3.
*
* @param ignorePayloadNotFound
* Whether or not messages should be removed from Amazon SQS
* when payloads are not found in Amazon S3. Default: false
*/
public void setIgnorePayloadNotFound(boolean ignorePayloadNotFound) {
this.ignorePayloadNotFound = ignorePayloadNotFound;
}

/**
* Sets whether or not messages should be removed from Amazon SQS
* when payloads are not found in Amazon S3.
*
* @param ignorePayloadNotFound
* Whether or not messages should be removed from Amazon SQS
* when payloads are not found in Amazon S3. Default: false
* @return the updated ExtendedClientConfiguration object.
*/
public ExtendedClientConfiguration withIgnorePayloadNotFound(boolean ignorePayloadNotFound) {
setIgnorePayloadNotFound(ignorePayloadNotFound);
return this;
}

/**
* Checks whether or not clean up large objects in S3 is enabled.
*
Expand All @@ -121,6 +149,17 @@ public boolean usesLegacyReservedAttributeName() {
return useLegacyReservedAttributeName;
}

/**
* Checks whether or not messages should be removed from Amazon SQS
* when payloads are not found in Amazon S3.
*
* @return True if messages should be removed from Amazon SQS
* when payloads are not found in Amazon S3. Default: false
*/
public boolean ignoresPayloadNotFound() {
return ignorePayloadNotFound;
}

@Override
public ExtendedClientConfiguration withAlwaysThroughS3(boolean alwaysThroughS3) {
setAlwaysThroughS3(alwaysThroughS3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.*;
import com.amazonaws.services.sqs.AmazonSQS;
Expand All @@ -33,6 +34,7 @@
import software.amazon.payloadoffloading.PayloadS3Pointer;

import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.isA;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -407,6 +409,55 @@ public void testWhenLargeMessageIsSentThenAttributeWithPayloadSizeIsAdded() {
Assert.assertEquals(messageLength, (int) Integer.valueOf(attributes.get(AmazonSQSExtendedClient.LEGACY_RESERVED_ATTRIBUTE_NAME).getStringValue()));
}

@Test
public void testWhenIgnorePayloadNotFoundIsSentThenNotFoundKeysInS3AreDeletedInSQS() {
ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration()
.withPayloadSupportEnabled(mockS3, S3_BUCKET_NAME).withIgnorePayloadNotFound(true);

AmazonServiceException mockException = mock(AmazonServiceException.class);
when(mockException.getErrorCode()).thenReturn("NoSuchKey");

Message message = new Message().addMessageAttributesEntry(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, mock(MessageAttributeValue.class));
String pointer = new PayloadS3Pointer(S3_BUCKET_NAME, "S3Key").toJson();
message.setBody(pointer);
message.setReceiptHandle("receipt-handle");

when(mockSqsBackend.receiveMessage(isA(ReceiveMessageRequest.class))).thenReturn(new ReceiveMessageResult().withMessages(message));
doThrow(mockException).when(mockS3).getObject(any(GetObjectRequest.class));

AmazonSQS sqsExtended = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfiguration));
ReceiveMessageRequest messageRequest = new ReceiveMessageRequest().withQueueUrl(SQS_QUEUE_URL);
ReceiveMessageResult actualReceiveMessageResult = sqsExtended.receiveMessage(messageRequest);
Assert.assertTrue(actualReceiveMessageResult.getMessages().isEmpty());

ArgumentCaptor<DeleteMessageRequest> deleteMessageRequestArgumentCaptor = ArgumentCaptor.forClass(DeleteMessageRequest.class);
verify(mockSqsBackend).deleteMessage(deleteMessageRequestArgumentCaptor.capture());
Assert.assertEquals(SQS_QUEUE_URL, deleteMessageRequestArgumentCaptor.getValue().getQueueUrl());
Assert.assertEquals("receipt-handle", deleteMessageRequestArgumentCaptor.getValue().getReceiptHandle());
}

@Test
public void testWhenIgnorePayloadNotFoundIsNotSentThenNotFoundKeysInS3AreNotDeletedInSQS() {
AmazonServiceException mockException = mock(AmazonServiceException.class);
when(mockException.getErrorCode()).thenReturn("NoSuchKey");

Message message = new Message().addMessageAttributesEntry(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, mock(MessageAttributeValue.class));
String pointer = new PayloadS3Pointer(S3_BUCKET_NAME, "S3Key").toJson();
message.setBody(pointer);

when(mockSqsBackend.receiveMessage(isA(ReceiveMessageRequest.class))).thenReturn(new ReceiveMessageResult().withMessages(message));

doThrow(mockException).when(mockS3).getObject(any(GetObjectRequest.class));
ReceiveMessageRequest messageRequest = new ReceiveMessageRequest();

try {
extendedSqsWithDefaultConfig.receiveMessage(messageRequest);
Assert.fail("exception should have been thrown");
} catch (AmazonServiceException e) {
verify(mockSqsBackend, never()).deleteMessage(any(DeleteMessageRequest.class));
}
}

@Test
public void testDefaultExtendedClientDeletesSmallMessage() {
// given
Expand Down