Skip to content

Add pushChunks #176

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
336 changes: 336 additions & 0 deletions src/main/java/land/oras/Registry.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@

package land.oras;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -51,6 +55,21 @@
@NullMarked
public final class Registry extends OCI<ContainerRef> {

/**
 * The default chunk size for uploading blobs (5MB).
 * This is a standard chunk size commonly used in cloud storage systems to balance
 * network performance with memory usage. The actual chunk size used may be larger
 * if the registry specifies a minimum chunk size via the OCI-Chunk-Min-Length header.
 * {@code pushChunks} also uses this value as the threshold for its pre-upload check:
 * only a stream that ends within the first chunk is looked up in the registry before
 * an upload session is opened.
 *
 * @see <a href="https://github.com/opencontainers/distribution-spec/blob/main/spec.md#pushing-a-blob-in-chunks">OCI Distribution Spec: Pushing a blob in chunks</a>
 */
private static final int CHUNK_SIZE = 5 * 1024 * 1024;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why 5? Is it from the spec? If yes, it would be nice to link it in the Javadoc. Otherwise, please explain this value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as digest calculation limit.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this must be honored:

If the registry has a minimum chunk size, the POST response SHOULD include the following header, where <size> is the size in bytes (see the blob PATCH definition for usage): OCI-Chunk-Min-Length: <size>

https://github.com/opencontainers/distribution-spec/blob/main/spec.md

Also, I think by default we should use the monolithic two-step push unless the file is large (to limit the number of HTTP requests when chunking is not needed).


/**
 * The digest calculation limit (16MB)
 * For files smaller than this size, we compute the digest before starting the upload
 * to check if the blob already exists in the registry, potentially avoiding unnecessary uploads.
 */
// NOTE(review): no code visible in this change reads this constant; pushChunks decides
// whether to run the pre-upload existence check from CHUNK_SIZE instead. Confirm whether
// this limit is still needed or should replace CHUNK_SIZE in that check.
private static final int DIGEST_CALCULATION_LIMIT = 16 * 1024 * 1024;

/**
* The HTTP client
*/
Expand Down Expand Up @@ -680,6 +699,216 @@ private void logResponse(OrasHttpClient.ResponseWrapper<?> response) {
}
}

/**
 * Push a blob using input stream in chunks to avoid loading the whole blob in memory.
 * This method is recommended for large files to prevent excessive memory usage.
 * For smaller blobs, consider using {@link #pushBlob(ContainerRef, Path)} which may be more efficient
 * as it uses fewer HTTP requests.
 *
 * <p>The stream is consumed in chunks of {@code CHUNK_SIZE} bytes, or of the registry's
 * {@code OCI-Chunk-Min-Length} when that is larger. Every chunk except the last one is
 * completely filled, so short reads from the stream never produce an undersized chunk.
 * If the stream ends within the first {@code CHUNK_SIZE} bytes, its digest is known before
 * any upload starts and the registry is asked whether the blob already exists.
 *
 * @param containerRef the container ref
 * @param input the input stream; it is read until end of stream and is not closed by this method
 * @param size the size of the blob (currently not used; the returned layer size is the number of bytes read)
 * @return The Layer containing the uploaded blob information
 * @throws OrasException if upload fails or digest calculation fails
 * @see <a href="https://github.com/opencontainers/distribution-spec/blob/main/spec.md#pushing-a-blob-in-chunks">OCI Distribution Spec: Pushing a blob in chunks</a>
 */
public Layer pushChunks(ContainerRef containerRef, InputStream input, long size) {
    // INITIALIZATION PHASE

    MessageDigest digest;
    try {
        digest = MessageDigest.getInstance(containerRef.getAlgorithm().getAlgorithmName());
    } catch (NoSuchAlgorithmException e) {
        throw new OrasException("Failed to get message digest", e);
    }

    try {
        // FIRST CHUNK PROCESSING

        // readNBytes blocks until the buffer is full or the stream ends, so a short
        // result always means end of stream (a plain read() gives no such guarantee).
        byte[] buffer = new byte[CHUNK_SIZE];
        int filled = input.readNBytes(buffer, 0, buffer.length);
        digest.update(buffer, 0, filled);
        boolean endOfStream = filled < buffer.length;
        String contentDigest = null;

        // Handle small blobs that fit in one chunk
        if (endOfStream) {
            contentDigest = createDigestString(containerRef, digest.digest());

            // Check if blob already exists, return early if it does
            if (hasBlob(containerRef.withDigest(contentDigest))) {
                LOG.info("Blob already exists: {}", contentDigest);
                long blobLength = filled;
                return Layer.fromDigest(contentDigest, blobLength);
            }
        }

        // UPLOAD SESSION INITIALIZATION

        URI uploadUri = URI.create("%s://%s".formatted(getScheme(), containerRef.getBlobsUploadPath()));
        OrasHttpClient.ResponseWrapper<String> response = client.post(uploadUri, new byte[0], Map.of());

        // Handle authentication if needed
        if (switchTokenAuth(containerRef, response)) {
            response = client.post(uploadUri, new byte[0], Map.of());
        }

        handleError(response);
        if (response.statusCode() != 202) {
            throw new OrasException("Failed to initiate blob upload: " + response.statusCode());
        }

        // Get upload location URL
        String location = response.headers().get(Const.LOCATION_HEADER.toLowerCase());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure to understand this part. Why do we need to update the location?

Copy link
Contributor Author

@vaidikcode vaidikcode Mar 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the spec section on "Pushing a blob in chunks":

Each successful chunk upload MUST have a 202 Accepted response code, and MUST have the following headers:

Location: <location>

Range: 0-<end-of-range>

Each consecutive chunk upload SHOULD use the provided in the response to the previous chunk upload.

The code is correctly implementing this part of the specification. After each chunk upload (PATCH request), the registry may update the location header to point to a different URL for the next chunk

        if (location == null) {
            throw new OrasException("No location header in response");
        }

        // Handle minimum chunk size requirements from registry
        int chunkSize = adjustChunkSizeIfNeeded(response, buffer.length);
        if (chunkSize > buffer.length) {
            // Grow the buffer with copyOf so the bytes already read (and already digested)
            // are kept; allocating a fresh array here would silently drop them.
            buffer = Arrays.copyOf(buffer, chunkSize);
            if (!endOfStream) {
                // Top up the first chunk so it also satisfies the registry minimum
                int extra = input.readNBytes(buffer, filled, chunkSize - filled);
                digest.update(buffer, filled, extra);
                filled += extra;
                endOfStream = filled < chunkSize;
            }
        }

        // Ensure location is an absolute URL
        location = ensureAbsoluteUri(location, containerRef);
        LOG.debug("Initial location URL: {}", location);

        // UPLOAD CHUNKS

        long startRange = 0;
        long totalBytesRead = 0;

        // An empty stream uploads no chunk at all and goes straight to the final PUT
        while (filled > 0) {
            long endRange = startRange + filled - 1;

            // Prepare headers for chunk
            Map<String, String> chunkHeaders = new HashMap<>();
            chunkHeaders.put(Const.CONTENT_TYPE_HEADER, Const.APPLICATION_OCTET_STREAM_HEADER_VALUE);
            chunkHeaders.put(Const.CONTENT_RANGE_HEADER, startRange + "-" + endRange);

            // Upload chunk
            response = client.patch(URI.create(location), Arrays.copyOf(buffer, filled), chunkHeaders);
            handleError(response);

            if (response.statusCode() != 202) {
                throw new OrasException("Failed to upload chunk: " + response.statusCode());
            }

            // Each chunk response carries the Location to use for the next request
            location = ensureAbsoluteUri(getLocationHeader(response), containerRef);
            LOG.debug("Location after chunk {}-{}: {}", startRange, endRange, location);

            // Continue after the offset the registry reports; fall back to what was sent
            startRange = getEndRangeFromHeader(response, endRange) + 1;
            totalBytesRead += filled;

            if (endOfStream) {
                break;
            }

            // Read the next chunk; a result of 0 means the stream ended on a chunk boundary
            filled = input.readNBytes(buffer, 0, buffer.length);
            digest.update(buffer, 0, filled);
            endOfStream = filled < buffer.length;
        }

        // FINALIZE UPLOAD

        // Calculate final digest if not already done
        if (contentDigest == null) {
            contentDigest = createDigestString(containerRef, digest.digest());
            LOG.debug("Calculated content digest: {}", contentDigest);
        }

        // Prepare final upload URI
        URI finalizeUri = constructFinalizeUri(location, contentDigest, containerRef);

        // Complete the upload with final PUT
        Map<String, String> finalHeaders = new HashMap<>();
        finalHeaders.put(Const.CONTENT_TYPE_HEADER, Const.APPLICATION_OCTET_STREAM_HEADER_VALUE);

        // Log finalization details for debugging
        LOG.debug("Final PUT URL: {}", finalizeUri);
        LOG.debug("Content Digest: {}", contentDigest);

        response = client.put(finalizeUri, new byte[0], finalHeaders);
        logFinalResponse(response);
        handleError(response);

        if (response.statusCode() != 201) {
            throw new OrasException("Failed to complete blob upload: " + response.statusCode());
        }

        return Layer.fromDigest(contentDigest, totalBytesRead);

    } catch (IOException e) {
        throw new OrasException("Failed to push blob", e);
    }
}

/**
* Get blob as stream to avoid loading into memory
* @param containerRef The container ref
Expand All @@ -690,6 +919,113 @@ public InputStream getBlobStream(ContainerRef containerRef) {
return fetchBlob(containerRef);
}

// Renders each byte as exactly two lowercase hexadecimal characters, in input order
private static String bytesToHex(byte[] bytes) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty such this already exist. If not must be moved to DigestUtil

Copy link
Contributor Author

@vaidikcode vaidikcode Mar 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't simply move bytesToHex to DigestUtils and use it because the DigestUtils class appears to be designed as a proper utility class with static methods, yet its package-private access limits its usability.

What was the intended scope for DigestUtils? If it is meant to be a shared utility, making it public would be a cleaner solution than duplicating the functionality in the Registry. What do you think? Is there any other way to achieve the same behavior as bytesToHex in the createDigestString method? @jonesbusy

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest to check SupportedAlgorithm class. I think your answer is there.

Since this PR start to be quite large I think it's worth to split it in several smaller PR that can support this one.

It would make it easier to review and faster to merge smaller part of code

    StringBuilder out = new StringBuilder(bytes.length * 2);
    for (int i = 0; i < bytes.length; i++) {
        // Mask to 0..255 so negative bytes do not sign-extend
        int unsigned = bytes[i] & 0xff;
        out.append(Character.forDigit(unsigned >>> 4, 16));
        out.append(Character.forDigit(unsigned & 0x0f, 16));
    }
    return out.toString();
}

/**
 * Builds the digest string for a blob: the algorithm prefix of the container ref,
 * a colon, and the hex encoding of the raw digest bytes.
 *
 * @param containerRef the container ref whose algorithm supplies the prefix
 * @param digestBytes the raw digest bytes
 * @return the digest in {@code prefix:hex} form
 */
private String createDigestString(ContainerRef containerRef, byte[] digestBytes) {
    String hex = bytesToHex(digestBytes);
    return "%s:%s".formatted(containerRef.getAlgorithm().getPrefix(), hex);
}

/**
 * Reads the location header from a response, failing when it is absent.
 *
 * @param response the response to inspect
 * @return the value of the location header
 * @throws OrasException if the response has no location header
 */
private String getLocationHeader(OrasHttpClient.ResponseWrapper<String> response) {
    String headerKey = Const.LOCATION_HEADER.toLowerCase();
    String value = response.headers().get(headerKey);
    if (value != null) {
        return value;
    }
    throw new OrasException("No location header in response");
}

/**
 * Returns the location unchanged when it already starts with {@code http:} or
 * {@code https:}; otherwise prefixes it with the current scheme and the registry of
 * the container ref, dropping one leading slash from the location.
 *
 * @param location the location as returned by the registry
 * @param containerRef the container ref supplying the registry host
 * @return a location that carries a scheme and host
 */
private String ensureAbsoluteUri(String location, ContainerRef containerRef) {
    boolean hasHttpScheme = location.startsWith("http:") || location.startsWith("https:");
    if (hasHttpScheme) {
        return location;
    }
    String relativePath = location.startsWith("/") ? location.substring(1) : location;
    return "%s://%s/%s".formatted(getScheme(), containerRef.getRegistry(), relativePath);
}

/**
 * Extracts the end offset from the range header of a chunk upload response.
 * The header is expected in {@code <start>-<end>} form. When it is missing, does not
 * split into exactly two parts, or its end part is not a number, the supplied default
 * is returned so that a malformed header cannot abort the upload with an unchecked
 * {@link NumberFormatException}.
 *
 * @param response the response to inspect
 * @param defaultEndRange the value to return when the header cannot be used
 * @return the end offset reported by the registry, or {@code defaultEndRange}
 */
private long getEndRangeFromHeader(OrasHttpClient.ResponseWrapper<String> response, long defaultEndRange) {
    String rangeHeader = response.headers().get(Const.RANGE_HEADER.toLowerCase());
    if (rangeHeader == null) {
        return defaultEndRange;
    }

    String[] parts = rangeHeader.split("-");
    if (parts.length != 2) {
        return defaultEndRange;
    }

    try {
        // Trim because header values may carry surrounding whitespace
        return Long.parseLong(parts[1].trim());
    } catch (NumberFormatException e) {
        LOG.warn("Invalid Range header value: {}", rangeHeader);
        return defaultEndRange;
    }
}

/**
 * Adjusts chunk size based on registry requirements.
 * Reads the {@code OCI-Chunk-Min-Length} response header and returns its value when it
 * is larger than the current chunk size. A missing, unparsable or smaller value leaves
 * the current chunk size unchanged.
 *
 * @param response the response to inspect
 * @param currentChunkSize the chunk size currently in use, in bytes
 * @return the chunk size to use, never smaller than {@code currentChunkSize}
 */
private int adjustChunkSizeIfNeeded(OrasHttpClient.ResponseWrapper<String> response, int currentChunkSize) {
    // Locale.ROOT: the default-locale toLowerCase() maps 'I' to a dotless 'ı' under
    // Turkish locales, which would turn "OCI" into a key that never matches.
    String headerKey = Const.OCI_CHUNK_MIN_LENGTH_HEADER.toLowerCase(java.util.Locale.ROOT);
    String minChunkSizeHeader = response.headers().get(headerKey);
    if (minChunkSizeHeader == null) {
        return currentChunkSize;
    }

    try {
        int registryMinChunkSize = Integer.parseInt(minChunkSizeHeader.trim());
        if (registryMinChunkSize > currentChunkSize) {
            LOG.debug(
                    "Registry requires minimum chunk size of {} bytes, adjusting from default {} bytes",
                    registryMinChunkSize,
                    currentChunkSize);
            return registryMinChunkSize;
        }
    } catch (NumberFormatException e) {
        // Best effort: an unusable header must not fail the upload
        LOG.warn("Invalid OCI-Chunk-Min-Length header value: {}", minChunkSizeHeader);
    }

    return currentChunkSize;
}

/**
 * Builds the URI of the final request that completes an upload: the absolute upload
 * location with a {@code digest} query parameter appended.
 *
 * @param location the upload location from the last response
 * @param contentDigest the digest of the uploaded content
 * @param containerRef the container ref used to resolve a relative location
 * @return the URI to send the final request to
 * @throws OrasException if the resulting string cannot be turned into a URI
 */
private URI constructFinalizeUri(String location, String contentDigest, ContainerRef containerRef) {
    String absoluteLocation = ensureAbsoluteUri(location, containerRef);
    LOG.debug("Final location before finalize: {}", absoluteLocation);

    try {
        // Extend an existing query string, otherwise start one
        String separator = absoluteLocation.contains("?") ? "&" : "?";
        return URI.create(absoluteLocation + separator + "digest=" + contentDigest);
    } catch (Exception e) {
        throw new OrasException("Failed to construct URI for completing upload", e);
    }
}

/**
 * Writes the status code, the headers and, when present, the body of the final
 * response to the debug log.
 *
 * @param response the response to log
 */
private void logFinalResponse(OrasHttpClient.ResponseWrapper<String> response) {
    LOG.debug("Response status: {}", response.statusCode());
    LOG.debug("Response headers: {}", response.headers());

    Object body = response.response();
    if (!(body instanceof String)) {
        return;
    }
    LOG.debug("Response body: {}", body);
}

/**
* Return if a media type is an index media type
* @param mediaType The media type
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/land/oras/utils/Const.java
Original file line number Diff line number Diff line change
Expand Up @@ -219,4 +219,19 @@ public static String currentTimestamp() {
* Application octet stream header value
*/
public static final String APPLICATION_OCTET_STREAM_HEADER_VALUE = "application/octet-stream";

/**
 * Name of the {@code Content-Range} HTTP header
 */
public static final String CONTENT_RANGE_HEADER = "Content-Range";

/**
 * Name of the {@code Range} HTTP header
 */
public static final String RANGE_HEADER = "Range";

/**
 * Name of the {@code OCI-Chunk-Min-Length} HTTP header (OCI chunk minimum length)
 */
public static final String OCI_CHUNK_MIN_LENGTH_HEADER = "OCI-Chunk-Min-Length";
}
Loading