Commit bc914b7

wasi-blobstore
Signed-off-by: itowlson <[email protected]>
1 parent 7a11e82 commit bc914b7

File tree

47 files changed, +4378 −237 lines


Cargo.lock

+616 −106 (generated file, not rendered)

Cargo.toml

+7 −1
@@ -118,6 +118,12 @@ members = [
 [workspace.dependencies]
 anyhow = "1"
 async-trait = "0.1"
+azure_core = "0.21.0"
+azure_data_cosmos = "0.21.0"
+azure_identity = "0.21.0"
+azure_security_keyvault = "0.21.0"
+azure_storage = "0.21.0"
+azure_storage_blobs = "0.21.0"
 bytes = "1"
 conformance-tests = { git = "https://github.com/fermyon/conformance-tests", rev = "ecd22a45bcc5c775a56c67689a89aa4005866ac0" }
 dirs = "5.0"
@@ -141,7 +147,7 @@ sha2 = "0.10"
 tempfile = "3"
 test-environment = { git = "https://github.com/fermyon/conformance-tests", rev = "ecd22a45bcc5c775a56c67689a89aa4005866ac0" }
 thiserror = "1"
-tokio = "1"
+tokio = "1.40"
 toml = "0.8"
 tracing = { version = "0.1", features = ["log"] }
 tracing-opentelemetry = { version = "0.29", default-features = false, features = ["metrics"] }

crates/blobstore-azure/Cargo.toml

+27
@@ -0,0 +1,27 @@
[package]
name = "spin-blobstore-azure"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
rust-version.workspace = true

[dependencies]
anyhow = { workspace = true }
azure_core = { workspace = true }
azure_storage = { workspace = true }
azure_storage_blobs = { workspace = true }
futures = { workspace = true }
serde = { workspace = true }
spin-core = { path = "../core" }
spin-factor-blobstore = { path = "../factor-blobstore" }
tokio = { workspace = true }
tokio-stream = "0.1.16"
tokio-util = { version = "0.7.12", features = ["compat"] }
uuid = { version = "1.0", features = ["v4"] }
wasmtime-wasi = { workspace = true }

[lints]
workspace = true

crates/blobstore-azure/src/lib.rs

+54
@@ -0,0 +1,54 @@
mod store;

use serde::Deserialize;
use spin_factor_blobstore::runtime_config::spin::MakeBlobStore;
use store::{
    auth::{AzureBlobAuthOptions, AzureKeyAuth},
    AzureContainerManager,
};

/// A blob store that uses Azure Blob Storage as the backend.
#[derive(Default)]
pub struct AzureBlobStoreBuilder {
    _priv: (),
}

impl AzureBlobStoreBuilder {
    /// Creates a new `AzureBlobStoreBuilder`.
    pub fn new() -> Self {
        Self::default()
    }
}

/// Runtime configuration for the Azure blob store.
#[derive(Deserialize)]
pub struct AzureBlobStoreRuntimeConfig {
    /// The access key for the Azure blob storage account.
    key: Option<String>,
    /// The Azure blob storage account name.
    account: String,
}

impl MakeBlobStore for AzureBlobStoreBuilder {
    const RUNTIME_CONFIG_TYPE: &'static str = "azure_blob";

    type RuntimeConfig = AzureBlobStoreRuntimeConfig;

    type ContainerManager = AzureContainerManager;

    fn make_store(
        &self,
        runtime_config: Self::RuntimeConfig,
    ) -> anyhow::Result<Self::ContainerManager> {
        let auth = match &runtime_config.key {
            Some(key) => AzureBlobAuthOptions::AccountKey(AzureKeyAuth::new(
                runtime_config.account.clone(),
                key.clone(),
            )),
            None => AzureBlobAuthOptions::Environmental,
        };

        let blob_store = AzureContainerManager::new(auth)?;
        Ok(blob_store)
    }
}
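
For context, a hedged sketch of how this builder might be driven from inside the crate: the `example` function and the TOML snippet below are illustrative (not part of the commit), and the sketch assumes the workspace `toml` crate is available to this crate. Leaving `key` unset falls through to the `Environmental` auth branch.

fn example() -> anyhow::Result<()> {
    // Field names come from AzureBlobStoreRuntimeConfig above; the account
    // value is illustrative. Omitting `key` selects Environmental auth.
    let config: AzureBlobStoreRuntimeConfig = toml::from_str(
        r#"
        account = "mystorageaccount"
        # key = "<base64 account key>"
        "#,
    )?;

    // An AzureContainerManager ready to hand out containers.
    let _manager = AzureBlobStoreBuilder::new().make_store(config)?;
    Ok(())
}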

crates/blobstore-azure/src/store.rs

+212
@@ -0,0 +1,212 @@
use std::sync::Arc;

use anyhow::Result;
use azure_storage_blobs::prelude::{BlobServiceClient, ContainerClient};
use spin_core::async_trait;
use spin_factor_blobstore::{Container, ContainerManager, Error};

pub mod auth;
mod incoming_data;
mod object_names;

use auth::AzureBlobAuthOptions;
use incoming_data::AzureIncomingData;
use object_names::AzureObjectNames;

pub struct AzureContainerManager {
    client: BlobServiceClient,
}

impl AzureContainerManager {
    pub fn new(auth_options: AzureBlobAuthOptions) -> Result<Self> {
        let (account, credentials) = match auth_options {
            AzureBlobAuthOptions::AccountKey(config) => (
                config.account.clone(),
                azure_storage::StorageCredentials::access_key(&config.account, config.key.clone()),
            ),
            AzureBlobAuthOptions::Environmental => {
                let account = std::env::var("STORAGE_ACCOUNT").expect("missing STORAGE_ACCOUNT");
                let access_key =
                    std::env::var("STORAGE_ACCESS_KEY").expect("missing STORAGE_ACCESS_KEY");
                (
                    account.clone(),
                    azure_storage::StorageCredentials::access_key(account, access_key),
                )
            }
        };

        let client = azure_storage_blobs::prelude::ClientBuilder::new(account, credentials)
            .blob_service_client();
        Ok(Self { client })
    }
}

#[async_trait]
impl ContainerManager for AzureContainerManager {
    async fn get(&self, name: &str) -> Result<Arc<dyn Container>, Error> {
        Ok(Arc::new(AzureContainer {
            _label: name.to_owned(),
            client: self.client.container_client(name),
        }))
    }

    fn is_defined(&self, _store_name: &str) -> bool {
        true
    }
}

struct AzureContainer {
    _label: String,
    client: ContainerClient,
}

/// Azure doesn't provide us with a container creation time
const DUMMY_CREATED_AT: u64 = 0;

#[async_trait]
impl Container for AzureContainer {
    async fn exists(&self) -> anyhow::Result<bool> {
        Ok(self.client.exists().await?)
    }

    async fn name(&self) -> String {
        self.client.container_name().to_owned()
    }

    async fn info(&self) -> anyhow::Result<spin_factor_blobstore::ContainerMetadata> {
        let properties = self.client.get_properties().await?;
        Ok(spin_factor_blobstore::ContainerMetadata {
            name: properties.container.name,
            created_at: DUMMY_CREATED_AT,
        })
    }

    async fn clear(&self) -> anyhow::Result<()> {
        anyhow::bail!("Azure blob storage does not support clearing containers")
    }

    async fn delete_object(&self, name: &str) -> anyhow::Result<()> {
        self.client.blob_client(name).delete().await?;
        Ok(())
    }

    async fn delete_objects(&self, names: &[String]) -> anyhow::Result<()> {
        // TODO: are atomic semantics required? or efficiency guarantees?
        let futures = names.iter().map(|name| self.delete_object(name));
        futures::future::try_join_all(futures).await?;
        Ok(())
    }

    async fn has_object(&self, name: &str) -> anyhow::Result<bool> {
        Ok(self.client.blob_client(name).exists().await?)
    }

    async fn object_info(
        &self,
        name: &str,
    ) -> anyhow::Result<spin_factor_blobstore::ObjectMetadata> {
        let response = self.client.blob_client(name).get_properties().await?;
        Ok(spin_factor_blobstore::ObjectMetadata {
            name: name.to_string(),
            container: self.client.container_name().to_string(),
            created_at: response
                .blob
                .properties
                .creation_time
                .unix_timestamp()
                .try_into()
                .unwrap(),
            size: response.blob.properties.content_length,
        })
    }

    async fn get_data(
        &self,
        name: &str,
        start: u64,
        end: u64,
    ) -> anyhow::Result<Box<dyn spin_factor_blobstore::IncomingData>> {
        // We can't use a Rust range because the Azure type does not accept inclusive ranges,
        // and we don't want to add 1 to `end` if it's already at MAX!
        let range = if end == u64::MAX {
            azure_core::request_options::Range::RangeFrom(start..)
        } else {
            azure_core::request_options::Range::Range(start..(end + 1))
        };
        let client = self.client.blob_client(name);
        Ok(Box::new(AzureIncomingData::new(client, range)))
    }

    async fn write_data(
        &self,
        name: &str,
        data: tokio::io::ReadHalf<tokio::io::SimplexStream>,
        finished_tx: tokio::sync::mpsc::Sender<anyhow::Result<()>>,
    ) -> anyhow::Result<()> {
        let client = self.client.blob_client(name);

        tokio::spawn(async move {
            let write_result = Self::write_data_core(data, client).await;
            finished_tx
                .send(write_result)
                .await
                .expect("should send finish tx");
        });

        Ok(())
    }

    async fn list_objects(&self) -> anyhow::Result<Box<dyn spin_factor_blobstore::ObjectNames>> {
        let stm = self.client.list_blobs().into_stream();
        Ok(Box::new(AzureObjectNames::new(stm)))
    }
}

impl AzureContainer {
    async fn write_data_core(
        mut data: tokio::io::ReadHalf<tokio::io::SimplexStream>,
        client: azure_storage_blobs::prelude::BlobClient,
    ) -> anyhow::Result<()> {
        use tokio::io::AsyncReadExt;

        // Azure limits us to 50k blocks per blob. At 2MB/block that allows 100GB, which will be
        // enough for most use cases. If users need flexibility for larger blobs, we could make
        // the block size configurable via the runtime config ("size hint" or something).
        const BLOCK_SIZE: usize = 2 * 1024 * 1024;

        let mut blocks = vec![];

        'put_blocks: loop {
            let mut bytes = Vec::with_capacity(BLOCK_SIZE);
            loop {
                let read = data.read_buf(&mut bytes).await?;
                let len = bytes.len();

                if read == 0 {
                    // end of stream - send the last block and go
                    let id_bytes = uuid::Uuid::new_v4().as_bytes().to_vec();
                    let block_id = azure_storage_blobs::prelude::BlockId::new(id_bytes);
                    client.put_block(block_id.clone(), bytes).await?;
                    blocks.push(azure_storage_blobs::blob::BlobBlockType::Uncommitted(
                        block_id,
                    ));
                    break 'put_blocks;
                }
                if len >= BLOCK_SIZE {
                    let id_bytes = uuid::Uuid::new_v4().as_bytes().to_vec();
                    let block_id = azure_storage_blobs::prelude::BlockId::new(id_bytes);
                    client.put_block(block_id.clone(), bytes).await?;
                    blocks.push(azure_storage_blobs::blob::BlobBlockType::Uncommitted(
                        block_id,
                    ));
                    break;
                }
            }
        }

        let block_list = azure_storage_blobs::blob::BlockList { blocks };
        client.put_block_list(block_list).await?;

        Ok(())
    }
}
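
The inclusive-to-exclusive range conversion in `get_data` is easy to get wrong at the boundary, so here is the same logic pulled out into a standalone sketch; `to_azure_range` is a hypothetical helper, not part of the commit.

use azure_core::request_options::Range;

// Hypothetical helper mirroring the logic in get_data: the blobstore interface
// hands us an inclusive [start, end] pair, while the Azure Range type takes an
// exclusive upper bound, so end == u64::MAX has to map to an open-ended range
// rather than overflowing on end + 1.
fn to_azure_range(start: u64, end: u64) -> Range {
    if end == u64::MAX {
        Range::RangeFrom(start..)
    } else {
        Range::Range(start..(end + 1))
    }
}

For example, a request for bytes 0 through 9 inclusive maps to the exclusive Azure range 0..10, while (0, u64::MAX) maps to 0.., reading to the end of the blob.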
crates/blobstore-azure/src/store/auth.rs

+27
@@ -0,0 +1,27 @@
/// Azure blob storage runtime config literal options for authentication
#[derive(Clone, Debug)]
pub struct AzureKeyAuth {
    pub account: String,
    pub key: String,
}

impl AzureKeyAuth {
    pub fn new(account: String, key: String) -> Self {
        Self { account, key }
    }
}

/// Azure blob storage enumeration for the possible authentication options
#[derive(Clone, Debug)]
pub enum AzureBlobAuthOptions {
    /// The account and key have been specified directly
    AccountKey(AzureKeyAuth),
    /// Spin should use the environment variables of the process to
    /// create the StorageCredentials for the storage client. For now this uses old school credentials:
    ///
    /// STORAGE_ACCOUNT
    /// STORAGE_ACCESS_KEY
    ///
    /// TODO: Thorsten pls make this proper with *hand waving* managed identity and stuff!
    Environmental,
}
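
As a usage note, a hedged sketch of how the Environmental variant resolves credentials, mirroring AzureContainerManager::new in store.rs but returning anyhow errors instead of panicking; the function and variable names here are illustrative, not part of the commit.

use anyhow::Context;

// Illustrative sketch: resolve credentials for the Environmental variant from
// the same two environment variables documented on the enum above.
fn credentials_from_env() -> anyhow::Result<(String, azure_storage::StorageCredentials)> {
    let account = std::env::var("STORAGE_ACCOUNT").context("missing STORAGE_ACCOUNT")?;
    let access_key =
        std::env::var("STORAGE_ACCESS_KEY").context("missing STORAGE_ACCESS_KEY")?;
    Ok((
        account.clone(),
        azure_storage::StorageCredentials::access_key(account, access_key),
    ))
}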
