Skip to content

Commit 48a1d5b

Browse files
Push images from one registry to another
1 parent d417fdd commit 48a1d5b

File tree

3 files changed

+397
-0
lines changed

3 files changed

+397
-0
lines changed

pkg/cmd/infra/pusher/pusher.go

+392
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,392 @@
1+
package pusher
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"net/http"
7+
"os"
8+
"strings"
9+
10+
"github.com/docker/distribution"
11+
"github.com/docker/distribution/digest"
12+
"github.com/docker/distribution/reference"
13+
"github.com/docker/distribution/registry/client"
14+
"github.com/docker/distribution/registry/client/auth"
15+
"github.com/golang/glog"
16+
"github.com/spf13/cobra"
17+
18+
kerrors "k8s.io/apimachinery/pkg/util/errors"
19+
apirequest "k8s.io/apiserver/pkg/endpoints/request"
20+
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
21+
kcmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
22+
23+
"github.com/docker/distribution/manifest/schema2"
24+
ocmd "github.com/openshift/origin/pkg/cmd/cli/cmd"
25+
imageapi "github.com/openshift/origin/pkg/image/api"
26+
"github.com/openshift/origin/pkg/image/importer"
27+
)
28+
29+
var (
30+
longDesc = templates.LongDesc(`
31+
Push an image to a new location
32+
33+
Accepts a list of arguments defining source images that should be pushed to the provided
34+
destination image tag. Each argument is of the form "SRC=DST", where both parts must be
35+
valid image references ([registry[:port]/]repository[:tag|@digest]).
36+
`)
37+
)
38+
39+
type Mapping struct {
40+
Source imageapi.DockerImageReference
41+
Destination imageapi.DockerImageReference
42+
}
43+
44+
type pushOptions struct {
45+
Out, ErrOut io.Writer
46+
47+
Mappings []Mapping
48+
49+
Insecure bool
50+
SkipMount bool
51+
Force bool
52+
}
53+
54+
// NewCommandPusher helps to push and pull images.
55+
func NewCommandPusher(name string) *cobra.Command {
56+
o := &pushOptions{}
57+
58+
cmd := &cobra.Command{
59+
Use: fmt.Sprintf("%s SRC=DST[,...]", name),
60+
Short: "Push images to an image registry",
61+
Long: longDesc,
62+
Run: func(c *cobra.Command, args []string) {
63+
o.Out = os.Stdout
64+
o.ErrOut = c.OutOrStderr()
65+
kcmdutil.CheckErr(o.Complete(args))
66+
kcmdutil.CheckErr(o.Run())
67+
},
68+
}
69+
70+
cmd.AddCommand(ocmd.NewCmdVersion(name, nil, os.Stdout, ocmd.VersionOptions{}))
71+
72+
flag := cmd.Flags()
73+
flag.BoolVar(&o.Insecure, "insecure", o.Insecure, "If true, connections may be made over HTTP")
74+
flag.BoolVar(&o.SkipMount, "skip-mount", o.SkipMount, "If true, always push layers instead of cross-mounting them")
75+
flag.BoolVar(&o.Force, "force", o.Force, "If true, attempt to write all contents.")
76+
77+
return cmd
78+
}
79+
80+
func (o *pushOptions) Complete(args []string) error {
81+
var remainingArgs []string
82+
overlap := make(map[string]string)
83+
for _, s := range args {
84+
parts := strings.SplitN(s, "=", 2)
85+
if len(parts) != 2 {
86+
remainingArgs = append(remainingArgs, s)
87+
continue
88+
}
89+
if len(parts[0]) == 0 || len(parts[1]) == 0 {
90+
return fmt.Errorf("all arguments must be valid SRC=DST mappings")
91+
}
92+
93+
src, err := imageapi.ParseDockerImageReference(parts[0])
94+
if err != nil {
95+
return fmt.Errorf("%q is not a valid image reference: %v", parts[0], err)
96+
}
97+
if len(src.Tag) == 0 && len(src.ID) == 0 {
98+
return fmt.Errorf("you must specify a tag or digest for SRC")
99+
}
100+
dst, err := imageapi.ParseDockerImageReference(parts[1])
101+
if err != nil {
102+
return fmt.Errorf("%q is not a valid image reference: %v", parts[0], err)
103+
}
104+
if len(dst.Tag) == 0 || len(dst.ID) != 0 {
105+
return fmt.Errorf("you must specify a tag for DST")
106+
}
107+
if _, ok := overlap[dst.String()]; ok {
108+
return fmt.Errorf("each destination tag may only be specified once: %s", dst.String())
109+
}
110+
overlap[dst.String()] = src.String()
111+
112+
o.Mappings = append(o.Mappings, Mapping{Source: src, Destination: dst})
113+
}
114+
if len(remainingArgs) > 0 {
115+
return fmt.Errorf("all arguments must be valid SRC=DST mappings")
116+
}
117+
if len(o.Mappings) == 0 {
118+
return fmt.Errorf("you must specify at least one source image to pull and the destination to push to as SRC=DST")
119+
}
120+
return nil
121+
}
122+
123+
type key struct {
124+
registry string
125+
repository string
126+
}
127+
128+
type destination struct {
129+
ref imageapi.DockerImageReference
130+
tags []string
131+
}
132+
133+
type pushTargets map[key]destination
134+
135+
type destinations struct {
136+
ref imageapi.DockerImageReference
137+
tags map[string]pushTargets
138+
digests map[string]pushTargets
139+
}
140+
141+
func (d destinations) mergeIntoDigests(srcDigest digest.Digest, target pushTargets) {
142+
srcKey := srcDigest.String()
143+
current, ok := d.digests[srcKey]
144+
if !ok {
145+
d.digests[srcKey] = target
146+
return
147+
}
148+
for repo, dst := range target {
149+
existing, ok := current[repo]
150+
if !ok {
151+
current[repo] = dst
152+
continue
153+
}
154+
existing.tags = append(existing.tags, dst.tags...)
155+
}
156+
}
157+
158+
type targetTree map[key]destinations
159+
160+
func buildTargetTree(mappings []Mapping) targetTree {
161+
tree := make(targetTree)
162+
for _, m := range mappings {
163+
srcKey := key{registry: m.Source.Registry, repository: m.Source.RepositoryName()}
164+
dstKey := key{registry: m.Destination.Registry, repository: m.Destination.RepositoryName()}
165+
166+
src, ok := tree[srcKey]
167+
if !ok {
168+
src.ref = m.Source.AsRepository()
169+
src.digests = make(map[string]pushTargets)
170+
src.tags = make(map[string]pushTargets)
171+
tree[srcKey] = src
172+
}
173+
174+
var current pushTargets
175+
if tag := m.Source.Tag; len(tag) != 0 {
176+
current = src.tags[tag]
177+
if current == nil {
178+
current = make(pushTargets)
179+
src.tags[tag] = current
180+
}
181+
} else {
182+
current = src.digests[m.Source.ID]
183+
if current == nil {
184+
current = make(pushTargets)
185+
src.digests[m.Source.ID] = current
186+
}
187+
}
188+
189+
dst, ok := current[dstKey]
190+
if !ok {
191+
dst.ref = m.Destination.AsRepository()
192+
}
193+
dst.tags = append(dst.tags, m.Destination.Tag)
194+
current[dstKey] = dst
195+
}
196+
return tree
197+
}
198+
199+
type retrieverError struct {
200+
src, dst imageapi.DockerImageReference
201+
err error
202+
}
203+
204+
func (e retrieverError) Error() string {
205+
return e.err.Error()
206+
}
207+
208+
func (o *pushOptions) Run() error {
209+
tree := buildTargetTree(o.Mappings)
210+
211+
creds := importer.NewLocalCredentials()
212+
ctx := apirequest.NewContext()
213+
214+
srcClient := importer.NewContext(http.DefaultTransport, http.DefaultTransport).WithCredentials(creds)
215+
toContext := importer.NewContext(http.DefaultTransport, http.DefaultTransport).WithActions("pull", "push")
216+
217+
var errs []error
218+
for _, src := range tree {
219+
srcRepo, err := srcClient.Repository(ctx, src.ref.DockerClientDefaults().RegistryURL(), src.ref.RepositoryName(), o.Insecure)
220+
if err != nil {
221+
errs = append(errs, retrieverError{err: fmt.Errorf("unable to connect to %s: %v", src.ref, err), src: src.ref})
222+
continue
223+
}
224+
225+
manifests, err := srcRepo.Manifests(ctx)
226+
if err != nil {
227+
errs = append(errs, retrieverError{src: src.ref, err: fmt.Errorf("unable to access source image %s manifests: %v", src.ref, err)})
228+
continue
229+
}
230+
231+
var tagErrs []retrieverError
232+
var digestErrs []retrieverError
233+
234+
// convert source tags to digests
235+
for srcTag, pushTargets := range src.tags {
236+
desc, err := srcRepo.Tags(ctx).Get(ctx, srcTag)
237+
if err != nil {
238+
tagErrs = append(tagErrs, retrieverError{src: src.ref, err: fmt.Errorf("unable to retrieve source image %s by tag: %v", src.ref, err)})
239+
continue
240+
}
241+
srcDigest := desc.Digest
242+
fmt.Fprintf(o.Out, "Resolved source image tag %s to %s\n", src.ref, srcDigest)
243+
src.mergeIntoDigests(srcDigest, pushTargets)
244+
}
245+
246+
canonicalFrom := srcRepo.Named()
247+
248+
for srcDigestString, pushTargets := range src.digests {
249+
// load the manifest
250+
srcDigest := digest.Digest(srcDigestString)
251+
var contentDigest digest.Digest
252+
srcManifest, err := manifests.Get(ctx, digest.Digest(srcDigest), client.ReturnContentDigest(&contentDigest))
253+
if err != nil {
254+
digestErrs = append(digestErrs, retrieverError{src: src.ref, err: fmt.Errorf("unable to retrieve source image %s manifest: %v", src.ref, err)})
255+
continue
256+
}
257+
258+
for _, dst := range pushTargets {
259+
// if we are going to be using cross repository mount, get a token that covers the src
260+
if src.ref.Registry == dst.ref.Registry {
261+
toContext = toContext.WithScopes(auth.RepositoryScope{Repository: src.ref.RepositoryName(), Actions: []string{"pull"}})
262+
}
263+
toClient := toContext.WithCredentials(creds)
264+
265+
toRepo, err := toClient.Repository(ctx, dst.ref.DockerClientDefaults().RegistryURL(), dst.ref.RepositoryName(), o.Insecure)
266+
if err != nil {
267+
digestErrs = append(digestErrs, retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unable to connect to %s: %v", dst.ref, err)})
268+
continue
269+
}
270+
271+
canonicalTo := toRepo.Named()
272+
fmt.Fprintf(o.Out, "Connecting to %s for %s\n", canonicalFrom, canonicalTo)
273+
274+
toManifests, err := toRepo.Manifests(ctx)
275+
if err != nil {
276+
digestErrs = append(digestErrs, retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unable to access destination image %s manifests: %v", src.ref, err)})
277+
continue
278+
}
279+
280+
// if the destination tag already has this manifest, do nothing
281+
var mustCopyLayers bool
282+
if o.Force {
283+
mustCopyLayers = true
284+
} else {
285+
if _, err := toManifests.Get(ctx, srcDigest); err != nil {
286+
mustCopyLayers = true
287+
} else {
288+
glog.V(4).Infof("Manifest exists in %s, no need to copy layers without --force", dst.ref)
289+
}
290+
}
291+
if mustCopyLayers {
292+
fmt.Fprintf(o.Out, "Copying %s to %s (%d references)\n", src.ref, dst.ref, len(srcManifest.References()))
293+
294+
// upload all the blobs
295+
toBlobs := toRepo.Blobs(ctx)
296+
srcBlobs := srcRepo.Blobs(ctx)
297+
298+
// upload the config
299+
switch t := srcManifest.(type) {
300+
case *schema2.DeserializedManifest:
301+
contents, err := srcBlobs.Get(ctx, t.Config.Digest)
302+
if err != nil {
303+
digestErrs = append(digestErrs, retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unreadable image config %s: %v", t.Config.Digest, err)})
304+
continue
305+
}
306+
desc, err := toBlobs.Put(ctx, t.Config.MediaType, contents)
307+
if err != nil {
308+
digestErrs = append(digestErrs, retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unable to upload manifest config to %s: %v", dst.ref, err)})
309+
continue
310+
}
311+
if desc.Digest != t.Config.Digest {
312+
digestErrs = append(digestErrs, retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("the digest changed from %s to %s", contentDigest, desc.Digest)})
313+
continue
314+
}
315+
}
316+
317+
for _, blob := range srcManifest.References() {
318+
// tagging within the same registry is a no-op
319+
if src.ref.Registry == dst.ref.Registry && canonicalFrom.String() == canonicalTo.String() {
320+
continue
321+
}
322+
323+
var options []distribution.BlobCreateOption
324+
blobSource, err := reference.WithDigest(canonicalFrom, blob.Digest)
325+
if err != nil {
326+
digestErrs = append(digestErrs, retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unexpected error building named digest: %v", err)})
327+
continue
328+
}
329+
if !o.SkipMount {
330+
options = append(options, client.WithMountFrom(blobSource))
331+
}
332+
333+
w, err := toBlobs.Create(ctx, options...)
334+
if ebm, ok := err.(distribution.ErrBlobMounted); ok {
335+
glog.V(5).Infof("Blob mounted %#v", blob)
336+
if ebm.From.Digest() != blob.Digest {
337+
digestErrs = append(digestErrs, retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unable to push %s: tried to mount blob %s src source and got back a different digest %s", src.ref, blob.Digest, ebm.From.Digest())})
338+
continue
339+
}
340+
break
341+
}
342+
if err != nil {
343+
digestErrs = append(digestErrs, retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unable to upload blob %s to %s: %v", blob.Digest, dst.ref, err)})
344+
break
345+
}
346+
err = func() error {
347+
glog.V(5).Infof("Uploading blob %s", blob.Digest)
348+
defer w.Cancel(ctx)
349+
r, err := srcBlobs.Open(ctx, blob.Digest)
350+
if err != nil {
351+
return fmt.Errorf("unable to open source layer %s to copy to %s: %v", blob.Digest, dst.ref, err)
352+
}
353+
defer r.Close()
354+
fmt.Fprintf(o.Out, "Copying to %s (%d bytes)\n", blob.Digest, blob.Size)
355+
n, err := w.ReadFrom(r)
356+
if err != nil {
357+
return fmt.Errorf("unable to copy layer %s to %s: %v", blob.Digest, dst.ref, err)
358+
}
359+
if n != blob.Size {
360+
fmt.Fprintf(o.ErrOut, "warning: Layer size mismatch for %s: had %d, wrote %d\n", blob.Digest, blob.Size, n)
361+
}
362+
_, err = w.Commit(ctx, blob)
363+
return err
364+
}()
365+
if err != nil {
366+
digestErrs = append(digestErrs, retrieverError{src: src.ref, dst: dst.ref, err: err})
367+
break
368+
}
369+
}
370+
}
371+
372+
if len(digestErrs) > 0 {
373+
continue
374+
}
375+
376+
// upload and tag the manifest
377+
for _, tag := range dst.tags {
378+
toDigest, err := toManifests.Put(ctx, srcManifest, distribution.WithTag(tag))
379+
if err != nil {
380+
digestErrs = append(digestErrs, retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unable to push manifest to %s: %v", dst.ref, err)})
381+
continue
382+
}
383+
fmt.Fprintf(o.Out, "Pushed to %s:%s as %s\n", dst.ref, tag, toDigest)
384+
}
385+
}
386+
}
387+
for _, err := range append(tagErrs, digestErrs...) {
388+
errs = append(errs, err)
389+
}
390+
}
391+
return kerrors.NewAggregate(errs)
392+
}

0 commit comments

Comments
 (0)