Commit 8c7c58c

Merge pull request #2487 from mtrmac/chunked-bic2

Record zstd:chunked format, and annotations, in BlobInfoCache

2 parents 55a2d1e + 3d38dae
File tree: 15 files changed, +551 -209 lines

copy/compression.go (+69 -51)

@@ -52,6 +52,16 @@ func blobPipelineDetectCompressionStep(stream *sourceStream, srcInfo types.BlobI
 	}
 	stream.reader = reader
 
+	if decompressor != nil && format.Name() == compressiontypes.ZstdAlgorithmName {
+		tocDigest, err := chunkedToc.GetTOCDigest(srcInfo.Annotations)
+		if err != nil {
+			return bpDetectCompressionStepData{}, err
+		}
+		if tocDigest != nil {
+			format = compression.ZstdChunked
+		}
+
+	}
 	res := bpDetectCompressionStepData{
 		isCompressed: decompressor != nil,
 		format:       format,
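
This is the core detection change: zstd and zstd:chunked blobs look identical to the decompressor, so the only way to tell them apart is the TOC-digest annotation on the layer. Below is a minimal stand-alone sketch of that check; the real lookup is the chunkedToc.GetTOCDigest call above (provided by containers/storage), and the annotation key here is an assumption based on the one commonly used by the containers ecosystem.

```go
package main

import (
	"fmt"

	"github.com/opencontainers/go-digest"
)

// tocAnnotationKey is assumed here; the authoritative constant lives in
// containers/storage's chunked-TOC code used by chunkedToc.GetTOCDigest.
const tocAnnotationKey = "io.github.containers.zstd-chunked.manifest-checksum"

// tocDigestFromAnnotations returns the TOC digest if the layer carries the
// zstd:chunked annotation, or nil for a plain zstd layer.
func tocDigestFromAnnotations(annotations map[string]string) (*digest.Digest, error) {
	v, ok := annotations[tocAnnotationKey]
	if !ok {
		return nil, nil // plain zstd: no TOC, nothing chunked about it
	}
	d, err := digest.Parse(v)
	if err != nil {
		return nil, fmt.Errorf("invalid TOC digest %q: %w", v, err)
	}
	return &d, nil
}

func main() {
	toc, err := tocDigestFromAnnotations(map[string]string{
		tocAnnotationKey: "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
	})
	fmt.Println(toc != nil, err) // true <nil>: treat the blob as zstd:chunked
}
```
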
@@ -71,13 +81,14 @@
 
 // bpCompressionStepData contains data that the copy pipeline needs about the compression step.
 type bpCompressionStepData struct {
-	operation                    bpcOperation                // What we are actually doing
-	uploadedOperation            types.LayerCompression      // Operation to use for updating the blob metadata (matching the end state, not necessarily what we do)
-	uploadedAlgorithm            *compressiontypes.Algorithm // An algorithm parameter for the compressionOperation edits.
-	uploadedAnnotations          map[string]string           // Compression-related annotations that should be set on the uploaded blob. WARNING: This is only set after the srcStream.reader is fully consumed.
-	srcCompressorBaseVariantName string                      // Compressor base variant name to record in the blob info cache for the source blob.
-	uploadedCompressorName       string                      // Compressor name to record in the blob info cache for the uploaded blob.
-	closers                      []io.Closer                 // Objects to close after the upload is done, if any.
+	operation                             bpcOperation                // What we are actually doing
+	uploadedOperation                     types.LayerCompression      // Operation to use for updating the blob metadata (matching the end state, not necessarily what we do)
+	uploadedAlgorithm                     *compressiontypes.Algorithm // An algorithm parameter for the compressionOperation edits.
+	uploadedAnnotations                   map[string]string           // Compression-related annotations that should be set on the uploaded blob. WARNING: This is only set after the srcStream.reader is fully consumed.
+	srcCompressorBaseVariantName          string                      // Compressor base variant name to record in the blob info cache for the source blob.
+	uploadedCompressorBaseVariantName     string                      // Compressor base variant name to record in the blob info cache for the uploaded blob.
+	uploadedCompressorSpecificVariantName string                      // Compressor specific variant name to record in the blob info cache for the uploaded blob.
+	closers                               []io.Closer                 // Objects to close after the upload is done, if any.
 }
 
 type bpcOperation int
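
The single uploadedCompressorName is split into a base variant (e.g. zstd) and a specific variant (e.g. zstd:chunked), plus the annotations needed to consume the specific variant. Together these populate the blob info cache record. A runnable sketch of that record's shape, inferred from the RecordDigestCompressorData call sites later in this diff; the authoritative definition lives in internal/blobinfocache, which is not part of this commit view:

```go
package main

import "fmt"

// DigestCompressorData mirrors the fields used by the RecordDigestCompressorData
// calls in this diff; see internal/blobinfocache for the real type.
type DigestCompressorData struct {
	BaseVariantCompressor string // e.g. "zstd"; Uncompressed or UnknownCompression when not applicable

	// Only meaningful when the specific variant adds information beyond the base,
	// e.g. "zstd:chunked" plus its TOC annotations; UnknownCompression/nil otherwise.
	SpecificVariantCompressor  string
	SpecificVariantAnnotations map[string]string
}

func main() {
	// What gets recorded for an uploaded zstd:chunked blob (annotation key assumed):
	fmt.Printf("%+v\n", DigestCompressorData{
		BaseVariantCompressor:      "zstd",
		SpecificVariantCompressor:  "zstd:chunked",
		SpecificVariantAnnotations: map[string]string{"io.github.containers.zstd-chunked.manifest-checksum": "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"},
	})
}
```
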
@@ -129,11 +140,12 @@ func (ic *imageCopier) bpcPreserveEncrypted(stream *sourceStream, _ bpDetectComp
 		// We can’t do anything with an encrypted blob unless decrypted.
 		logrus.Debugf("Using original blob without modification for encrypted blob")
 		return &bpCompressionStepData{
-			operation:                    bpcOpPreserveOpaque,
-			uploadedOperation:            types.PreserveOriginal,
-			uploadedAlgorithm:            nil,
-			srcCompressorBaseVariantName: internalblobinfocache.UnknownCompression,
-			uploadedCompressorName:       internalblobinfocache.UnknownCompression,
+			operation:                             bpcOpPreserveOpaque,
+			uploadedOperation:                     types.PreserveOriginal,
+			uploadedAlgorithm:                     nil,
+			srcCompressorBaseVariantName:          internalblobinfocache.UnknownCompression,
+			uploadedCompressorBaseVariantName:     internalblobinfocache.UnknownCompression,
+			uploadedCompressorSpecificVariantName: internalblobinfocache.UnknownCompression,
 		}, nil
 	}
 	return nil, nil
@@ -157,14 +169,19 @@ func (ic *imageCopier) bpcCompressUncompressed(stream *sourceStream, detected bp
 			Digest: "",
 			Size:   -1,
 		}
+		specificVariantName := uploadedAlgorithm.Name()
+		if specificVariantName == uploadedAlgorithm.BaseVariantName() {
+			specificVariantName = internalblobinfocache.UnknownCompression
+		}
 		return &bpCompressionStepData{
-			operation:                    bpcOpCompressUncompressed,
-			uploadedOperation:            types.Compress,
-			uploadedAlgorithm:            uploadedAlgorithm,
-			uploadedAnnotations:          annotations,
-			srcCompressorBaseVariantName: detected.srcCompressorBaseVariantName,
-			uploadedCompressorName:       uploadedAlgorithm.Name(),
-			closers:                      []io.Closer{reader},
+			operation:                             bpcOpCompressUncompressed,
+			uploadedOperation:                     types.Compress,
+			uploadedAlgorithm:                     uploadedAlgorithm,
+			uploadedAnnotations:                   annotations,
+			srcCompressorBaseVariantName:          detected.srcCompressorBaseVariantName,
+			uploadedCompressorBaseVariantName:     uploadedAlgorithm.BaseVariantName(),
+			uploadedCompressorSpecificVariantName: specificVariantName,
+			closers:                               []io.Closer{reader},
 		}, nil
 	}
 	return nil, nil
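
This hunk and the recompression hunk below follow the same rule: always record the base variant, and record a specific variant only when it adds information beyond the base. A runnable sketch of that computation using the public compression packages; `unknownCompression` stands in for `internalblobinfocache.UnknownCompression`, which is not importable from outside the module:

```go
package main

import (
	"fmt"

	"github.com/containers/image/v5/pkg/compression"
	compressiontypes "github.com/containers/image/v5/pkg/compression/types"
)

// unknownCompression is a stand-in for internalblobinfocache.UnknownCompression.
const unknownCompression = "unknown"

// variantNames mirrors the specificVariantName computation above: the base
// variant is always meaningful; the specific variant is only worth recording
// when it differs from the base.
func variantNames(algo compressiontypes.Algorithm) (base, specific string) {
	base = algo.BaseVariantName()
	specific = algo.Name()
	if specific == base {
		specific = unknownCompression
	}
	return base, specific
}

func main() {
	fmt.Println(variantNames(compression.Zstd))        // expected: zstd unknown
	fmt.Println(variantNames(compression.ZstdChunked)) // expected: zstd zstd:chunked
}
```
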
@@ -197,15 +214,20 @@ func (ic *imageCopier) bpcRecompressCompressed(stream *sourceStream, detected bp
 			Digest: "",
 			Size:   -1,
 		}
+		specificVariantName := ic.compressionFormat.Name()
+		if specificVariantName == ic.compressionFormat.BaseVariantName() {
+			specificVariantName = internalblobinfocache.UnknownCompression
+		}
 		succeeded = true
 		return &bpCompressionStepData{
-			operation:                    bpcOpRecompressCompressed,
-			uploadedOperation:            types.PreserveOriginal,
-			uploadedAlgorithm:            ic.compressionFormat,
-			uploadedAnnotations:          annotations,
-			srcCompressorBaseVariantName: detected.srcCompressorBaseVariantName,
-			uploadedCompressorName:       ic.compressionFormat.Name(),
-			closers:                      []io.Closer{decompressed, recompressed},
+			operation:                             bpcOpRecompressCompressed,
+			uploadedOperation:                     types.PreserveOriginal,
+			uploadedAlgorithm:                     ic.compressionFormat,
+			uploadedAnnotations:                   annotations,
+			srcCompressorBaseVariantName:          detected.srcCompressorBaseVariantName,
+			uploadedCompressorBaseVariantName:     ic.compressionFormat.BaseVariantName(),
+			uploadedCompressorSpecificVariantName: specificVariantName,
+			closers:                               []io.Closer{decompressed, recompressed},
 		}, nil
 	}
 	return nil, nil
@@ -226,12 +248,13 @@ func (ic *imageCopier) bpcDecompressCompressed(stream *sourceStream, detected bp
 			Size:   -1,
 		}
 		return &bpCompressionStepData{
-			operation:                    bpcOpDecompressCompressed,
-			uploadedOperation:            types.Decompress,
-			uploadedAlgorithm:            nil,
-			srcCompressorBaseVariantName: detected.srcCompressorBaseVariantName,
-			uploadedCompressorName:       internalblobinfocache.Uncompressed,
-			closers:                      []io.Closer{s},
+			operation:                             bpcOpDecompressCompressed,
+			uploadedOperation:                     types.Decompress,
+			uploadedAlgorithm:                     nil,
+			srcCompressorBaseVariantName:          detected.srcCompressorBaseVariantName,
+			uploadedCompressorBaseVariantName:     internalblobinfocache.Uncompressed,
+			uploadedCompressorSpecificVariantName: internalblobinfocache.UnknownCompression,
+			closers:                               []io.Closer{s},
 		}, nil
 	}
 	return nil, nil
@@ -276,7 +299,8 @@ func (ic *imageCopier) bpcPreserveOriginal(_ *sourceStream, detected bpDetectCom
 		// We only record the base variant of the format on upload; we didn’t do anything with
 		// the TOC, we don’t know whether it matches the blob digest, so we don’t want to trigger
 		// reuse of any kind between the blob digest and the TOC digest.
-		uploadedCompressorName: detected.srcCompressorBaseVariantName,
+		uploadedCompressorBaseVariantName:     detected.srcCompressorBaseVariantName,
+		uploadedCompressorSpecificVariantName: internalblobinfocache.UnknownCompression,
 	}
 }

@@ -336,32 +360,26 @@ func (d *bpCompressionStepData) recordValidatedDigestData(c *copier, uploadedInf
 			return fmt.Errorf("Internal error: Unexpected d.operation value %#v", d.operation)
 		}
 	}
-	if d.srcCompressorBaseVariantName == "" || d.uploadedCompressorName == "" {
-		return fmt.Errorf("internal error: missing compressor names (src base: %q, uploaded: %q)",
-			d.srcCompressorBaseVariantName, d.uploadedCompressorName)
+	if d.srcCompressorBaseVariantName == "" || d.uploadedCompressorBaseVariantName == "" || d.uploadedCompressorSpecificVariantName == "" {
+		return fmt.Errorf("internal error: missing compressor names (src base: %q, uploaded base: %q, uploaded specific: %q)",
+			d.srcCompressorBaseVariantName, d.uploadedCompressorBaseVariantName, d.uploadedCompressorSpecificVariantName)
 	}
-	if d.uploadedCompressorName != internalblobinfocache.UnknownCompression {
-		if d.uploadedCompressorName != compressiontypes.ZstdChunkedAlgorithmName {
-			// HACK: Don’t record zstd:chunked algorithms.
-			// There is already a similar hack in internal/imagedestination/impl/helpers.CandidateMatchesTryReusingBlobOptions,
-			// and that one prevents reusing zstd:chunked blobs, so recording the algorithm here would be mostly harmless.
-			//
-			// We skip that here anyway to work around the inability of blobPipelineDetectCompressionStep to differentiate
-			// between zstd and zstd:chunked; so we could, in varying situations over time, call RecordDigestCompressorName
-			// with the same digest and both ZstdAlgorithmName and ZstdChunkedAlgorithmName , which causes warnings about
-			// inconsistent data to be logged.
-			c.blobInfoCache.RecordDigestCompressorData(uploadedInfo.Digest, internalblobinfocache.DigestCompressorData{
-				BaseVariantCompressor: d.uploadedCompressorName,
-			})
-		}
+	if d.uploadedCompressorBaseVariantName != internalblobinfocache.UnknownCompression {
+		c.blobInfoCache.RecordDigestCompressorData(uploadedInfo.Digest, internalblobinfocache.DigestCompressorData{
+			BaseVariantCompressor:      d.uploadedCompressorBaseVariantName,
+			SpecificVariantCompressor:  d.uploadedCompressorSpecificVariantName,
+			SpecificVariantAnnotations: d.uploadedAnnotations,
+		})
 	}
 	if srcInfo.Digest != "" && srcInfo.Digest != uploadedInfo.Digest &&
 		d.srcCompressorBaseVariantName != internalblobinfocache.UnknownCompression {
 		// If the source is already using some TOC-dependent variant, we either copied the
 		// blob as is, or perhaps decompressed it; either way we don’t trust the TOC digest,
 		// so record neither the variant name, nor the TOC digest.
 		c.blobInfoCache.RecordDigestCompressorData(srcInfo.Digest, internalblobinfocache.DigestCompressorData{
-			BaseVariantCompressor: d.srcCompressorBaseVariantName,
+			BaseVariantCompressor:      d.srcCompressorBaseVariantName,
+			SpecificVariantCompressor:  internalblobinfocache.UnknownCompression,
+			SpecificVariantAnnotations: nil,
 		})
 	}
 	return nil

copy/single.go (+22 -9)

@@ -6,6 +6,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"maps"
 	"reflect"
 	"slices"
 	"strings"
@@ -162,7 +163,7 @@
 		if format == nil {
 			format = defaultCompressionFormat
 		}
-		if format.Name() == compression.ZstdChunked.Name() {
+		if format.Name() == compressiontypes.ZstdChunkedAlgorithmName {
 			if ic.requireCompressionFormatMatch {
 				return copySingleImageResult{}, errors.New("explicitly requested to combine zstd:chunked with encryption, which is not beneficial; use plain zstd instead")
 			}
@@ -888,21 +889,33 @@ func updatedBlobInfoFromReuse(inputInfo types.BlobInfo, reusedBlob private.Reuse
 	// Handling of compression, encryption, and the related MIME types and the like are all the responsibility
 	// of the generic code in this package.
 	res := types.BlobInfo{
-		Digest:      reusedBlob.Digest,
-		Size:        reusedBlob.Size,
-		URLs:        nil,                   // This _must_ be cleared if Digest changes; clear it in other cases as well, to preserve previous behavior.
-		Annotations: inputInfo.Annotations, // FIXME: This should remove zstd:chunked annotations (but those annotations being left with incorrect values should not break pulls)
-		MediaType:   inputInfo.MediaType,   // Mostly irrelevant, MediaType is updated based on Compression*/CryptoOperation.
+		Digest: reusedBlob.Digest,
+		Size:   reusedBlob.Size,
+		URLs:   nil, // This _must_ be cleared if Digest changes; clear it in other cases as well, to preserve previous behavior.
+		// FIXME: This should remove zstd:chunked annotations IF the original was chunked and the new one isn’t
+		// (but those annotations being left with incorrect values should not break pulls).
+		Annotations:          maps.Clone(inputInfo.Annotations),
+		MediaType:            inputInfo.MediaType, // Mostly irrelevant, MediaType is updated based on Compression*/CryptoOperation.
 		CompressionOperation: reusedBlob.CompressionOperation,
 		CompressionAlgorithm: reusedBlob.CompressionAlgorithm,
 		CryptoOperation:      inputInfo.CryptoOperation, // Expected to be unset anyway.
 	}
 	// The transport is only expected to fill CompressionOperation and CompressionAlgorithm
-	// if the blob was substituted; otherwise, fill it in based
+	// if the blob was substituted; otherwise, it is optional, and if not set, fill it in based
 	// on what we know from the srcInfos we were given.
 	if reusedBlob.Digest == inputInfo.Digest {
-		res.CompressionOperation = inputInfo.CompressionOperation
-		res.CompressionAlgorithm = inputInfo.CompressionAlgorithm
+		if res.CompressionOperation == types.PreserveOriginal {
+			res.CompressionOperation = inputInfo.CompressionOperation
+		}
+		if res.CompressionAlgorithm == nil {
+			res.CompressionAlgorithm = inputInfo.CompressionAlgorithm
+		}
+	}
+	if len(reusedBlob.CompressionAnnotations) != 0 {
+		if res.Annotations == nil {
+			res.Annotations = map[string]string{}
+		}
+		maps.Copy(res.Annotations, reusedBlob.CompressionAnnotations)
 	}
 	return res
 }
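
The switch from aliasing `inputInfo.Annotations` to `maps.Clone` matters because the function may now merge `reusedBlob.CompressionAnnotations` into the result; without the clone, that merge would silently mutate the caller's map. A small runnable illustration of the pattern (annotation values made up):

```go
package main

import (
	"fmt"
	"maps"
)

func main() {
	input := map[string]string{"test-annotation-2": "two"}

	// Clone first, so merging reuse-provided annotations never mutates the
	// caller's map; maps.Clone(nil) stays nil, hence the allocation below.
	res := maps.Clone(input)
	if res == nil {
		res = map[string]string{}
	}
	maps.Copy(res, map[string]string{"zstd-toc": "value"}) // annotations from the reused blob

	fmt.Println(input) // map[test-annotation-2:two], the caller's map is unchanged
	fmt.Println(res)   // map[test-annotation-2:two zstd-toc:value]
}
```
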

copy/single_test.go (+25 -5)

@@ -55,22 +55,42 @@ func TestUpdatedBlobInfoFromReuse(t *testing.T) {
 		},
 		{ // Reuse with substitution
 			reused: private.ReusedBlob{
-				Digest:               "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
-				Size:                 513543640,
-				CompressionOperation: types.Decompress,
-				CompressionAlgorithm: nil,
+				Digest:                 "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
+				Size:                   513543640,
+				CompressionOperation:   types.Decompress,
+				CompressionAlgorithm:   nil,
+				CompressionAnnotations: map[string]string{"decompressed": "value"},
 			},
 			expected: types.BlobInfo{
 				Digest:               "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
 				Size:                 513543640,
 				URLs:                 nil,
-				Annotations:          map[string]string{"test-annotation-2": "two"},
+				Annotations:          map[string]string{"test-annotation-2": "two", "decompressed": "value"},
 				MediaType:            imgspecv1.MediaTypeImageLayerGzip,
 				CompressionOperation: types.Decompress,
 				CompressionAlgorithm: nil,
 				// CryptoOperation is set to the zero value
 			},
 		},
+		{ // Reuse turning zstd into zstd:chunked
+			reused: private.ReusedBlob{
+				Digest:                 "sha256:6a5a5368e0c2d3e5909184fa28ddfd56072e7ff3ee9a945876f7eee5896ef5bb",
+				Size:                   51354364,
+				CompressionOperation:   types.Compress,
+				CompressionAlgorithm:   &compression.ZstdChunked,
+				CompressionAnnotations: map[string]string{"zstd-toc": "value"},
+			},
+			expected: types.BlobInfo{
+				Digest:               "sha256:6a5a5368e0c2d3e5909184fa28ddfd56072e7ff3ee9a945876f7eee5896ef5bb",
+				Size:                 51354364,
+				URLs:                 nil,
+				Annotations:          map[string]string{"test-annotation-2": "two", "zstd-toc": "value"},
+				MediaType:            imgspecv1.MediaTypeImageLayerGzip,
+				CompressionOperation: types.Compress,
+				CompressionAlgorithm: &compression.ZstdChunked,
+				// CryptoOperation is set to the zero value
+			},
+		},
 	} {
 		res := updatedBlobInfoFromReuse(srcInfo, c.reused)
 		assert.Equal(t, c.expected, res, fmt.Sprintf("%#v", c.reused))

docker/docker_image_dest.go (+17 -5)

@@ -332,6 +332,7 @@ func (d *dockerImageDestination) TryReusingBlobWithOptions(ctx context.Context,
 		return false, private.ReusedBlob{}, errors.New("Can not check for a blob with unknown digest")
 	}
 
+	originalCandidateKnownToBeMissing := false
 	if impl.OriginalCandidateMatchesTryReusingBlobOptions(options) {
 		// First, check whether the blob happens to already exist at the destination.
 		haveBlob, reusedInfo, err := d.tryReusingExactBlob(ctx, info, options.Cache)
@@ -341,9 +342,17 @@ func (d *dockerImageDestination) TryReusingBlobWithOptions(ctx context.Context,
 		if haveBlob {
 			return true, reusedInfo, nil
 		}
+		originalCandidateKnownToBeMissing = true
 	} else {
 		logrus.Debugf("Ignoring exact blob match, compression %s does not match required %s or MIME types %#v",
 			optionalCompressionName(options.OriginalCompression), optionalCompressionName(options.RequiredCompression), options.PossibleManifestFormats)
+		// We can get here with a blob detected to be zstd when the user wants zstd:chunked.
+		// In that case we keep originalCandidateKnownToBeMissing = false, so that if we find
+		// a BIC entry for this blob, we do use that entry and return a zstd:chunked entry
+		// with the BIC’s annotations.
+		// This is not quite correct, it only works if the BIC also contains an acceptable _location_.
+		// Ideally, we could look up just the compression algorithm/annotations for info.digest,
+		// and use it even if no location candidate exists and the original candidate is present.
 	}
 
 	// Then try reusing blobs from other locations.
@@ -387,7 +396,8 @@ func (d *dockerImageDestination) TryReusingBlobWithOptions(ctx context.Context,
 			// for it in the current repo.
 			candidateRepo = reference.TrimNamed(d.ref.ref)
 		}
-		if candidateRepo.Name() == d.ref.ref.Name() && candidate.Digest == info.Digest {
+		if originalCandidateKnownToBeMissing &&
+			candidateRepo.Name() == d.ref.ref.Name() && candidate.Digest == info.Digest {
 			logrus.Debug("... Already tried the primary destination")
 			continue
 		}
@@ -427,10 +437,12 @@ func (d *dockerImageDestination) TryReusingBlobWithOptions(ctx context.Context,
 		options.Cache.RecordKnownLocation(d.ref.Transport(), bicTransportScope(d.ref), candidate.Digest, newBICLocationReference(d.ref))
 
 		return true, private.ReusedBlob{
-			Digest:               candidate.Digest,
-			Size:                 size,
-			CompressionOperation: candidate.CompressionOperation,
-			CompressionAlgorithm: candidate.CompressionAlgorithm}, nil
+			Digest:                 candidate.Digest,
+			Size:                   size,
+			CompressionOperation:   candidate.CompressionOperation,
+			CompressionAlgorithm:   candidate.CompressionAlgorithm,
+			CompressionAnnotations: candidate.CompressionAnnotations,
+		}, nil
 	}
 
 	return false, private.ReusedBlob{}, nil
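
With `CompressionAnnotations` added, a successful reuse reports everything `updatedBlobInfoFromReuse` needs to rewrite the manifest's layer entry, including zstd:chunked TOC annotations recovered from the BIC. A runnable sketch of the field set as exercised by this diff; the authoritative `private.ReusedBlob` definition lives in internal/private, outside this commit view:

```go
package main

import (
	"fmt"

	compressiontypes "github.com/containers/image/v5/pkg/compression/types"
	"github.com/containers/image/v5/types"
	"github.com/opencontainers/go-digest"
)

// ReusedBlob mirrors the fields of private.ReusedBlob used in this diff.
type ReusedBlob struct {
	Digest digest.Digest
	Size   int64

	// Filled in when the transport substituted a differently-compressed blob;
	// optional otherwise. Merged into the manifest's layer entry by the copy pipeline.
	CompressionOperation   types.LayerCompression
	CompressionAlgorithm   *compressiontypes.Algorithm
	CompressionAnnotations map[string]string // e.g. zstd:chunked TOC annotations from the BIC
}

func main() {
	r := ReusedBlob{
		Digest:                 digest.Digest("sha256:6a5a5368e0c2d3e5909184fa28ddfd56072e7ff3ee9a945876f7eee5896ef5bb"),
		Size:                   51354364,
		CompressionOperation:   types.Compress,
		CompressionAnnotations: map[string]string{"zstd-toc": "value"}, // made-up key, as in the test above
	}
	fmt.Printf("%+v\n", r)
}
```
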
