Skip to content

Commit b9eb711

Browse files
authored
Merge pull request #2555 from dantengsky/feature-s3-endpoint-url
add endpoint url param for s3 dal
2 parents 9e14a97 + 0fd58fe commit b9eb711

19 files changed

+157
-219
lines changed

.github/workflows/stateless-tests-standalone.yml

+28
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,34 @@ jobs:
4242
env:
4343
CARGO_INCREMENTAL: '0'
4444

45+
- name: Minio Setup for (ubuntu-latest only)
46+
if: matrix.config.os == 'ubuntu-latest'
47+
run: |
48+
docker run -d -p 9000:9000 --name minio \
49+
-e "MINIO_ACCESS_KEY=minioadmin" \
50+
-e "MINIO_SECRET_KEY=minioadmin" \
51+
-v /tmp/data:/data \
52+
-v /tmp/config:/root/.minio \
53+
minio/minio server /data
54+
55+
export AWS_ACCESS_KEY_ID=minioadmin
56+
export AWS_SECRET_ACCESS_KEY=minioadmin
57+
export AWS_EC2_METADATA_DISABLED=true
58+
59+
aws --endpoint-url http://127.0.0.1:9000/ s3 mb s3://testbucket
60+
61+
- name: Run Stateless Tests with Standalone mode (ubuntu-latest only)
62+
if: matrix.config.os == 'ubuntu-latest'
63+
run: |
64+
export STORAGE_TYPE=s3
65+
export S3_STORAGE_BUCKET=testbucket
66+
export S3_STORAGE_REGION=us-east-1
67+
export S3_STORAGE_ENDPOINT_URL=http://127.0.0.1:9000
68+
export S3_STORAGE_ACCESS_KEY_ID=minioadmin
69+
export S3_STORAGE_SECRET_ACCESS_KEY=minioadmin
70+
bash ./scripts/ci/ci-run-stateless-tests-standalone.sh
71+
4572
- name: Run Stateless Tests with Standalone mode
73+
if: matrix.config.os != 'ubuntu-latest'
4674
run: |
4775
bash ./scripts/ci/ci-run-stateless-tests-standalone.sh

common/dal/src/data_accessor.rs

-2
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@ impl<T> SeekableReader for T where T: Read + Seek {}
3737

3838
#[async_trait::async_trait]
3939
pub trait DataAccessor: Send + Sync {
40-
fn get_reader(&self, path: &str, len: Option<u64>) -> Result<Box<dyn SeekableReader>>;
41-
4240
fn get_input_stream(&self, path: &str, stream_len: Option<u64>) -> Result<InputStream>;
4341

4442
async fn get(&self, path: &str) -> Result<Bytes>;

common/dal/src/impls/aws_s3/s3.rs

+54-35
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@ use common_exception::ErrorCode;
2020
use common_exception::Result;
2121
use futures::Stream;
2222
use futures::StreamExt;
23+
use rusoto_core::credential::DefaultCredentialsProvider;
2324
use rusoto_core::credential::StaticProvider;
2425
use rusoto_core::ByteStream;
26+
use rusoto_core::Client;
2527
use rusoto_core::HttpClient;
2628
use rusoto_core::Region;
2729
use rusoto_s3::GetObjectRequest;
@@ -33,54 +35,79 @@ use crate::Bytes;
3335
use crate::DataAccessor;
3436
use crate::InputStream;
3537
use crate::S3InputStream;
36-
use crate::SeekableReader;
3738

3839
pub struct S3 {
3940
client: S3Client,
4041
bucket: String,
4142
}
4243

4344
impl S3 {
44-
#[allow(dead_code)]
45-
pub fn new(region: Region, bucket: String) -> Self {
46-
let client = S3Client::new(region);
47-
S3 { client, bucket }
48-
}
49-
50-
/// build S3 dal with aws credentials
51-
/// for region mapping, see [`rusoto_core::Region`]
52-
pub fn with_credentials(
53-
region: &str,
45+
pub fn try_create(
46+
region_name: &str,
47+
endpoint_url: &str,
5448
bucket: &str,
5549
access_key_id: &str,
5650
secret_accesses_key: &str,
5751
) -> Result<Self> {
58-
let region = Region::from_str(region).map_err(|e| {
59-
ErrorCode::DALTransportError(format!(
60-
"invalid region {}, error details {}",
61-
region,
62-
e.to_string()
63-
))
64-
})?;
65-
let provider = StaticProvider::new(
66-
access_key_id.to_owned(),
67-
secret_accesses_key.to_owned(),
68-
None,
69-
None,
70-
);
71-
let client = HttpClient::new().map_err(|e| {
52+
let region = Self::parse_region(region_name, endpoint_url)?;
53+
54+
let dispatcher = HttpClient::new().map_err(|e| {
7255
ErrorCode::DALTransportError(format!(
7356
"failed to create http client of s3, {}",
7457
e.to_string()
7558
))
7659
})?;
77-
let client = S3Client::new_with(client, provider, region);
60+
61+
let client = match Self::credential_provider(access_key_id, secret_accesses_key) {
62+
Some(provider) => Client::new_with(provider, dispatcher),
63+
None => Client::new_with(
64+
DefaultCredentialsProvider::new().map_err(|e| {
65+
ErrorCode::DALTransportError(format!(
66+
"failed to create default credentials provider, {}",
67+
e.to_string()
68+
))
69+
})?,
70+
dispatcher,
71+
),
72+
};
73+
74+
let s3_client = S3Client::new_with_client(client, region);
7875
Ok(S3 {
79-
client,
76+
client: s3_client,
8077
bucket: bucket.to_owned(),
8178
})
8279
}
8380

81+
fn parse_region(name: &str, endpoint: &str) -> Result<Region> {
82+
if endpoint.is_empty() {
83+
Region::from_str(name).map_err(|e| {
84+
ErrorCode::DALTransportError(format!(
85+
"invalid region {}, error details {}",
86+
name,
87+
e.to_string()
88+
))
89+
})
90+
} else {
91+
Ok(Region::Custom {
92+
name: name.to_string(),
93+
endpoint: endpoint.to_string(),
94+
})
95+
}
96+
}
97+
98+
fn credential_provider(key_id: &str, secret: &str) -> Option<StaticProvider> {
99+
if key_id.is_empty() {
100+
None
101+
} else {
102+
Some(StaticProvider::new(
103+
key_id.to_owned(),
104+
secret.to_owned(),
105+
None,
106+
None,
107+
))
108+
}
109+
}
110+
84111
async fn put_byte_stream(
85112
&self,
86113
path: &str,
@@ -102,14 +129,6 @@ impl S3 {
102129

103130
#[async_trait::async_trait]
104131
impl DataAccessor for S3 {
105-
fn get_reader(
106-
&self,
107-
_path: &str,
108-
_stream_len: Option<u64>,
109-
) -> common_exception::Result<Box<dyn SeekableReader>> {
110-
todo!()
111-
}
112-
113132
fn get_input_stream(
114133
&self,
115134
path: &str,

common/dal/src/impls/aws_s3/s3_input_stream_test.rs

+26-25
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ use crate::DataAccessor;
2929
use crate::S3;
3030

3131
struct TestFixture {
32-
region: Region,
32+
region_name: String,
33+
endpoint_url: String,
3334
bucket_name: String,
3435
test_key: String,
3536
content: Vec<u8>,
@@ -39,17 +40,35 @@ impl TestFixture {
3940
fn new(size: usize, key: String) -> Self {
4041
let random_bytes: Vec<u8> = (0..size).map(|_| rand::random::<u8>()).collect();
4142
Self {
42-
region: Region::UsEast2,
43-
bucket_name: "poc-datafuse".to_string(),
43+
region_name: "us-east-1".to_string(),
44+
endpoint_url: "http://localhost:9000".to_string(),
45+
bucket_name: "test-bucket".to_string(),
4446
test_key: key,
4547
content: random_bytes,
4648
}
4749
}
50+
51+
fn region(&self) -> Region {
52+
Region::Custom {
53+
name: self.region_name.clone(),
54+
endpoint: self.endpoint_url.clone(),
55+
}
56+
}
57+
58+
fn data_accessor(&self) -> common_exception::Result<S3> {
59+
S3::try_create(
60+
self.region_name.as_str(),
61+
self.endpoint_url.as_str(),
62+
self.bucket_name.as_str(),
63+
"",
64+
"",
65+
)
66+
}
4867
}
4968

5069
impl TestFixture {
5170
async fn gen_test_obj(&self) -> common_exception::Result<()> {
52-
let rusoto_client = S3Client::new(self.region.clone());
71+
let rusoto_client = S3Client::new(self.region());
5372
let put_req = PutObjectRequest {
5473
bucket: self.bucket_name.clone(),
5574
key: self.test_key.clone(),
@@ -64,47 +83,29 @@ impl TestFixture {
6483
}
6584
}
6685

67-
// CI has no AWS_SECRET_ACCESS_KEY and AWS_ACCESS_KEY_ID yet
6886
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
6987
#[ignore]
7088
async fn test_s3_input_stream_api() -> common_exception::Result<()> {
7189
let test_key = "test_s3_input_stream".to_string();
7290
let fixture = TestFixture::new(1024 * 10, test_key.clone());
7391
fixture.gen_test_obj().await?;
7492

75-
let s3 = S3::new(fixture.region.clone(), fixture.bucket_name.clone());
93+
let s3 = fixture.data_accessor()?;
7694
let mut input = s3.get_input_stream(&test_key, None)?;
7795
let mut buffer = vec![];
7896
input.read_to_end(&mut buffer).await?;
7997
assert_eq!(fixture.content, buffer);
8098
Ok(())
8199
}
82100

83-
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
84-
#[ignore]
85-
async fn test_s3_cli_with_credentials() -> common_exception::Result<()> {
86-
let test_key = "test_s3_input_stream".to_string();
87-
let fixture = TestFixture::new(1024 * 10, test_key.clone());
88-
fixture.gen_test_obj().await?;
89-
let key = std::env::var("AWS_ACCESS_KEY_ID").unwrap();
90-
let secret = std::env::var("AWS_SECRET_ACCESS_KEY").unwrap();
91-
92-
let s3 = S3::with_credentials(fixture.region.name(), &fixture.bucket_name, &key, &secret)?;
93-
let mut buffer = vec![];
94-
let mut input = s3.get_input_stream(&test_key, None)?;
95-
input.read_to_end(&mut buffer).await?;
96-
assert_eq!(fixture.content, buffer);
97-
Ok(())
98-
}
99-
100101
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
101102
#[ignore]
102103
async fn test_s3_input_stream_seek_api() -> common_exception::Result<()> {
103-
let test_key = "test_s3_seek_stream".to_string();
104+
let test_key = "test_s3_seek_stream_seek".to_string();
104105
let fixture = TestFixture::new(1024 * 10, test_key.clone());
105106
fixture.gen_test_obj().await?;
106107

107-
let s3 = S3::new(fixture.region.clone(), fixture.bucket_name.clone());
108+
let s3 = fixture.data_accessor()?;
108109
let mut input = s3.get_input_stream(&test_key, None)?;
109110
let mut buffer = vec![];
110111
input.seek(SeekFrom::Current(1)).await?;

common/dal/src/impls/azure_blob/azure_blob_accessor.rs

-9
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ use crate::AzureBlobInputStream;
2727
use crate::Bytes;
2828
use crate::DataAccessor;
2929
use crate::InputStream;
30-
use crate::SeekableReader;
3130

3231
pub struct AzureBlobAccessor {
3332
client: Arc<StorageClient>,
@@ -102,14 +101,6 @@ impl AzureBlobAccessor {
102101

103102
#[async_trait::async_trait]
104103
impl DataAccessor for AzureBlobAccessor {
105-
fn get_reader(
106-
&self,
107-
_path: &str,
108-
_stream_len: Option<u64>,
109-
) -> common_exception::Result<Box<dyn SeekableReader>> {
110-
todo!()
111-
}
112-
113104
fn get_input_stream(
114105
&self,
115106
path: &str,

common/dal/src/impls/local.rs

-5
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ use tokio::io::AsyncWriteExt;
3131
use crate::Bytes;
3232
use crate::DataAccessor;
3333
use crate::InputStream;
34-
use crate::SeekableReader;
3534

3635
pub struct Local {
3736
root: PathBuf,
@@ -65,10 +64,6 @@ impl Local {
6564

6665
#[async_trait::async_trait]
6766
impl DataAccessor for Local {
68-
fn get_reader(&self, path: &str, _len: Option<u64>) -> Result<Box<dyn SeekableReader>> {
69-
Ok(Box::new(std::fs::File::open(path)?))
70-
}
71-
7267
fn get_input_stream(&self, path: &str, _stream_len: Option<u64>) -> Result<InputStream> {
7368
let path = self.prefix_with_root(path)?;
7469
let std_file = std::fs::File::open(path)?;

query/src/configs/config_storage.rs

+16
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ pub const DISK_STORAGE_DATA_PATH: &str = "DISK_STORAGE_DATA_PATH";
2727

2828
// S3 Storage env.
2929
const S3_STORAGE_REGION: &str = "S3_STORAGE_REGION";
30+
const S3_STORAGE_ENDPOINT_URL: &str = "S3_STORAGE_ENDPOINT_URL";
31+
3032
const S3_STORAGE_ACCESS_KEY_ID: &str = "S3_STORAGE_ACCESS_KEY_ID";
3133
const S3_STORAGE_SECRET_ACCESS_KEY: &str = "S3_STORAGE_SECRET_ACCESS_KEY";
3234
const S3_STORAGE_BUCKET: &str = "S3_STORAGE_BUCKET";
@@ -80,6 +82,10 @@ pub struct S3StorageConfig {
8082
#[serde(default)]
8183
pub region: String,
8284

85+
#[structopt(long, env = S3_STORAGE_ENDPOINT_URL, default_value = "", help = "Endpoint URL for S3 storage")]
86+
#[serde(default)]
87+
pub endpoint_url: String,
88+
8389
#[structopt(long, env = S3_STORAGE_ACCESS_KEY_ID, default_value = "", help = "Access key for S3 storage")]
8490
#[serde(default)]
8591
pub access_key_id: String,
@@ -97,6 +103,7 @@ impl S3StorageConfig {
97103
pub fn default() -> Self {
98104
S3StorageConfig {
99105
region: "".to_string(),
106+
endpoint_url: "".to_string(),
100107
access_key_id: "".to_string(),
101108
secret_access_key: "".to_string(),
102109
bucket: "".to_string(),
@@ -108,6 +115,8 @@ impl fmt::Debug for S3StorageConfig {
108115
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
109116
write!(f, "{{")?;
110117
write!(f, "s3.storage.region: \"{}\", ", self.region)?;
118+
write!(f, "s3.storage.endpoint_url: \"{}\", ", self.endpoint_url)?;
119+
write!(f, "s3.storage.bucket: \"{}\", ", self.bucket)?;
111120
write!(f, "}}")
112121
}
113122
}
@@ -192,6 +201,13 @@ impl StorageConfig {
192201

193202
// S3.
194203
env_helper!(mut_config.storage, s3, region, String, S3_STORAGE_REGION);
204+
env_helper!(
205+
mut_config.storage,
206+
s3,
207+
endpoint_url,
208+
String,
209+
S3_STORAGE_ENDPOINT_URL
210+
);
195211
env_helper!(
196212
mut_config.storage,
197213
s3,

query/src/configs/config_test.rs

+3
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ data_path = \"\"
7979
8080
[storage.s3]
8181
region = \"\"
82+
endpoint_url = \"\"
8283
access_key_id = \"\"
8384
secret_access_key = \"\"
8485
bucket = \"\"
@@ -111,6 +112,7 @@ fn test_env_config() -> Result<()> {
111112
std::env::set_var("STORAGE_TYPE", "s3");
112113
std::env::set_var("DISK_STORAGE_DATA_PATH", "/tmp/test");
113114
std::env::set_var("S3_STORAGE_REGION", "us.region");
115+
std::env::set_var("S3_STORAGE_ENDPOINT_URL", "");
114116
std::env::set_var("S3_STORAGE_ACCESS_KEY_ID", "us.key.id");
115117
std::env::set_var("S3_STORAGE_SECRET_ACCESS_KEY", "us.key");
116118
std::env::set_var("S3_STORAGE_BUCKET", "us.bucket");
@@ -137,6 +139,7 @@ fn test_env_config() -> Result<()> {
137139
assert_eq!("/tmp/test", configured.storage.disk.data_path);
138140

139141
assert_eq!("us.region", configured.storage.s3.region);
142+
assert_eq!("", configured.storage.s3.endpoint_url);
140143
assert_eq!("us.key.id", configured.storage.s3.access_key_id);
141144
assert_eq!("us.key", configured.storage.s3.secret_access_key);
142145
assert_eq!("us.bucket", configured.storage.s3.bucket);

0 commit comments

Comments
 (0)