Skip to content

Commit 95b2f9a

Browse files
committed
fix(s3stream): Set default aws provider as fallback provider
- (AutoMQ#2026)
1 parent c41faff commit 95b2f9a

File tree

1 file changed

+17
-8
lines changed

1 file changed

+17
-8
lines changed

s3stream/src/main/java/com/automq/stream/s3/operator/AwsObjectStorage.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
4444
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
4545
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
46+
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
4647
import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
4748
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
4849
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
@@ -91,6 +92,7 @@ public class AwsObjectStorage extends AbstractObjectStorage {
9192
public static final String AUTH_TYPE_KEY = "authType";
9293
public static final String STATIC_AUTH_TYPE = "static";
9394
public static final String INSTANCE_AUTH_TYPE = "instance";
95+
public static final String DEFAULT_AUTH_TYPE = "default";
9496
public static final String CHECKSUM_ALGORITHM_KEY = "checksumAlgorithm";
9597

9698
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
@@ -372,24 +374,32 @@ protected DeleteObjectsAccumulator newDeleteObjectsAccumulator() {
372374
}
373375

374376
protected List<AwsCredentialsProvider> credentialsProviders() {
375-
String authType = bucketURI.extensionString(AUTH_TYPE_KEY, STATIC_AUTH_TYPE);
377+
String authType = bucketURI.extensionString(AUTH_TYPE_KEY, DEFAULT_AUTH_TYPE);
376378
switch (authType) {
377379
case STATIC_AUTH_TYPE: {
378-
String accessKey = bucketURI.extensionString(BucketURI.ACCESS_KEY_KEY, System.getenv("KAFKA_S3_ACCESS_KEY"));
379-
String secretKey = bucketURI.extensionString(BucketURI.SECRET_KEY_KEY, System.getenv("KAFKA_S3_SECRET_KEY"));
380-
if (StringUtils.isBlank(accessKey) || StringUtils.isBlank(secretKey)) {
381-
return Collections.emptyList();
382-
}
383-
return List.of(StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey)));
380+
AwsCredentialsProvider acp = staticProfileCredentialsProvider();
381+
return acp != null ? List.of(acp) : Collections.emptyList();
384382
}
385383
case INSTANCE_AUTH_TYPE: {
386384
return List.of(instanceProfileCredentialsProvider());
387385
}
386+
case DEFAULT_AUTH_TYPE: {
387+
return List.of(DefaultCredentialsProvider.create());
388+
}
388389
default:
389390
throw new UnsupportedOperationException("Unsupported auth type: " + authType);
390391
}
391392
}
392393

394+
protected AwsCredentialsProvider staticProfileCredentialsProvider() {
395+
String accessKey = bucketURI.extensionString(BucketURI.ACCESS_KEY_KEY, System.getenv("KAFKA_S3_ACCESS_KEY"));
396+
String secretKey = bucketURI.extensionString(BucketURI.SECRET_KEY_KEY, System.getenv("KAFKA_S3_SECRET_KEY"));
397+
if (StringUtils.isBlank(accessKey) || StringUtils.isBlank(secretKey)) {
398+
return null;
399+
}
400+
return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey));
401+
}
402+
393403
protected AwsCredentialsProvider instanceProfileCredentialsProvider() {
394404
if (instanceProfileCredentialsProvider == null) {
395405
synchronized (AwsObjectStorage.class) {
@@ -438,7 +448,6 @@ protected ClientOverrideConfiguration clientOverrideConfiguration() {
438448
private AwsCredentialsProvider newCredentialsProviderChain(List<AwsCredentialsProvider> credentialsProviders) {
439449
List<AwsCredentialsProvider> providers = new ArrayList<>(credentialsProviders);
440450
// Add default providers to the end of the chain
441-
providers.add(InstanceProfileCredentialsProvider.create());
442451
providers.add(AnonymousCredentialsProvider.create());
443452
return AwsCredentialsProviderChain.builder()
444453
.reuseLastProviderEnabled(true)

0 commit comments

Comments
 (0)