Skip to content

Commit d35eb60

Browse files
Split private mirror functions for reuse
These files will be copied for now and then refactored later into reusable packages.
1 parent 7687cec commit d35eb60

File tree

4 files changed

+312
-243
lines changed

4 files changed

+312
-243
lines changed
+178
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
package mirror
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
8+
"github.com/docker/distribution"
9+
"github.com/docker/distribution/manifest/manifestlist"
10+
"github.com/docker/distribution/manifest/schema1"
11+
"github.com/docker/distribution/manifest/schema2"
12+
"github.com/docker/distribution/reference"
13+
"github.com/docker/distribution/registry/api/errcode"
14+
"github.com/docker/distribution/registry/api/v2"
15+
16+
"github.com/docker/libtrust"
17+
"github.com/golang/glog"
18+
digest "github.com/opencontainers/go-digest"
19+
20+
imageapi "github.com/openshift/origin/pkg/image/apis/image"
21+
)
22+
23+
func processManifestList(ctx context.Context, srcDigest digest.Digest, srcManifest distribution.Manifest, manifests distribution.ManifestService, ref imageapi.DockerImageReference, filterFn func(*manifestlist.ManifestDescriptor, bool) bool) ([]distribution.Manifest, distribution.Manifest, digest.Digest, error) {
24+
var srcManifests []distribution.Manifest
25+
switch t := srcManifest.(type) {
26+
case *manifestlist.DeserializedManifestList:
27+
manifestDigest := srcDigest
28+
manifestList := t
29+
30+
filtered := make([]manifestlist.ManifestDescriptor, 0, len(t.Manifests))
31+
for _, manifest := range t.Manifests {
32+
if !filterFn(&manifest, len(t.Manifests) > 1) {
33+
glog.V(5).Infof("Skipping image for %#v from %s", manifest.Platform, ref)
34+
continue
35+
}
36+
glog.V(5).Infof("Including image for %#v from %s", manifest.Platform, ref)
37+
filtered = append(filtered, manifest)
38+
}
39+
40+
if len(filtered) == 0 {
41+
return nil, nil, "", nil
42+
}
43+
44+
// if we're filtering the manifest list, update the source manifest and digest
45+
if len(filtered) != len(t.Manifests) {
46+
var err error
47+
t, err = manifestlist.FromDescriptors(filtered)
48+
if err != nil {
49+
return nil, nil, "", fmt.Errorf("unable to filter source image %s manifest list: %v", ref, err)
50+
}
51+
_, body, err := t.Payload()
52+
if err != nil {
53+
return nil, nil, "", fmt.Errorf("unable to filter source image %s manifest list (bad payload): %v", ref, err)
54+
}
55+
manifestList = t
56+
manifestDigest = srcDigest.Algorithm().FromBytes(body)
57+
glog.V(5).Infof("Filtered manifest list to new digest %s:\n%s", manifestDigest, body)
58+
}
59+
60+
for i, manifest := range t.Manifests {
61+
childManifest, err := manifests.Get(ctx, manifest.Digest, distribution.WithManifestMediaTypes([]string{manifestlist.MediaTypeManifestList, schema2.MediaTypeManifest}))
62+
if err != nil {
63+
return nil, nil, "", fmt.Errorf("unable to retrieve source image %s manifest #%d from manifest list: %v", ref, i+1, err)
64+
}
65+
srcManifests = append(srcManifests, childManifest)
66+
}
67+
68+
switch {
69+
case len(srcManifests) == 1:
70+
_, body, err := srcManifests[0].Payload()
71+
if err != nil {
72+
return nil, nil, "", fmt.Errorf("unable to convert source image %s manifest list to single manifest: %v", ref, err)
73+
}
74+
manifestDigest := srcDigest.Algorithm().FromBytes(body)
75+
glog.V(5).Infof("Used only one manifest from the list %s", manifestDigest)
76+
return srcManifests, srcManifests[0], manifestDigest, nil
77+
default:
78+
return append(srcManifests, manifestList), manifestList, manifestDigest, nil
79+
}
80+
81+
default:
82+
return []distribution.Manifest{srcManifest}, srcManifest, srcDigest, nil
83+
}
84+
}
85+
86+
// TDOO: remove when quay.io switches to v2 schema
87+
func putManifestInCompatibleSchema(
88+
ctx context.Context,
89+
srcManifest distribution.Manifest,
90+
tag string,
91+
toManifests distribution.ManifestService,
92+
// supports schema2 -> schema1 downconversion
93+
blobs distribution.BlobService,
94+
ref reference.Named,
95+
) (digest.Digest, error) {
96+
var options []distribution.ManifestServiceOption
97+
if len(tag) > 0 {
98+
glog.V(5).Infof("Put manifest %s:%s", ref, tag)
99+
options = []distribution.ManifestServiceOption{distribution.WithTag(tag)}
100+
} else {
101+
glog.V(5).Infof("Put manifest %s", ref)
102+
}
103+
toDigest, err := toManifests.Put(ctx, srcManifest, options...)
104+
if err == nil {
105+
return toDigest, nil
106+
}
107+
errs, ok := err.(errcode.Errors)
108+
if !ok || len(errs) == 0 {
109+
return toDigest, err
110+
}
111+
errcode, ok := errs[0].(errcode.Error)
112+
if !ok || errcode.ErrorCode() != v2.ErrorCodeManifestInvalid {
113+
return toDigest, err
114+
}
115+
// try downconverting to v2-schema1
116+
schema2Manifest, ok := srcManifest.(*schema2.DeserializedManifest)
117+
if !ok {
118+
return toDigest, err
119+
}
120+
tagRef, tagErr := reference.WithTag(ref, tag)
121+
if tagErr != nil {
122+
return toDigest, err
123+
}
124+
glog.V(5).Infof("Registry reported invalid manifest error, attempting to convert to v2schema1 as ref %s", tagRef)
125+
schema1Manifest, convertErr := convertToSchema1(ctx, blobs, schema2Manifest, tagRef)
126+
if convertErr != nil {
127+
return toDigest, err
128+
}
129+
if glog.V(6) {
130+
_, data, _ := schema1Manifest.Payload()
131+
glog.Infof("Converted to v2schema1\n%s", string(data))
132+
}
133+
return toManifests.Put(ctx, schema1Manifest, distribution.WithTag(tag))
134+
}
135+
136+
// TDOO: remove when quay.io switches to v2 schema
137+
func convertToSchema1(ctx context.Context, blobs distribution.BlobService, schema2Manifest *schema2.DeserializedManifest, ref reference.Named) (distribution.Manifest, error) {
138+
targetDescriptor := schema2Manifest.Target()
139+
configJSON, err := blobs.Get(ctx, targetDescriptor.Digest)
140+
if err != nil {
141+
return nil, err
142+
}
143+
trustKey, err := loadPrivateKey()
144+
if err != nil {
145+
return nil, err
146+
}
147+
builder := schema1.NewConfigManifestBuilder(blobs, trustKey, ref, configJSON)
148+
for _, d := range schema2Manifest.Layers {
149+
if err := builder.AppendReference(d); err != nil {
150+
return nil, err
151+
}
152+
}
153+
manifest, err := builder.Build(ctx)
154+
if err != nil {
155+
return nil, err
156+
}
157+
return manifest, nil
158+
}
159+
160+
var (
161+
privateKeyLock sync.Mutex
162+
privateKey libtrust.PrivateKey
163+
)
164+
165+
// TDOO: remove when quay.io switches to v2 schema
166+
func loadPrivateKey() (libtrust.PrivateKey, error) {
167+
privateKeyLock.Lock()
168+
defer privateKeyLock.Unlock()
169+
if privateKey != nil {
170+
return privateKey, nil
171+
}
172+
trustKey, err := libtrust.GenerateECP256PrivateKey()
173+
if err != nil {
174+
return nil, err
175+
}
176+
privateKey = trustKey
177+
return privateKey, nil
178+
}

pkg/oc/cli/cmd/image/mirror/mappings.go

+2-80
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,8 @@ import (
77
"strings"
88
"sync"
99

10-
"github.com/golang/glog"
11-
1210
"github.com/docker/distribution/registry/client/auth"
13-
14-
godigest "github.com/opencontainers/go-digest"
11+
digest "github.com/opencontainers/go-digest"
1512

1613
imageapi "github.com/openshift/origin/pkg/image/apis/image"
1714
)
@@ -170,7 +167,7 @@ type destinations struct {
170167
digests map[string]pushTargets
171168
}
172169

173-
func (d *destinations) mergeIntoDigests(srcDigest godigest.Digest, target pushTargets) {
170+
func (d *destinations) mergeIntoDigests(srcDigest digest.Digest, target pushTargets) {
174171
d.lock.Lock()
175172
defer d.lock.Unlock()
176173
srcKey := srcDigest.String()
@@ -278,78 +275,3 @@ func calculateDockerRegistryScopes(tree targetTree) map[string][]auth.Scope {
278275
}
279276
return uniqueScopes
280277
}
281-
282-
type workQueue struct {
283-
ch chan workUnit
284-
wg *sync.WaitGroup
285-
}
286-
287-
func newWorkQueue(workers int, stopCh <-chan struct{}) *workQueue {
288-
q := &workQueue{
289-
ch: make(chan workUnit, 100),
290-
wg: &sync.WaitGroup{},
291-
}
292-
go q.run(workers, stopCh)
293-
return q
294-
}
295-
296-
func (q *workQueue) run(workers int, stopCh <-chan struct{}) {
297-
for i := 0; i < workers; i++ {
298-
go func(i int) {
299-
defer glog.V(4).Infof("worker %d stopping", i)
300-
for {
301-
select {
302-
case work, ok := <-q.ch:
303-
if !ok {
304-
return
305-
}
306-
work.fn()
307-
work.wg.Done()
308-
case <-stopCh:
309-
return
310-
}
311-
}
312-
}(i)
313-
}
314-
<-stopCh
315-
}
316-
317-
func (q *workQueue) Batch(fn func(Work)) {
318-
w := &worker{
319-
wg: &sync.WaitGroup{},
320-
ch: q.ch,
321-
}
322-
fn(w)
323-
w.wg.Wait()
324-
}
325-
326-
func (q *workQueue) Queue(fn func(Work)) {
327-
w := &worker{
328-
wg: q.wg,
329-
ch: q.ch,
330-
}
331-
fn(w)
332-
}
333-
334-
func (q *workQueue) Done() {
335-
q.wg.Wait()
336-
}
337-
338-
type workUnit struct {
339-
fn func()
340-
wg *sync.WaitGroup
341-
}
342-
343-
type Work interface {
344-
Parallel(fn func())
345-
}
346-
347-
type worker struct {
348-
wg *sync.WaitGroup
349-
ch chan workUnit
350-
}
351-
352-
func (w *worker) Parallel(fn func()) {
353-
w.wg.Add(1)
354-
w.ch <- workUnit{wg: w.wg, fn: fn}
355-
}

0 commit comments

Comments
 (0)