
Commit e3e9287

Merge pull request #2503 from mtrmac/chunked-bic2-partial
zstd:chunked refactoring for early review
2 parents: 7847869 + babdac8; commit e3e9287

10 files changed: +382 −199 lines


copy/compression.go

Lines changed: 59 additions & 52 deletions
@@ -35,10 +35,10 @@ var (
 
 // bpDetectCompressionStepData contains data that the copy pipeline needs about the “detect compression” step.
 type bpDetectCompressionStepData struct {
-	isCompressed      bool
-	format            compressiontypes.Algorithm        // Valid if isCompressed
-	decompressor      compressiontypes.DecompressorFunc // Valid if isCompressed
-	srcCompressorName string                            // Compressor name to possibly record in the blob info cache for the source blob.
+	isCompressed                 bool
+	format                       compressiontypes.Algorithm        // Valid if isCompressed
+	decompressor                 compressiontypes.DecompressorFunc // Valid if isCompressed
+	srcCompressorBaseVariantName string                            // Compressor name to possibly record in the blob info cache for the source blob.
 }
 
 // blobPipelineDetectCompressionStep updates *stream to detect its current compression format.
@@ -58,9 +58,9 @@ func blobPipelineDetectCompressionStep(stream *sourceStream, srcInfo types.BlobI
 		decompressor: decompressor,
 	}
 	if res.isCompressed {
-		res.srcCompressorName = format.Name()
+		res.srcCompressorBaseVariantName = format.BaseVariantName()
 	} else {
-		res.srcCompressorName = internalblobinfocache.Uncompressed
+		res.srcCompressorBaseVariantName = internalblobinfocache.Uncompressed
 	}
 
 	if expectedBaseFormat, known := expectedBaseCompressionFormats[stream.info.MediaType]; known && res.isCompressed && format.BaseVariantName() != expectedBaseFormat.Name() {
@@ -71,13 +71,13 @@ func blobPipelineDetectCompressionStep(stream *sourceStream, srcInfo types.BlobI
 
 // bpCompressionStepData contains data that the copy pipeline needs about the compression step.
 type bpCompressionStepData struct {
-	operation              bpcOperation                // What we are actually doing
-	uploadedOperation      types.LayerCompression      // Operation to use for updating the blob metadata (matching the end state, not necessarily what we do)
-	uploadedAlgorithm      *compressiontypes.Algorithm // An algorithm parameter for the compressionOperation edits.
-	uploadedAnnotations    map[string]string           // Compression-related annotations that should be set on the uploaded blob. WARNING: This is only set after the srcStream.reader is fully consumed.
-	srcCompressorName      string                      // Compressor name to record in the blob info cache for the source blob.
-	uploadedCompressorName string                      // Compressor name to record in the blob info cache for the uploaded blob.
-	closers                []io.Closer                 // Objects to close after the upload is done, if any.
+	operation                    bpcOperation                // What we are actually doing
+	uploadedOperation            types.LayerCompression      // Operation to use for updating the blob metadata (matching the end state, not necessarily what we do)
+	uploadedAlgorithm            *compressiontypes.Algorithm // An algorithm parameter for the compressionOperation edits.
+	uploadedAnnotations          map[string]string           // Compression-related annotations that should be set on the uploaded blob. WARNING: This is only set after the srcStream.reader is fully consumed.
+	srcCompressorBaseVariantName string                      // Compressor base variant name to record in the blob info cache for the source blob.
+	uploadedCompressorName       string                      // Compressor name to record in the blob info cache for the uploaded blob.
+	closers                      []io.Closer                 // Objects to close after the upload is done, if any.
 }
 
 type bpcOperation int
@@ -129,11 +129,11 @@ func (ic *imageCopier) bpcPreserveEncrypted(stream *sourceStream, _ bpDetectComp
 		// We can’t do anything with an encrypted blob unless decrypted.
 		logrus.Debugf("Using original blob without modification for encrypted blob")
 		return &bpCompressionStepData{
-			operation:              bpcOpPreserveOpaque,
-			uploadedOperation:      types.PreserveOriginal,
-			uploadedAlgorithm:      nil,
-			srcCompressorName:      internalblobinfocache.UnknownCompression,
-			uploadedCompressorName: internalblobinfocache.UnknownCompression,
+			operation:                    bpcOpPreserveOpaque,
+			uploadedOperation:            types.PreserveOriginal,
+			uploadedAlgorithm:            nil,
+			srcCompressorBaseVariantName: internalblobinfocache.UnknownCompression,
+			uploadedCompressorName:       internalblobinfocache.UnknownCompression,
 		}, nil
 	}
 	return nil, nil
@@ -158,13 +158,13 @@ func (ic *imageCopier) bpcCompressUncompressed(stream *sourceStream, detected bp
 			Size:   -1,
 		}
 		return &bpCompressionStepData{
-			operation:              bpcOpCompressUncompressed,
-			uploadedOperation:      types.Compress,
-			uploadedAlgorithm:      uploadedAlgorithm,
-			uploadedAnnotations:    annotations,
-			srcCompressorName:      detected.srcCompressorName,
-			uploadedCompressorName: uploadedAlgorithm.Name(),
-			closers:                []io.Closer{reader},
+			operation:                    bpcOpCompressUncompressed,
+			uploadedOperation:            types.Compress,
+			uploadedAlgorithm:            uploadedAlgorithm,
+			uploadedAnnotations:          annotations,
+			srcCompressorBaseVariantName: detected.srcCompressorBaseVariantName,
+			uploadedCompressorName:       uploadedAlgorithm.Name(),
+			closers:                      []io.Closer{reader},
 		}, nil
 	}
 	return nil, nil
@@ -199,13 +199,13 @@ func (ic *imageCopier) bpcRecompressCompressed(stream *sourceStream, detected bp
 		}
 		succeeded = true
 		return &bpCompressionStepData{
-			operation:              bpcOpRecompressCompressed,
-			uploadedOperation:      types.PreserveOriginal,
-			uploadedAlgorithm:      ic.compressionFormat,
-			uploadedAnnotations:    annotations,
-			srcCompressorName:      detected.srcCompressorName,
-			uploadedCompressorName: ic.compressionFormat.Name(),
-			closers:                []io.Closer{decompressed, recompressed},
+			operation:                    bpcOpRecompressCompressed,
+			uploadedOperation:            types.PreserveOriginal,
+			uploadedAlgorithm:            ic.compressionFormat,
+			uploadedAnnotations:          annotations,
+			srcCompressorBaseVariantName: detected.srcCompressorBaseVariantName,
+			uploadedCompressorName:       ic.compressionFormat.Name(),
+			closers:                      []io.Closer{decompressed, recompressed},
 		}, nil
 	}
 	return nil, nil
@@ -226,12 +226,12 @@ func (ic *imageCopier) bpcDecompressCompressed(stream *sourceStream, detected bp
 			Size:   -1,
 		}
 		return &bpCompressionStepData{
-			operation:              bpcOpDecompressCompressed,
-			uploadedOperation:      types.Decompress,
-			uploadedAlgorithm:      nil,
-			srcCompressorName:      detected.srcCompressorName,
-			uploadedCompressorName: internalblobinfocache.Uncompressed,
-			closers:                []io.Closer{s},
+			operation:                    bpcOpDecompressCompressed,
+			uploadedOperation:            types.Decompress,
+			uploadedAlgorithm:            nil,
+			srcCompressorBaseVariantName: detected.srcCompressorBaseVariantName,
+			uploadedCompressorName:       internalblobinfocache.Uncompressed,
+			closers:                      []io.Closer{s},
 		}, nil
 	}
 	return nil, nil
@@ -269,11 +269,14 @@ func (ic *imageCopier) bpcPreserveOriginal(_ *sourceStream, detected bpDetectCom
 		algorithm = nil
 	}
 	return &bpCompressionStepData{
-		operation:              bpcOp,
-		uploadedOperation:      uploadedOp,
-		uploadedAlgorithm:      algorithm,
-		srcCompressorName:      detected.srcCompressorName,
-		uploadedCompressorName: detected.srcCompressorName,
+		operation:                    bpcOp,
+		uploadedOperation:            uploadedOp,
+		uploadedAlgorithm:            algorithm,
+		srcCompressorBaseVariantName: detected.srcCompressorBaseVariantName,
+		// We only record the base variant of the format on upload; we didn’t do anything with
+		// the TOC, we don’t know whether it matches the blob digest, so we don’t want to trigger
+		// reuse of any kind between the blob digest and the TOC digest.
+		uploadedCompressorName: detected.srcCompressorBaseVariantName,
 	}
 }
 
@@ -333,9 +336,9 @@ func (d *bpCompressionStepData) recordValidatedDigestData(c *copier, uploadedInf
 			return fmt.Errorf("Internal error: Unexpected d.operation value %#v", d.operation)
 		}
 	}
-	if d.srcCompressorName == "" || d.uploadedCompressorName == "" {
-		return fmt.Errorf("internal error: missing compressor names (src: %q, uploaded: %q)",
-			d.srcCompressorName, d.uploadedCompressorName)
+	if d.srcCompressorBaseVariantName == "" || d.uploadedCompressorName == "" {
+		return fmt.Errorf("internal error: missing compressor names (src base: %q, uploaded: %q)",
+			d.srcCompressorBaseVariantName, d.uploadedCompressorName)
 	}
 	if d.uploadedCompressorName != internalblobinfocache.UnknownCompression {
 		if d.uploadedCompressorName != compressiontypes.ZstdChunkedAlgorithmName {
@@ -347,15 +350,19 @@ func (d *bpCompressionStepData) recordValidatedDigestData(c *copier, uploadedInf
 			// between zstd and zstd:chunked; so we could, in varying situations over time, call RecordDigestCompressorName
 			// with the same digest and both ZstdAlgorithmName and ZstdChunkedAlgorithmName , which causes warnings about
 			// inconsistent data to be logged.
-			c.blobInfoCache.RecordDigestCompressorName(uploadedInfo.Digest, d.uploadedCompressorName)
+			c.blobInfoCache.RecordDigestCompressorData(uploadedInfo.Digest, internalblobinfocache.DigestCompressorData{
+				BaseVariantCompressor: d.uploadedCompressorName,
+			})
 		}
 	}
 	if srcInfo.Digest != "" && srcInfo.Digest != uploadedInfo.Digest &&
-		d.srcCompressorName != internalblobinfocache.UnknownCompression {
-		if d.srcCompressorName != compressiontypes.ZstdChunkedAlgorithmName {
-			// HACK: Don’t record zstd:chunked algorithms, see above.
-			c.blobInfoCache.RecordDigestCompressorName(srcInfo.Digest, d.srcCompressorName)
-		}
+		d.srcCompressorBaseVariantName != internalblobinfocache.UnknownCompression {
+		// If the source is already using some TOC-dependent variant, we either copied the
+		// blob as is, or perhaps decompressed it; either way we don’t trust the TOC digest,
+		// so record neither the variant name, nor the TOC digest.
+		c.blobInfoCache.RecordDigestCompressorData(srcInfo.Digest, internalblobinfocache.DigestCompressorData{
+			BaseVariantCompressor: d.srcCompressorBaseVariantName,
+		})
 	}
 	return nil
 }
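
The recurring change in copy/compression.go is that anything recorded for the source blob now uses the compression format’s base variant (format.BaseVariantName(), e.g. "zstd" for a zstd:chunked stream) rather than the full variant name. A minimal sketch of that choice, assuming this repository’s import paths and compilation inside the module (internal/blobinfocache is an internal package); the helper name is hypothetical and not part of this commit:

// Hypothetical helper, illustration only — not part of this commit.
package copy

import (
	internalblobinfocache "github.com/containers/image/v5/internal/blobinfocache"
	compressiontypes "github.com/containers/image/v5/pkg/compression/types"
)

// srcCompressorBaseVariant mirrors the assignment in blobPipelineDetectCompressionStep:
// uncompressed blobs are recorded as Uncompressed, compressed blobs only under their
// base variant name.
func srcCompressorBaseVariant(isCompressed bool, format compressiontypes.Algorithm) string {
	if !isCompressed {
		return internalblobinfocache.Uncompressed
	}
	// For zstd:chunked input, format.Name() is "zstd:chunked" while
	// format.BaseVariantName() is "zstd"; only the base variant is stored in
	// srcCompressorBaseVariantName, so blob-digest reuse never depends on the TOC.
	return format.BaseVariantName()
}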

internal/blobinfocache/blobinfocache.go

Lines changed: 1 addition & 1 deletion
@@ -34,7 +34,7 @@ func (bic *v1OnlyBlobInfoCache) UncompressedDigestForTOC(tocDigest digest.Digest
 func (bic *v1OnlyBlobInfoCache) RecordTOCUncompressedPair(tocDigest digest.Digest, uncompressed digest.Digest) {
 }
 
-func (bic *v1OnlyBlobInfoCache) RecordDigestCompressorName(anyDigest digest.Digest, compressorName string) {
+func (bic *v1OnlyBlobInfoCache) RecordDigestCompressorData(anyDigest digest.Digest, data DigestCompressorData) {
 }
 
 func (bic *v1OnlyBlobInfoCache) CandidateLocations2(transport types.ImageTransport, scope types.BICTransportScope, digest digest.Digest, options CandidateLocations2Options) []BICReplacementCandidate2 {

internal/blobinfocache/types.go

Lines changed: 11 additions & 5 deletions
@@ -35,19 +35,25 @@ type BlobInfoCache2 interface {
 	// (Eventually, the DiffIDs in image config could detect the substitution, but that may be too late, and not all image formats contain that data.)
 	RecordTOCUncompressedPair(tocDigest digest.Digest, uncompressed digest.Digest)
 
-	// RecordDigestCompressorName records a compressor for the blob with the specified digest,
-	// or Uncompressed or UnknownCompression.
-	// WARNING: Only call this with LOCALLY VERIFIED data; don’t record a compressor for a
-	// digest just because some remote author claims so (e.g. because a manifest says so);
+	// RecordDigestCompressorData records data for the blob with the specified digest.
+	// WARNING: Only call this with LOCALLY VERIFIED data:
+	// - don’t record a compressor for a digest just because some remote author claims so
+	//   (e.g. because a manifest says so);
 	// otherwise the cache could be poisoned and cause us to make incorrect edits to type
 	// information in a manifest.
-	RecordDigestCompressorName(anyDigest digest.Digest, compressorName string)
+	RecordDigestCompressorData(anyDigest digest.Digest, data DigestCompressorData)
 	// CandidateLocations2 returns a prioritized, limited, number of blobs and their locations (if known)
 	// that could possibly be reused within the specified (transport scope) (if they still
 	// exist, which is not guaranteed).
 	CandidateLocations2(transport types.ImageTransport, scope types.BICTransportScope, digest digest.Digest, options CandidateLocations2Options) []BICReplacementCandidate2
 }
 
+// DigestCompressorData is information known about how a blob is compressed.
+// (This is worded generically, but basically targeted at the zstd / zstd:chunked situation.)
+type DigestCompressorData struct {
+	BaseVariantCompressor string // A compressor’s base variant name, or Uncompressed or UnknownCompression.
+}
+
 // CandidateLocations2Options are used in CandidateLocations2.
 type CandidateLocations2Options struct {
 	// If !CanSubstitute, the returned candidates will match the submitted digest exactly; if
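
To make the new shape concrete, here is a hedged sketch of a caller of RecordDigestCompressorData; the function, its name, and its parameters are placeholders (not code from this commit), and it assumes compilation inside this module together with the go-digest and pkg/compression/types imports:

// Hypothetical caller, illustration only — not part of this commit.
package example

import (
	"github.com/containers/image/v5/internal/blobinfocache"
	compressiontypes "github.com/containers/image/v5/pkg/compression/types"
	digest "github.com/opencontainers/go-digest"
)

// recordLocalCompression records locally verified compression data for a blob.
// Per the WARNING above, layerDigest and detectedFormat must come from reading
// the blob ourselves, never from a remote manifest.
func recordLocalCompression(bic blobinfocache.BlobInfoCache2, layerDigest digest.Digest, detectedFormat compressiontypes.Algorithm) {
	bic.RecordDigestCompressorData(layerDigest, blobinfocache.DigestCompressorData{
		// Only the base variant name is carried ("zstd", never "zstd:chunked"),
		// or the Uncompressed / UnknownCompression markers.
		BaseVariantCompressor: detectedFormat.BaseVariantName(),
	})
}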

pkg/blobinfocache/boltdb/boltdb.go

Lines changed: 18 additions & 31 deletions
@@ -295,27 +295,29 @@ func (bdc *cache) RecordTOCUncompressedPair(tocDigest digest.Digest, uncompresse
 	}) // FIXME? Log error (but throttle the log volume on repeated accesses)?
 }
 
-// RecordDigestCompressorName records that the blob with digest anyDigest was compressed with the specified
-// compressor, or is blobinfocache.Uncompressed.
-// WARNING: Only call this for LOCALLY VERIFIED data; don’t record a digest pair just because some remote author claims so (e.g.
-// because a manifest/config pair exists); otherwise the cache could be poisoned and allow substituting unexpected blobs.
-// (Eventually, the DiffIDs in image config could detect the substitution, but that may be too late, and not all image formats contain that data.)
-func (bdc *cache) RecordDigestCompressorName(anyDigest digest.Digest, compressorName string) {
+// RecordDigestCompressorData records data for the blob with the specified digest.
+// WARNING: Only call this with LOCALLY VERIFIED data:
+// - don’t record a compressor for a digest just because some remote author claims so
+//   (e.g. because a manifest says so);
+//
+// otherwise the cache could be poisoned and cause us to make incorrect edits to type
+// information in a manifest.
+func (bdc *cache) RecordDigestCompressorData(anyDigest digest.Digest, data blobinfocache.DigestCompressorData) {
 	_ = bdc.update(func(tx *bolt.Tx) error {
 		b, err := tx.CreateBucketIfNotExists(digestCompressorBucket)
 		if err != nil {
 			return err
 		}
 		key := []byte(anyDigest.String())
 		if previousBytes := b.Get(key); previousBytes != nil {
-			if string(previousBytes) != compressorName {
-				logrus.Warnf("Compressor for blob with digest %s previously recorded as %s, now %s", anyDigest, string(previousBytes), compressorName)
+			if string(previousBytes) != data.BaseVariantCompressor {
+				logrus.Warnf("Compressor for blob with digest %s previously recorded as %s, now %s", anyDigest, string(previousBytes), data.BaseVariantCompressor)
 			}
 		}
-		if compressorName == blobinfocache.UnknownCompression {
+		if data.BaseVariantCompressor == blobinfocache.UnknownCompression {
 			return b.Delete(key)
 		}
-		return b.Put(key, []byte(compressorName))
+		return b.Put(key, []byte(data.BaseVariantCompressor))
 	}) // FIXME? Log error (but throttle the log volume on repeated accesses)?
 }
 
@@ -367,8 +369,10 @@ func (bdc *cache) appendReplacementCandidates(candidates []prioritize.CandidateW
 			compressorName = string(compressorNameValue)
 		}
 	}
-	ok, compressionOp, compressionAlgo := prioritize.CandidateCompression(v2Options, digest, compressorName)
-	if !ok {
+	template := prioritize.CandidateTemplateWithCompression(v2Options, digest, blobinfocache.DigestCompressorData{
+		BaseVariantCompressor: compressorName,
+	})
+	if template == nil {
 		return candidates
 	}
 
@@ -382,28 +386,11 @@ func (bdc *cache) appendReplacementCandidates(candidates []prioritize.CandidateW
 			if err := t.UnmarshalBinary(v); err != nil {
 				return err
 			}
-			candidates = append(candidates, prioritize.CandidateWithTime{
-				Candidate: blobinfocache.BICReplacementCandidate2{
-					Digest:               digest,
-					CompressionOperation: compressionOp,
-					CompressionAlgorithm: compressionAlgo,
-					Location:             types.BICLocationReference{Opaque: string(k)},
-				},
-				LastSeen: t,
-			})
+			candidates = append(candidates, template.CandidateWithLocation(types.BICLocationReference{Opaque: string(k)}, t))
 			return nil
 		}) // FIXME? Log error (but throttle the log volume on repeated accesses)?
 	} else if v2Options != nil {
-		candidates = append(candidates, prioritize.CandidateWithTime{
-			Candidate: blobinfocache.BICReplacementCandidate2{
-				Digest:               digest,
-				CompressionOperation: compressionOp,
-				CompressionAlgorithm: compressionAlgo,
-				UnknownLocation:      true,
-				Location:             types.BICLocationReference{Opaque: ""},
-			},
-			LastSeen: time.Time{},
-		})
+		candidates = append(candidates, template.CandidateWithUnknownLocation())
 	}
 	return candidates
 }
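
The appendReplacementCandidates rewrite replaces the old CandidateCompression check plus two hand-built prioritize.CandidateWithTime literals with a single template object. A condensed, hedged sketch of the new flow — the wrapper function, its parameter types, and the prioritize import path are assumptions based only on this hunk:

// Hypothetical condensation of the new flow, illustration only — not part of this commit.
package example

import (
	"time"

	"github.com/containers/image/v5/internal/blobinfocache"
	"github.com/containers/image/v5/pkg/blobinfocache/internal/prioritize"
	"github.com/containers/image/v5/types"

	digest "github.com/opencontainers/go-digest"
)

// appendCandidate builds a template once and then stamps out either a located
// or an unknown-location candidate, mirroring the new appendReplacementCandidates body.
func appendCandidate(candidates []prioritize.CandidateWithTime, v2Options *blobinfocache.CandidateLocations2Options,
	digest digest.Digest, compressorName, locationKey string, haveLocation bool, lastSeen time.Time,
) []prioritize.CandidateWithTime {
	template := prioritize.CandidateTemplateWithCompression(v2Options, digest, blobinfocache.DigestCompressorData{
		BaseVariantCompressor: compressorName, // as read back from digestCompressorBucket
	})
	if template == nil {
		return candidates // the recorded compression cannot satisfy v2Options
	}
	if haveLocation {
		return append(candidates, template.CandidateWithLocation(types.BICLocationReference{Opaque: locationKey}, lastSeen))
	}
	return append(candidates, template.CandidateWithUnknownLocation())
}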
