Skip to content

Commit 0bd3ca6

Browse files
Push images from one registry to another
1 parent d417fdd commit 0bd3ca6

File tree

3 files changed

+361
-0
lines changed

3 files changed

+361
-0
lines changed

pkg/cmd/infra/pusher/pusher.go

+356
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,356 @@
1+
package pusher
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"net/http"
7+
"os"
8+
"strings"
9+
10+
"github.com/docker/distribution"
11+
"github.com/docker/distribution/digest"
12+
"github.com/docker/distribution/reference"
13+
"github.com/docker/distribution/registry/client"
14+
"github.com/docker/distribution/registry/client/auth"
15+
"github.com/golang/glog"
16+
"github.com/spf13/cobra"
17+
18+
apirequest "k8s.io/apiserver/pkg/endpoints/request"
19+
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
20+
kcmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
21+
22+
"github.com/docker/distribution/manifest/schema2"
23+
ocmd "github.com/openshift/origin/pkg/cmd/cli/cmd"
24+
imageapi "github.com/openshift/origin/pkg/image/api"
25+
"github.com/openshift/origin/pkg/image/importer"
26+
)
27+
28+
var (
29+
longDesc = templates.LongDesc(`
30+
Push an image to a new location
31+
32+
Accepts a list of arguments defining source images that should be pushed to the provided
33+
destination image tag. Each argument is of the form "SRC=DST", where both parts must be
34+
valid image references ([registry[:port]/]repository[:tag|@digest]).
35+
`)
36+
)
37+
38+
type Mapping struct {
39+
Source imageapi.DockerImageReference
40+
Destination imageapi.DockerImageReference
41+
}
42+
43+
type pushOptions struct {
44+
Out, ErrOut io.Writer
45+
46+
Mappings []Mapping
47+
48+
Insecure bool
49+
SkipMount bool
50+
Force bool
51+
}
52+
53+
// NewCommandPusher helps to push and pull images.
54+
func NewCommandPusher(name string) *cobra.Command {
55+
o := &pushOptions{}
56+
57+
cmd := &cobra.Command{
58+
Use: fmt.Sprintf("%s SRC=DST[,...]", name),
59+
Short: "Push images to an image registry",
60+
Long: longDesc,
61+
Run: func(c *cobra.Command, args []string) {
62+
o.Out = os.Stdout
63+
o.ErrOut = c.OutOrStderr()
64+
kcmdutil.CheckErr(o.Complete(args))
65+
kcmdutil.CheckErr(o.Run())
66+
},
67+
}
68+
69+
cmd.AddCommand(ocmd.NewCmdVersion(name, nil, os.Stdout, ocmd.VersionOptions{}))
70+
71+
flag := cmd.Flags()
72+
flag.BoolVar(&o.Insecure, "insecure", o.Insecure, "If true, connections may be made over HTTP")
73+
flag.BoolVar(&o.SkipMount, "skip-mount", o.SkipMount, "If true, always push layers instead of cross-mounting them")
74+
flag.BoolVar(&o.Force, "force", o.Force, "If true, attempt to write all contents.")
75+
76+
return cmd
77+
}
78+
79+
func (o *pushOptions) Complete(args []string) error {
80+
var remainingArgs []string
81+
overlap := make(map[string]string)
82+
for _, s := range args {
83+
parts := strings.SplitN(s, "=", 2)
84+
if len(parts) != 2 {
85+
remainingArgs = append(remainingArgs, s)
86+
continue
87+
}
88+
if len(parts[0]) == 0 || len(parts[1]) == 0 {
89+
return fmt.Errorf("all arguments must be valid SRC=DST mappings")
90+
}
91+
92+
src, err := imageapi.ParseDockerImageReference(parts[0])
93+
if err != nil {
94+
return fmt.Errorf("%q is not a valid image reference: %v", parts[0], err)
95+
}
96+
if len(src.Tag) == 0 && len(src.ID) == 0 {
97+
return fmt.Errorf("you must specify a tag or digest for SRC")
98+
}
99+
dst, err := imageapi.ParseDockerImageReference(parts[1])
100+
if err != nil {
101+
return fmt.Errorf("%q is not a valid image reference: %v", parts[0], err)
102+
}
103+
if len(dst.Tag) == 0 || len(dst.ID) != 0 {
104+
return fmt.Errorf("you must specify a tag for DST")
105+
}
106+
if _, ok := overlap[dst.String()]; ok {
107+
return fmt.Errorf("each destination tag may only be specified once: %s", dst.String())
108+
}
109+
overlap[dst.String()] = src.String()
110+
111+
o.Mappings = append(o.Mappings, Mapping{Source: src, Destination: dst})
112+
}
113+
if len(remainingArgs) > 0 {
114+
return fmt.Errorf("all arguments must be valid SRC=DST mappings")
115+
}
116+
if len(o.Mappings) == 0 {
117+
return fmt.Errorf("you must specify at least one source image to pull and the destination to push to as SRC=DST")
118+
}
119+
return nil
120+
}
121+
122+
type key struct {
123+
registry string
124+
repository string
125+
}
126+
127+
type destination struct {
128+
ref imageapi.DockerImageReference
129+
tags []string
130+
}
131+
132+
type pushTargets map[key]destination
133+
134+
type destinations struct {
135+
ref imageapi.DockerImageReference
136+
tags map[string]pushTargets
137+
digests map[string]pushTargets
138+
}
139+
140+
func (d destinations) mergeIntoDigests(srcDigest digest.Digest, target pushTargets) {
141+
srcKey := srcDigest.String()
142+
current, ok := d.digests[srcKey]
143+
if !ok {
144+
d.digests[srcKey] = target
145+
return
146+
}
147+
for repo, dst := range target {
148+
existing, ok := current[repo]
149+
if !ok {
150+
current[repo] = dst
151+
continue
152+
}
153+
existing.tags = append(existing.tags, dst.tags...)
154+
}
155+
}
156+
157+
type targetTree map[key]destinations
158+
159+
func buildTargetTree(mappings []Mapping) targetTree {
160+
tree := make(targetTree)
161+
for _, m := range mappings {
162+
srcKey := key{registry: m.Source.Registry, repository: m.Source.RepositoryName()}
163+
dstKey := key{registry: m.Destination.Registry, repository: m.Destination.RepositoryName()}
164+
165+
src, ok := tree[srcKey]
166+
if !ok {
167+
src.ref = m.Source.AsRepository()
168+
src.digests = make(map[string]pushTargets)
169+
src.tags = make(map[string]pushTargets)
170+
tree[srcKey] = src
171+
}
172+
173+
var current pushTargets
174+
if tag := m.Source.Tag; len(tag) != 0 {
175+
current = src.tags[tag]
176+
if current == nil {
177+
current = make(pushTargets)
178+
src.tags[tag] = current
179+
}
180+
} else {
181+
current = src.digests[m.Source.ID]
182+
if current == nil {
183+
current = make(pushTargets)
184+
src.digests[m.Source.ID] = current
185+
}
186+
}
187+
188+
dst, ok := current[dstKey]
189+
if !ok {
190+
dst.ref = m.Destination.AsRepository()
191+
}
192+
dst.tags = append(dst.tags, m.Destination.Tag)
193+
current[dstKey] = dst
194+
}
195+
return tree
196+
}
197+
198+
func (o *pushOptions) Run() error {
199+
tree := buildTargetTree(o.Mappings)
200+
201+
creds := importer.NewLocalCredentials()
202+
ctx := apirequest.NewContext()
203+
204+
srcClient := importer.NewContext(http.DefaultTransport, http.DefaultTransport).WithCredentials(creds)
205+
toContext := importer.NewContext(http.DefaultTransport, http.DefaultTransport).WithActions("pull", "push")
206+
207+
for _, src := range tree {
208+
srcRepo, err := srcClient.Repository(ctx, src.ref.DockerClientDefaults().RegistryURL(), src.ref.RepositoryName(), o.Insecure)
209+
if err != nil {
210+
return fmt.Errorf("unable to connect to %s: %v", src.ref, err)
211+
}
212+
213+
manifests, err := srcRepo.Manifests(ctx)
214+
if err != nil {
215+
return fmt.Errorf("unable to access source image %s manifests: %v", src.ref, err)
216+
}
217+
// convert source tags to digests
218+
for srcTag, pushTargets := range src.tags {
219+
desc, err := srcRepo.Tags(ctx).Get(ctx, srcTag)
220+
if err != nil {
221+
return fmt.Errorf("unable to retrieve source image %s by tag: %v", src.ref, err)
222+
}
223+
srcDigest := desc.Digest
224+
fmt.Fprintf(o.Out, "Resolved source image tag %s to %s\n", src.ref, srcDigest)
225+
src.mergeIntoDigests(srcDigest, pushTargets)
226+
}
227+
228+
canonicalFrom := srcRepo.Named()
229+
230+
for srcDigestString, pushTargets := range src.digests {
231+
// load the manifest
232+
srcDigest := digest.Digest(srcDigestString)
233+
var contentDigest digest.Digest
234+
srcManifest, err := manifests.Get(ctx, digest.Digest(srcDigest), client.ReturnContentDigest(&contentDigest))
235+
if err != nil {
236+
return fmt.Errorf("unable to retrieve source image %s manifest: %v", src.ref, err)
237+
}
238+
239+
for _, dst := range pushTargets {
240+
// if we are going to be using cross repository mount, get a token that covers the src
241+
if src.ref.Registry == dst.ref.Registry {
242+
toContext = toContext.WithScopes(auth.RepositoryScope{Repository: src.ref.RepositoryName(), Actions: []string{"pull"}})
243+
}
244+
toClient := toContext.WithCredentials(creds)
245+
246+
toRepo, err := toClient.Repository(ctx, dst.ref.DockerClientDefaults().RegistryURL(), dst.ref.RepositoryName(), o.Insecure)
247+
if err != nil {
248+
return fmt.Errorf("unable to connect to %s: %v", dst.ref, err)
249+
}
250+
251+
canonicalTo := toRepo.Named()
252+
fmt.Fprintf(o.Out, "Connecting to %s for %s\n", canonicalFrom, canonicalTo)
253+
254+
toManifests, err := toRepo.Manifests(ctx)
255+
if err != nil {
256+
return fmt.Errorf("unable to access destination image %s manifests: %v", src.ref, err)
257+
}
258+
259+
// if the destination tag already has this manifest, do nothing
260+
var mustCopyLayers bool
261+
if o.Force {
262+
mustCopyLayers = true
263+
} else {
264+
if _, err := toManifests.Get(ctx, srcDigest); err != nil {
265+
mustCopyLayers = true
266+
} else {
267+
glog.V(4).Infof("Manifest exists in %s, no need to copy layers without --force", dst.ref)
268+
}
269+
}
270+
if mustCopyLayers {
271+
fmt.Fprintf(o.Out, "Copying %s to %s (%d references)\n", src.ref, dst.ref, len(srcManifest.References()))
272+
273+
// upload all the blobs
274+
toBlobs := toRepo.Blobs(ctx)
275+
srcBlobs := srcRepo.Blobs(ctx)
276+
277+
// upload the config
278+
switch t := srcManifest.(type) {
279+
case *schema2.DeserializedManifest:
280+
contents, err := srcBlobs.Get(ctx, t.Config.Digest)
281+
if err != nil {
282+
return fmt.Errorf("unreadable image config %s: %v", t.Config.Digest, err)
283+
}
284+
desc, err := toBlobs.Put(ctx, t.Config.MediaType, contents)
285+
if err != nil {
286+
return fmt.Errorf("unable to upload manifest config to %s: %v", dst.ref, err)
287+
}
288+
if desc.Digest != t.Config.Digest {
289+
return fmt.Errorf("the digest changed from %s to %s", contentDigest, desc.Digest)
290+
}
291+
}
292+
293+
for _, blob := range srcManifest.References() {
294+
// tagging within the same registry is a no-op
295+
if src.ref.Registry == dst.ref.Registry && canonicalFrom.String() == canonicalTo.String() {
296+
continue
297+
}
298+
299+
var options []distribution.BlobCreateOption
300+
blobSource, err := reference.WithDigest(canonicalFrom, blob.Digest)
301+
if err != nil {
302+
return fmt.Errorf("unexpected error building named digest: %v", err)
303+
}
304+
if !o.SkipMount {
305+
options = append(options, client.WithMountFrom(blobSource))
306+
}
307+
308+
w, err := toBlobs.Create(ctx, options...)
309+
if ebm, ok := err.(distribution.ErrBlobMounted); ok {
310+
glog.V(5).Infof("Blob mounted %#v", blob)
311+
if ebm.From.Digest() != blob.Digest {
312+
return fmt.Errorf("unable to push %s: tried to mount blob %s src source and got back a different digest %s", src.ref, blob.Digest, ebm.From.Digest())
313+
}
314+
continue
315+
}
316+
if err != nil {
317+
return fmt.Errorf("unable to upload blob %s to %s: %v", blob.Digest, dst.ref, err)
318+
}
319+
err = func() error {
320+
glog.V(5).Infof("Uploading blob %s", blob.Digest)
321+
defer w.Cancel(ctx)
322+
r, err := srcBlobs.Open(ctx, blob.Digest)
323+
if err != nil {
324+
return fmt.Errorf("unable to open source layer %s to copy to %s: %v", blob.Digest, dst.ref, err)
325+
}
326+
defer r.Close()
327+
fmt.Fprintf(o.Out, "Copying to %s (%d bytes)\n", blob.Digest, blob.Size)
328+
n, err := w.ReadFrom(r)
329+
if err != nil {
330+
return fmt.Errorf("unable to copy layer %s to %s: %v", blob.Digest, dst.ref, err)
331+
}
332+
if n != blob.Size {
333+
fmt.Fprintf(o.ErrOut, "warning: Layer size mismatch for %s: had %d, wrote %d\n", blob.Digest, blob.Size, n)
334+
}
335+
_, err = w.Commit(ctx, blob)
336+
return err
337+
}()
338+
if err != nil {
339+
return err
340+
}
341+
}
342+
}
343+
344+
// upload and tag the manifest
345+
for _, tag := range dst.tags {
346+
toDigest, err := toManifests.Put(ctx, srcManifest, distribution.WithTag(tag))
347+
if err != nil {
348+
return fmt.Errorf("unable to push manifest to %s: %v", dst.ref, err)
349+
}
350+
fmt.Fprintf(o.Out, "Pushed to %s:%s as %s\n", dst.ref, tag, toDigest)
351+
}
352+
}
353+
}
354+
}
355+
return nil
356+
}

pkg/cmd/openshift/openshift.go

+2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/openshift/origin/pkg/cmd/flagtypes"
2424
"github.com/openshift/origin/pkg/cmd/infra/builder"
2525
"github.com/openshift/origin/pkg/cmd/infra/deployer"
26+
"github.com/openshift/origin/pkg/cmd/infra/pusher"
2627
irouter "github.com/openshift/origin/pkg/cmd/infra/router"
2728
"github.com/openshift/origin/pkg/cmd/recycle"
2829
"github.com/openshift/origin/pkg/cmd/server/start"
@@ -137,6 +138,7 @@ func NewCommandOpenShift(name string) *cobra.Command {
137138
builder.NewCommandDockerBuilder("docker-build"),
138139
diagnostics.NewCommandPodDiagnostics("diagnostic-pod", out),
139140
diagnostics.NewCommandNetworkPodDiagnostics("network-diagnostic-pod", out),
141+
pusher.NewCommandPusher("push"),
140142
)
141143
root.AddCommand(infra)
142144

pkg/image/importer/client.go

+3
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,9 @@ func (r *repositoryRetriever) Repository(ctx gocontext.Context, registry *url.UR
119119
t = r.context.InsecureTransport
120120
}
121121
src := *registry
122+
if len(src.Scheme) == 0 {
123+
src.Scheme = "https"
124+
}
122125
// ping the registry to get challenge headers
123126
if err, ok := r.pings[src]; ok {
124127
if err != nil {

0 commit comments

Comments
 (0)