Skip to content

Implement OCI blob chunking #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 118 additions & 21 deletions src/spec-configuration/containerCollectionsOCIPush.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { Log, LogLevel } from '../spec-utils/log';
import { isLocalFile } from '../spec-utils/pfs';
import { DEVCONTAINER_COLLECTION_LAYER_MEDIATYPE, DEVCONTAINER_TAR_LAYER_MEDIATYPE, fetchOCIManifestIfExists, OCICollectionRef, OCILayer, OCIManifest, OCIRef, CommonParams } from './containerCollectionsOCI';
import { requestEnsureAuthenticated } from './httpOCIRegistry';
import { maxBodyLength } from '../spec-utils/httpRequest';

interface ManifestContainer {
manifestObj: OCIManifest;
Expand Down Expand Up @@ -69,17 +70,25 @@ export async function pushOCIFeatureOrTemplate(params: CommonParams, ociRef: OCI

// PUT blobs
if (!blobExistsConfigLayer) {
const chunkedUpload = blob.size > maxBodyLength;

// Obtain session ID with `/v2/<namespace>/blobs/uploads/`
const blobPutLocationUriPath = await postUploadSessionId(params, ociRef);
const blobPutLocationUriPath = await postUploadSessionId(params, ociRef, chunkedUpload);
if (!blobPutLocationUriPath) {
output.write(`Failed to get upload session ID`, LogLevel.Error);
return;
}

if (!(await putBlob(params, blobPutLocationUriPath, ociRef, blob))) {
output.write(`Failed to PUT blob '${name}' with digest '${digest}'`, LogLevel.Error);
return;
if (chunkedUpload) {
if (!(await putBlobChunked(params, blobPutLocationUriPath, ociRef, blob))) {
output.write(`Failed to upload chunked blob '${name}' with digest '${digest}'`, LogLevel.Error);
return;
}
} else {
if (!(await putBlobMonolithic(params, blobPutLocationUriPath, ociRef, blob))) {
output.write(`Failed to PUT blob '${name}' with digest '${digest}'`, LogLevel.Error);
return;
}
}
}
}
Expand Down Expand Up @@ -150,7 +159,7 @@ export async function pushCollectionMetadata(params: CommonParams, collectionRef
return;
}

if (!(await putBlob(params, blobPutLocationUriPath, collectionRef, blob))) {
if (!(await putBlobMonolithic(params, blobPutLocationUriPath, collectionRef, blob))) {
output.write(`Failed to PUT blob '${name}' with digest '${digest}'`, LogLevel.Error);
return;
}
Expand Down Expand Up @@ -220,8 +229,26 @@ async function putManifestWithTags(params: CommonParams, manifest: ManifestConta
return contentDigest;
}

// Resolve the <location> handed back by the registry into an absolute URL.
// Locations that already start with an http(s) scheme pass through untouched;
// anything else is treated as a path on the ref's registry and prefixed with https.
function determineBlobLocationUrl(blobPutLocationUriPath: string, ociRef: OCIRef | OCICollectionRef): string {
	const isAbsolute = /^https?:\/\//.test(blobPutLocationUriPath);
	return isAbsolute
		? blobPutLocationUriPath
		: `https://${ociRef.registry}${blobPutLocationUriPath}`;
}

// Insert `digest=<digest>` as the FIRST query parameter of `url`.
// Any query string already present on `url` is kept and follows the digest.
function appendDigestToUrl(url: string, digest: string): string {
	const digestParam = `?digest=${digest}`;
	const queryIndex = url.indexOf('?');

	if (queryIndex < 0) {
		// No existing query string: the digest becomes the whole query.
		return `${url}${digestParam}`;
	}

	const base = url.slice(0, queryIndex);
	const existingQuery = url.slice(queryIndex + 1);
	return `${base}${digestParam}&${existingQuery}`;
}

// Spec: https://github.com/opencontainers/distribution-spec/blob/main/spec.md#post-then-put (PUT <location>?digest=<digest>)
async function putBlob(params: CommonParams, blobPutLocationUriPath: string, ociRef: OCIRef | OCICollectionRef, blob: { name: string; digest: string; size: number; contents: Buffer }): Promise<boolean> {
async function putBlobMonolithic(params: CommonParams, blobPutLocationUriPath: string, ociRef: OCIRef | OCICollectionRef, blob: { name: string; digest: string; size: number; contents: Buffer }): Promise<boolean> {

const { output } = params;
const { name, digest, size, contents } = blob;
Expand All @@ -234,23 +261,12 @@ async function putBlob(params: CommonParams, blobPutLocationUriPath: string, oci
};

// OCI distribution spec is ambiguous on whether we get back an absolute or relative path.
let url = '';
if (blobPutLocationUriPath.startsWith('https://') || blobPutLocationUriPath.startsWith('http://')) {
url = blobPutLocationUriPath;
} else {
url = `https://${ociRef.registry}${blobPutLocationUriPath}`;
}
let url = determineBlobLocationUrl(blobPutLocationUriPath, ociRef);

// The <location> MAY contain critical query parameters.
// Additionally, it SHOULD match exactly the <location> obtained from the POST request.
// It SHOULD NOT be assembled manually by clients except where absolute/relative conversion is necessary.
const queryParamsStart = url.indexOf('?');
if (queryParamsStart === -1) {
// Just append digest to the end.
url += `?digest=${digest}`;
} else {
url = url.substring(0, queryParamsStart) + `?digest=${digest}` + '&' + url.substring(queryParamsStart + 1);
}
url = appendDigestToUrl(url, digest);

output.write(`PUT blob to -> ${url}`, LogLevel.Trace);

Expand All @@ -271,6 +287,86 @@ async function putBlob(params: CommonParams, blobPutLocationUriPath: string, oci
return true;
}

// Spec: https://github.com/opencontainers/distribution-spec/blob/main/spec.md#pushing-a-blob-in-chunks
/**
 * Uploads a blob in sequential chunks of at most `maxBodyLength` bytes.
 *
 * Each chunk is sent with a PATCH to the <location> returned by the previous request
 * (starting with `blobPutLocationUriPath`), and the session is then closed with a
 * body-less PUT to the last <location> with the blob digest added as a query parameter.
 *
 * @param params Common parameters; `params.output` is used for logging.
 * @param blobPutLocationUriPath The <location> (absolute or relative) of the upload session.
 * @param ociRef Reference used to resolve relative locations and to authenticate requests.
 * @param blob The blob to upload; `contents` is sliced into chunks, `size` bounds the upload.
 * @returns `true` when every PATCH answered 202 and the closing PUT answered 201 with a
 *          location header; `false` on any failed or unexpected response.
 */
async function putBlobChunked(params: CommonParams, blobPutLocationUriPath: string, ociRef: OCIRef | OCICollectionRef, blob: { name: string; digest: string; size: number; contents: Buffer }): Promise<boolean> {
	const { output } = params;
	const { name, digest, size, contents } = blob;

	output.write(`Starting chunked upload of ${name} blob '${digest}' (size=${size})`, LogLevel.Info);

	// Each chunk is uploaded to the <location> of the previous request.
	// Chunk upload requests MUST have a 202 Accepted response code.
	let chunkStart = 0;
	let chunkLocationUriPath = blobPutLocationUriPath;

	// Strictly less-than: once chunkStart reaches size every byte has been sent.
	// (With `<=` the loop would keep PATCHing empty chunks and never advance.)
	while (chunkStart < size) {
		// chunkEnd is exclusive, matching Buffer.subarray semantics.
		const chunkEnd = Math.min(chunkStart + maxBodyLength, size);
		const chunk = contents.subarray(chunkStart, chunkEnd);

		output.write(`Starting PATCH of ${name} blob chunk (size=${chunk.length})`, LogLevel.Info);

		const headers = {
			'content-type': 'application/octet-stream',
			'content-length': `${chunk.length}`,
			// The spec's <range> is `<start>-<end>` and inclusive on both ends,
			// so the last byte offset of this chunk is chunkEnd - 1.
			'content-range': `${chunkStart}-${chunkEnd - 1}`,
		};

		const url = determineBlobLocationUrl(chunkLocationUriPath, ociRef);

		output.write(`PATCH blob chunk to -> ${url}`, LogLevel.Trace);
		const res = await requestEnsureAuthenticated(params, { type: 'PATCH', url, headers, data: chunk }, ociRef);
		if (!res) {
			output.write('Request failed', LogLevel.Error);
			return false;
		}

		const { statusCode, resBody, resHeaders } = res;

		if (statusCode === 202) {
			// The next chunk (or the closing PUT) must target the location from this response.
			chunkLocationUriPath = resHeaders['location'] || resHeaders['Location'];
			if (!chunkLocationUriPath) {
				output.write(`${url}: Got 202 status code, but no location header found.`, LogLevel.Error);
				return false;
			}
		} else {
			// Any other statusCode besides 202 is unexpected
			// https://github.com/opencontainers/distribution-spec/blob/main/spec.md#error-codes
			const parsed = JSON.parse(resBody?.toString() || '{}');
			output.write(`${url}: Unexpected status code '${statusCode}' \n${JSON.stringify(parsed, undefined, 4)}`, LogLevel.Error);
			return false;
		}

		// Start next chunk
		chunkStart = chunkEnd;
	}

	// Terminate session with final PUT request
	const url = appendDigestToUrl(determineBlobLocationUrl(chunkLocationUriPath, ociRef), digest);

	output.write(`Closing upload session of ${name} blob '${digest}' (size=${size})`, LogLevel.Info);
	const res = await requestEnsureAuthenticated(params, { type: 'PUT', url, headers: {} }, ociRef);
	if (!res) {
		output.write('Request failed', LogLevel.Error);
		return false;
	}

	const { statusCode, resBody, resHeaders } = res;

	if (statusCode !== 201) {
		const parsed = JSON.parse(resBody?.toString() || '{}');
		output.write(`${statusCode}: Failed to upload blob '${digest}' to '${url}' \n${JSON.stringify(parsed, undefined, 4)}`, LogLevel.Error);
		return false;
	}

	const uploadedLocation = resHeaders['location'] || resHeaders['Location'];
	if (!uploadedLocation) {
		output.write(`${url}: Got 201 status code, but no location header found.`, LogLevel.Error);
		return false;
	}

	return true;
}

// Generate a layer that follows the `application/vnd.devcontainers.layer.v1+tar` mediaType as defined in
// Devcontainer Spec (features) : https://containers.dev/implementors/features-distribution/#oci-registry
// Devcontainer Spec (templates): https://github.com/devcontainers/spec/blob/main/proposals/devcontainer-templates-distribution.md#oci-registry
Expand Down Expand Up @@ -349,12 +445,13 @@ export async function checkIfBlobExists(params: CommonParams, ociRef: OCIRef | O

// Spec: https://github.com/opencontainers/distribution-spec/blob/main/spec.md#post-then-put
// Requires registry auth token.
async function postUploadSessionId(params: CommonParams, ociRef: OCIRef | OCICollectionRef): Promise<string | undefined> {
async function postUploadSessionId(params: CommonParams, ociRef: OCIRef | OCICollectionRef, chunkedUpload?: boolean): Promise<string | undefined> {
const { output } = params;

const url = `https://${ociRef.registry}/v2/${ociRef.path}/blobs/uploads/`;
output.write(`Generating Upload URL -> ${url}`, LogLevel.Trace);
const res = await requestEnsureAuthenticated(params, { type: 'POST', url, headers: {} }, ociRef);
const headers = chunkedUpload ? { 'content-length': '0' } : {};
const res = await requestEnsureAuthenticated(params, { type: 'POST', url, headers }, ociRef);

if (!res) {
output.write('Request failed', LogLevel.Error);
Expand Down
5 changes: 4 additions & 1 deletion src/spec-utils/httpRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,7 @@ export function requestResolveHeaders(options: { type: string; url: string; head
req.on('error', reject);
req.end();
});
}
}

// 10MB (10 * 2^20 bytes) — the maxBodyLength that follow-redirects uses by default.
export const maxBodyLength = 10 * 2 ** 20;