Skip to content

Commit d309988

Browse files
committed
add pushing blob from a bytes stream
1 parent c60075f commit d309988

File tree

1 file changed

+63
-2
lines changed

1 file changed

+63
-2
lines changed

src/client.rs

+63-2
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@ use crate::Reference;
1919
use crate::errors::{OciDistributionError, Result};
2020
use crate::token_cache::{RegistryOperation, RegistryToken, RegistryTokenType, TokenCache};
2121
use futures_util::future;
22-
use futures_util::stream::{self, StreamExt, TryStreamExt};
23-
use futures_util::Stream;
22+
use futures_util::stream::{self, Stream, StreamExt, TryStreamExt};
2423
use http::HeaderValue;
2524
use http_auth::{parser::ChallengeParser, ChallengeRef};
2625
use olpc_cjson::CanonicalFormatter;
@@ -492,6 +491,35 @@ impl Client {
492491
.await
493492
}
494493

494+
/// Pushes a blob to the registry as a stream, chunking it
495+
/// upstream.
496+
pub async fn push_blob_stream(
497+
&self,
498+
image: &Reference,
499+
mut blob_stream: impl Stream<Item = std::result::Result<bytes::Bytes, std::io::Error>>
500+
+ std::marker::Unpin,
501+
blob_digest: &str,
502+
) -> Result<String> {
503+
let mut location = self.begin_push_chunked_session(image).await?;
504+
let mut start: usize = 0;
505+
let mut blob_data = Vec::new();
506+
let mut done: bool = false;
507+
loop {
508+
if let Some(bytes) = blob_stream.next().await {
509+
blob_data.extend(&bytes?);
510+
} else {
511+
done = true;
512+
}
513+
// gonna break when push chunk finishes
514+
(location, start) = self.push_chunk(&location, image, &blob_data, start).await?;
515+
if done && start >= blob_data.len() {
516+
break;
517+
}
518+
}
519+
self.end_push_chunked_session(&location, image, blob_digest)
520+
.await
521+
}
522+
495523
/// Perform an OAuth v2 auth request if necessary.
496524
///
497525
/// This performs authorization and then stores the token internally to be used
@@ -2457,6 +2485,39 @@ mod test {
24572485
assert_eq!(layer_location, format!("http://localhost:{}/v2/hello-wasm/blobs/sha256:6165c4ad43c0803798b6f2e49d6348c915d52c999a5f890846cee77ea65d230b", port));
24582486
}
24592487

2488+
#[tokio::test]
2489+
#[cfg(feature = "test-registry")]
2490+
async fn can_push_stream() {
2491+
let docker = clients::Cli::default();
2492+
let test_container = docker.run(registry_image());
2493+
let port = test_container.get_host_port_ipv4(5000);
2494+
2495+
let c = Client::new(ClientConfig {
2496+
protocol: ClientProtocol::Http,
2497+
..Default::default()
2498+
});
2499+
let url = format!("localhost:{}/hello-wasm:v1", port);
2500+
let image: Reference = url.parse().unwrap();
2501+
2502+
c.auth(&image, &RegistryAuth::Anonymous, RegistryOperation::Push)
2503+
.await
2504+
.expect("result from auth request");
2505+
2506+
let image_data: Vec<Vec<u8>> = vec![b"iamawebassemblymodule".to_vec()];
2507+
let digest = sha256_digest(&image_data[0]);
2508+
let layer_location = c
2509+
.push_blob_stream(
2510+
&image,
2511+
stream::iter(image_data)
2512+
.map(|chunk| Ok::<_, std::io::Error>(bytes::Bytes::from(chunk))),
2513+
&digest,
2514+
)
2515+
.await
2516+
.expect("failed to blob stream push");
2517+
2518+
assert_eq!(layer_location, format!("http://localhost:{}/v2/hello-wasm/blobs/sha256:6165c4ad43c0803798b6f2e49d6348c915d52c999a5f890846cee77ea65d230b", port));
2519+
}
2520+
24602521
#[tokio::test]
24612522
#[cfg(feature = "test-registry")]
24622523
async fn can_push_multiple_chunks() {

0 commit comments

Comments
 (0)