-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathS3BackedPayloadStoreAsync.java
77 lines (64 loc) · 2.9 KB
/
S3BackedPayloadStoreAsync.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package software.amazon.payloadoffloading;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.payloadoffloading.PayloadS3Pointer;
/**
 * S3 based implementation for PayloadStoreAsync.
 *
 * <p>Payloads are written through the supplied {@link S3AsyncDao} into the bucket given at
 * construction time. The value handed back to callers is the JSON form of a
 * {@link PayloadS3Pointer} (bucket name + object key), which is also what the read and delete
 * operations expect as input.
 */
public class S3BackedPayloadStoreAsync implements PayloadStoreAsync {
    private static final Logger LOG = LoggerFactory.getLogger(S3BackedPayloadStoreAsync.class);

    private final String s3BucketName;
    private final S3AsyncDao s3Dao;

    /**
     * Creates a payload store that writes new payloads to the given bucket.
     *
     * @param s3Dao        DAO used for every S3 operation performed by this store
     * @param s3BucketName bucket that newly stored payloads are written to
     */
    public S3BackedPayloadStoreAsync(S3AsyncDao s3Dao, String s3BucketName) {
        this.s3BucketName = s3BucketName;
        this.s3Dao = s3Dao;
    }

    /**
     * Stores the payload under a freshly generated random UUID key.
     *
     * @param payload text to store
     * @return future completing with the JSON pointer to the stored object
     */
    @Override
    public CompletableFuture<String> storeOriginalPayload(String payload) {
        String s3Key = UUID.randomUUID().toString();
        return storeOriginalPayload(payload, s3Key);
    }

    /**
     * Stores the payload under the given key in this store's bucket.
     *
     * <p>Note: unlike the read and delete paths there is no try/catch here, so anything thrown
     * synchronously by the DAO call propagates directly to the caller rather than through the
     * returned future.
     *
     * @param payload text to store
     * @param s3Key   object key to store the payload under
     * @return future completing with the JSON pointer (bucket name and key) to the stored object
     */
    @Override
    public CompletableFuture<String> storeOriginalPayload(String payload, String s3Key) {
        return s3Dao.storeTextInS3(s3BucketName, s3Key, payload)
            .thenApply(v -> {
                // Parameterized logging: the message is only formatted when INFO is enabled.
                LOG.info("S3 object created, Bucket name: {}, Object key: {}.", s3BucketName, s3Key);

                // Convert S3 pointer (bucket name, key, etc) to JSON string
                PayloadS3Pointer s3Pointer = new PayloadS3Pointer(s3BucketName, s3Key);
                return s3Pointer.toJson();
            });
    }

    /**
     * Reads back the payload referenced by a JSON pointer.
     *
     * <p>The bucket and key are taken from the pointer itself, not from this store's configured
     * bucket. Any exception thrown synchronously while parsing the pointer or invoking the DAO is
     * reported through the returned future instead of being thrown.
     *
     * @param payloadPointer JSON form of a {@link PayloadS3Pointer}
     * @return future completing with the stored text, or completing exceptionally on failure
     */
    @Override
    public CompletableFuture<String> getOriginalPayload(String payloadPointer) {
        try {
            PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(payloadPointer);
            // Distinct local names so the pointer's bucket is not confused with the field.
            String pointerBucketName = s3Pointer.getS3BucketName();
            String pointerKey = s3Pointer.getS3Key();

            return s3Dao.getTextFromS3(pointerBucketName, pointerKey)
                .thenApply(originalPayload -> {
                    LOG.info("S3 object read, Bucket name: {}, Object key: {}.", pointerBucketName, pointerKey);
                    return originalPayload;
                });
        } catch (Exception e) {
            return failedFuture(e);
        }
    }

    /**
     * Deletes the object referenced by a JSON pointer.
     *
     * <p>The bucket and key are taken from the pointer itself. Any exception thrown synchronously
     * while parsing the pointer or invoking the DAO is reported through the returned future
     * instead of being thrown.
     *
     * @param payloadPointer JSON form of a {@link PayloadS3Pointer}
     * @return the DAO's delete future, or an exceptionally completed future on synchronous failure
     */
    @Override
    public CompletableFuture<Void> deleteOriginalPayload(String payloadPointer) {
        try {
            PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(payloadPointer);
            String pointerBucketName = s3Pointer.getS3BucketName();
            String pointerKey = s3Pointer.getS3Key();

            return s3Dao.deletePayloadFromS3(pointerBucketName, pointerKey);
        } catch (Exception e) {
            return failedFuture(e);
        }
    }

    /**
     * Builds an already-failed future for a synchronously thrown exception: runtime exceptions
     * are used as-is, checked exceptions are wrapped in a {@link CompletionException}.
     */
    private static <T> CompletableFuture<T> failedFuture(Exception e) {
        CompletableFuture<T> failed = new CompletableFuture<>();
        failed.completeExceptionally((e instanceof RuntimeException) ? e : new CompletionException(e));
        return failed;
    }
}