-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathS3AsyncDao.java
118 lines (106 loc) · 4.99 KB
/
S3AsyncDao.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package software.amazon.payloadoffloading;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
/**
 * Asynchronous DAO layer to access S3.
 *
 * <p>Reads, stores and deletes text payloads through an {@link S3AsyncClient}. Every operation
 * returns a {@link CompletableFuture}; failures reported by the client are translated inside a
 * {@code handle} stage before they reach the caller.
 */
public class S3AsyncDao {
    private static final Logger LOG = LoggerFactory.getLogger(S3AsyncDao.class);

    private final S3AsyncClient s3Client;
    private final ServerSideEncryptionStrategy serverSideEncryptionStrategy;
    private final ObjectCannedACL objectCannedACL;

    /**
     * Creates a DAO that stores objects without a canned ACL and without a server-side
     * encryption strategy.
     *
     * @param s3Client the asynchronous S3 client used for all requests
     */
    public S3AsyncDao(S3AsyncClient s3Client) {
        this(s3Client, null, null);
    }

    /**
     * Creates a DAO with optional encryption and ACL settings applied to stored objects.
     *
     * @param s3Client the asynchronous S3 client used for all requests
     * @param serverSideEncryptionStrategy strategy that decorates put requests; may be
     *     {@code null}, in which case put requests are not decorated
     * @param objectCannedACL canned ACL set on put requests; may be {@code null}, in which case
     *     no ACL is set
     */
    public S3AsyncDao(
            S3AsyncClient s3Client,
            ServerSideEncryptionStrategy serverSideEncryptionStrategy,
            ObjectCannedACL objectCannedACL) {
        this.s3Client = s3Client;
        this.serverSideEncryptionStrategy = serverSideEncryptionStrategy;
        this.objectCannedACL = objectCannedACL;
    }

    /**
     * Fetches an S3 object and decodes its bytes as a UTF-8 string.
     *
     * <p>On failure the cause is first passed through {@code Util.unwrapFutureException} and
     * then translated: an {@link SdkException} is logged and rethrown as a new
     * {@link SdkException}, an {@link UncheckedIOException} is logged and rethrown as an
     * {@link SdkClientException}, and anything else is rethrown inside a
     * {@link CompletionException}.
     *
     * @param s3BucketName name of the bucket holding the object
     * @param s3Key key of the object to read
     * @return a future yielding the object content as text, or completing exceptionally with
     *     the translated failure
     */
    public CompletableFuture<String> getTextFromS3(String s3BucketName, String s3Key) {
        GetObjectRequest getObjectRequest = GetObjectRequest.builder()
                .bucket(s3BucketName)
                .key(s3Key)
                .build();

        return s3Client.getObject(getObjectRequest, AsyncResponseTransformer.toBytes())
                .thenApply(ResponseBytes::asUtf8String)
                .handle((text, failure) -> {
                    if (failure == null) {
                        return text;
                    }
                    Throwable cause = Util.unwrapFutureException(failure);
                    // Only the read path has a decoding step, so only it maps
                    // UncheckedIOException to a client-side SDK exception.
                    if (cause instanceof UncheckedIOException) {
                        String errorMessage =
                                "Failure when handling the message which was read from S3 object.";
                        LOG.error(errorMessage, cause);
                        throw SdkClientException.create(errorMessage, cause);
                    }
                    throw translateFailure(
                            cause, "Failed to get the S3 object which contains the payload.");
                });
    }

    /**
     * Stores the given text as an S3 object.
     *
     * <p>The configured canned ACL and server-side encryption strategy, when non-null, are
     * applied to the put request before it is sent. On failure the cause is passed through
     * {@code Util.unwrapFutureException}; an {@link SdkException} is logged and rethrown as a
     * new {@link SdkException}, anything else is rethrown inside a {@link CompletionException}.
     *
     * @param s3BucketName name of the destination bucket
     * @param s3Key key of the object to write
     * @param payloadContentStr text to store as the object body
     * @return a future completing with {@code null} on success, or exceptionally with the
     *     translated failure
     */
    public CompletableFuture<Void> storeTextInS3(String s3BucketName, String s3Key, String payloadContentStr) {
        PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.builder()
                .bucket(s3BucketName)
                .key(s3Key);

        if (objectCannedACL != null) {
            putObjectRequestBuilder.acl(objectCannedACL);
        }

        // https://docs.aws.amazon.com/AmazonS3/latest/dev/kms-using-sdks.html
        if (serverSideEncryptionStrategy != null) {
            serverSideEncryptionStrategy.decorate(putObjectRequestBuilder);
        }

        return s3Client.putObject(putObjectRequestBuilder.build(), AsyncRequestBody.fromString(payloadContentStr))
                .handle((response, failure) -> {
                    if (failure != null) {
                        throw translateFailure(
                                Util.unwrapFutureException(failure),
                                "Failed to store the message content in an S3 object.");
                    }
                    return null;
                });
    }

    /**
     * Deletes an S3 object and logs the bucket and key at INFO level on success.
     *
     * <p>On failure the cause is passed through {@code Util.unwrapFutureException}; an
     * {@link SdkException} is logged and rethrown as a new {@link SdkException}, anything else
     * is rethrown inside a {@link CompletionException}.
     *
     * @param s3BucketName name of the bucket holding the object
     * @param s3Key key of the object to delete
     * @return a future completing with {@code null} on success, or exceptionally with the
     *     translated failure
     */
    public CompletableFuture<Void> deletePayloadFromS3(String s3BucketName, String s3Key) {
        DeleteObjectRequest deleteObjectRequest = DeleteObjectRequest.builder()
                .bucket(s3BucketName)
                .key(s3Key)
                .build();

        return s3Client.deleteObject(deleteObjectRequest)
                .handle((response, failure) -> {
                    if (failure != null) {
                        throw translateFailure(
                                Util.unwrapFutureException(failure),
                                "Failed to delete the S3 object which contains the payload");
                    }
                    // Parameterized form renders the same text as before but defers formatting
                    // to the logger, so nothing is built when INFO is disabled.
                    LOG.info("S3 object deleted, Bucket name: {}, Object key: {}.", s3BucketName, s3Key);
                    return null;
                });
    }

    /**
     * Maps an already-unwrapped failure to the exception the handle stage should throw.
     *
     * @param cause the failure after {@code Util.unwrapFutureException} has been applied
     * @param sdkErrorMessage message logged and attached when {@code cause} is an
     *     {@link SdkException}
     * @return a new {@link SdkException} wrapping {@code cause} when it is an SDK exception,
     *     otherwise a {@link CompletionException} wrapping {@code cause}
     */
    private static RuntimeException translateFailure(Throwable cause, String sdkErrorMessage) {
        if (cause instanceof SdkException) {
            LOG.error(sdkErrorMessage, cause);
            return SdkException.create(sdkErrorMessage, cause);
        }
        return new CompletionException(cause);
    }
}