@@ -453,138 +453,18 @@ func (o *pushOptions) Run() error {
453
453
glog .V (4 ).Infof ("Manifest exists in %s, no need to copy layers without --force" , dst .ref )
454
454
}
455
455
}
456
+
456
457
if mustCopyLayers {
457
- // upload all the blobs
458
- toBlobs := toRepo .Blobs (ctx )
459
- srcBlobs := srcRepo .Blobs (ctx )
460
-
461
- // upload the each manifest
462
- for _ , srcManifest := range srcManifests {
463
- switch srcManifest .(type ) {
464
- case * schema2.DeserializedManifest :
465
- case * manifestlist.DeserializedManifestList :
466
- // we do not need to upload layers in a manifestlist
467
- continue
468
- default :
469
- digestErrs = append (digestErrs , retrieverError {src : src .ref , dst : dst .ref , err : fmt .Errorf ("the manifest type %T is not supported" , srcManifest )})
470
- continue
471
- }
472
-
473
- for _ , blob := range srcManifest .References () {
474
- blobSource , err := reference .WithDigest (canonicalFrom , blob .Digest )
475
- if err != nil {
476
- digestErrs = append (digestErrs , retrieverError {src : src .ref , dst : dst .ref , err : fmt .Errorf ("unexpected error building named digest: %v" , err )})
477
- continue
478
- }
479
-
480
- // if we aren't forcing upload, skip the blob copy
481
- if ! o .Force {
482
- _ , err := toBlobs .Stat (ctx , blob .Digest )
483
- if err == nil {
484
- // blob exists, skip
485
- glog .V (5 ).Infof ("Server reports blob exists %#v" , blob )
486
- continue
487
- }
488
- if err != distribution .ErrBlobUnknown {
489
- glog .V (5 ).Infof ("Server was unable to check whether blob exists %s: %v" , blob .Digest , err )
490
- }
491
- }
492
-
493
- var options []distribution.BlobCreateOption
494
- if ! o .SkipMount {
495
- options = append (options , client .WithMountFrom (blobSource ), WithDescriptor (blob ))
496
- }
497
- w , err := toBlobs .Create (ctx , options ... )
498
- // no-op
499
- if err == ErrAlreadyExists {
500
- glog .V (5 ).Infof ("Blob already exists %#v" , blob )
501
- continue
502
- }
503
- // mount successful
504
- if ebm , ok := err .(distribution.ErrBlobMounted ); ok {
505
- glog .V (5 ).Infof ("Blob mounted %#v" , blob )
506
- if ebm .From .Digest () != blob .Digest {
507
- digestErrs = append (digestErrs , retrieverError {src : src .ref , dst : dst .ref , err : fmt .Errorf ("unable to push %s: tried to mount blob %s src source and got back a different digest %s" , src .ref , blob .Digest , ebm .From .Digest ())})
508
- break
509
- }
510
- continue
511
- }
512
- if err != nil {
513
- digestErrs = append (digestErrs , retrieverError {src : src .ref , dst : dst .ref , err : fmt .Errorf ("unable to upload blob %s to %s: %v" , blob .Digest , dst .ref , err )})
514
- break
515
- }
516
-
517
- err = func () error {
518
- glog .V (5 ).Infof ("Uploading blob %s" , blob .Digest )
519
- defer w .Cancel (ctx )
520
- r , err := srcBlobs .Open (ctx , blob .Digest )
521
- if err != nil {
522
- return fmt .Errorf ("unable to open source layer %s to copy to %s: %v" , blob .Digest , dst .ref , err )
523
- }
524
- defer r .Close ()
525
-
526
- switch dst .t {
527
- case DestinationS3 :
528
- fmt .Fprintf (o .ErrOut , "uploading: s3://%s %s %s\n " , dst .ref , blob .Digest , units .BytesSize (float64 (blob .Size )))
529
- default :
530
- fmt .Fprintf (o .ErrOut , "uploading: %s %s %s\n " , dst .ref , blob .Digest , units .BytesSize (float64 (blob .Size )))
531
- }
532
-
533
- n , err := w .ReadFrom (r )
534
- if err != nil {
535
- return fmt .Errorf ("unable to copy layer %s to %s: %v" , blob .Digest , dst .ref , err )
536
- }
537
- if n != blob .Size {
538
- fmt .Fprintf (o .ErrOut , "warning: Layer size mismatch for %s: had %d, wrote %d\n " , blob .Digest , blob .Size , n )
539
- }
540
- _ , err = w .Commit (ctx , blob )
541
- return err
542
- }()
543
- if err != nil {
544
- _ , srcBody , _ := srcManifest .Payload ()
545
- srcManifestDigest := godigest .Canonical .FromBytes (srcBody )
546
- if srcManifestDigest == srcDigest {
547
- digestErrs = append (digestErrs , retrieverError {src : src .ref , dst : dst .ref , err : fmt .Errorf ("failed to commit blob %s from manifest %s to %s: %v" , blob .Digest , srcManifestDigest , dst .ref , err )})
548
- } else {
549
- digestErrs = append (digestErrs , retrieverError {src : src .ref , dst : dst .ref , err : fmt .Errorf ("failed to commit blob %s from manifest %s in manifest list %s to %s: %v" , blob .Digest , srcManifestDigest , srcDigest , dst .ref , err )})
550
- }
551
- break
552
- }
553
- }
458
+ if errs := uploadBlobs (ctx , dst , srcRepo , toRepo , srcManifests , src .ref , srcDigest , canonicalFrom , o .Force , o .SkipMount , o .ErrOut ); len (errs ) > 0 {
459
+ digestErrs = append (digestErrs , errs ... )
460
+ continue
554
461
}
555
462
}
556
463
557
- if len (digestErrs ) > 0 {
464
+ if errs := uploadAndTagManifests (ctx , dst , srcManifest , src .ref , toManifests , o .Out ); len (errs ) > 0 {
465
+ digestErrs = append (digestErrs , errs ... )
558
466
continue
559
467
}
560
-
561
- // upload and tag the manifest
562
- for _ , tag := range dst .tags {
563
- toDigest , err := toManifests .Put (ctx , srcManifest , distribution .WithTag (tag ))
564
- if err != nil {
565
- digestErrs = append (digestErrs , retrieverError {src : src .ref , dst : dst .ref , err : fmt .Errorf ("unable to push manifest to %s: %v" , dst .ref , err )})
566
- continue
567
- }
568
- switch dst .t {
569
- case DestinationS3 :
570
- fmt .Fprintf (o .Out , "%s s3://%s:%s\n " , toDigest , dst .ref , tag )
571
- default :
572
- fmt .Fprintf (o .Out , "%s %s:%s\n " , toDigest , dst .ref , tag )
573
- }
574
- }
575
- if len (dst .tags ) == 0 {
576
- toDigest , err := toManifests .Put (ctx , srcManifest )
577
- if err != nil {
578
- digestErrs = append (digestErrs , retrieverError {src : src .ref , dst : dst .ref , err : fmt .Errorf ("unable to push manifest to %s: %v" , dst .ref , err )})
579
- continue
580
- }
581
- switch dst .t {
582
- case DestinationS3 :
583
- fmt .Fprintf (o .Out , "%s s3://%s\n " , toDigest , dst .ref )
584
- default :
585
- fmt .Fprintf (o .Out , "%s %s\n " , toDigest , dst .ref )
586
- }
587
- }
588
468
}
589
469
}
590
470
for _ , err := range append (tagErrs , digestErrs ... ) {
@@ -657,6 +537,165 @@ func processManifestList(ctx apirequest.Context, srcDigest godigest.Digest, srcM
657
537
}
658
538
}
659
539
540
+ func uploadBlobs (
541
+ ctx apirequest.Context ,
542
+ dst destination ,
543
+ srcRepo , toRepo distribution.Repository ,
544
+ srcManifests []distribution.Manifest ,
545
+ srcRef imageapi.DockerImageReference ,
546
+ srcDigest godigest.Digest ,
547
+ canonicalFrom reference.Named ,
548
+ force bool ,
549
+ skipMount bool ,
550
+ errOut io.Writer ,
551
+ ) []retrieverError {
552
+
553
+ // upload all the blobs
554
+ toBlobs := toRepo .Blobs (ctx )
555
+ srcBlobs := srcRepo .Blobs (ctx )
556
+
557
+ var errs []retrieverError
558
+
559
+ // upload the each manifest
560
+ for _ , srcManifest := range srcManifests {
561
+ switch srcManifest .(type ) {
562
+ case * schema2.DeserializedManifest :
563
+ case * manifestlist.DeserializedManifestList :
564
+ // we do not need to upload layers in a manifestlist
565
+ continue
566
+ default :
567
+ errs = append (errs , retrieverError {src : srcRef , dst : dst .ref , err : fmt .Errorf ("the manifest type %T is not supported" , srcManifest )})
568
+ continue
569
+ }
570
+
571
+ for _ , blob := range srcManifest .References () {
572
+ blobSource , err := reference .WithDigest (canonicalFrom , blob .Digest )
573
+ if err != nil {
574
+ errs = append (errs , retrieverError {src : srcRef , dst : dst .ref , err : fmt .Errorf ("unexpected error building named digest: %v" , err )})
575
+ continue
576
+ }
577
+
578
+ // if we aren't forcing upload, skip the blob copy
579
+ if ! force {
580
+ _ , err := toBlobs .Stat (ctx , blob .Digest )
581
+ if err == nil {
582
+ // blob exists, skip
583
+ glog .V (5 ).Infof ("Server reports blob exists %#v" , blob )
584
+ continue
585
+ }
586
+ if err != distribution .ErrBlobUnknown {
587
+ glog .V (5 ).Infof ("Server was unable to check whether blob exists %s: %v" , blob .Digest , err )
588
+ }
589
+ }
590
+
591
+ var options []distribution.BlobCreateOption
592
+ if ! skipMount {
593
+ options = append (options , client .WithMountFrom (blobSource ), WithDescriptor (blob ))
594
+ }
595
+ w , err := toBlobs .Create (ctx , options ... )
596
+ // no-op
597
+ if err == ErrAlreadyExists {
598
+ glog .V (5 ).Infof ("Blob already exists %#v" , blob )
599
+ continue
600
+ }
601
+ // mount successful
602
+ if ebm , ok := err .(distribution.ErrBlobMounted ); ok {
603
+ glog .V (5 ).Infof ("Blob mounted %#v" , blob )
604
+ if ebm .From .Digest () != blob .Digest {
605
+ errs = append (errs , retrieverError {src : srcRef , dst : dst .ref , err : fmt .Errorf ("unable to push %s: tried to mount blob %s src source and got back a different digest %s" , srcRef , blob .Digest , ebm .From .Digest ())})
606
+ break
607
+ }
608
+ continue
609
+ }
610
+ if err != nil {
611
+ errs = append (errs , retrieverError {src : srcRef , dst : dst .ref , err : fmt .Errorf ("unable to upload blob %s to %s: %v" , blob .Digest , dst .ref , err )})
612
+ break
613
+ }
614
+
615
+ err = func () error {
616
+ glog .V (5 ).Infof ("Uploading blob %s" , blob .Digest )
617
+ defer w .Cancel (ctx )
618
+ r , err := srcBlobs .Open (ctx , blob .Digest )
619
+ if err != nil {
620
+ return fmt .Errorf ("unable to open source layer %s to copy to %s: %v" , blob .Digest , dst .ref , err )
621
+ }
622
+ defer r .Close ()
623
+
624
+ switch dst .t {
625
+ case DestinationS3 :
626
+ fmt .Fprintf (errOut , "uploading: s3://%s %s %s\n " , dst .ref , blob .Digest , units .BytesSize (float64 (blob .Size )))
627
+ default :
628
+ fmt .Fprintf (errOut , "uploading: %s %s %s\n " , dst .ref , blob .Digest , units .BytesSize (float64 (blob .Size )))
629
+ }
630
+
631
+ n , err := w .ReadFrom (r )
632
+ if err != nil {
633
+ return fmt .Errorf ("unable to copy layer %s to %s: %v" , blob .Digest , dst .ref , err )
634
+ }
635
+ if n != blob .Size {
636
+ fmt .Fprintf (errOut , "warning: Layer size mismatch for %s: had %d, wrote %d\n " , blob .Digest , blob .Size , n )
637
+ }
638
+ _ , err = w .Commit (ctx , blob )
639
+ return err
640
+ }()
641
+ if err != nil {
642
+ _ , srcBody , _ := srcManifest .Payload ()
643
+ srcManifestDigest := godigest .Canonical .FromBytes (srcBody )
644
+ if srcManifestDigest == srcDigest {
645
+ errs = append (errs , retrieverError {src : srcRef , dst : dst .ref , err : fmt .Errorf ("failed to commit blob %s from manifest %s to %s: %v" , blob .Digest , srcManifestDigest , dst .ref , err )})
646
+ } else {
647
+ errs = append (errs , retrieverError {src : srcRef , dst : dst .ref , err : fmt .Errorf ("failed to commit blob %s from manifest %s in manifest list %s to %s: %v" , blob .Digest , srcManifestDigest , srcDigest , dst .ref , err )})
648
+ }
649
+ break
650
+ }
651
+ }
652
+ }
653
+ return errs
654
+ }
655
+
656
+ func uploadAndTagManifests (
657
+ ctx apirequest.Context ,
658
+ dst destination ,
659
+ srcManifest distribution.Manifest ,
660
+ srcRef imageapi.DockerImageReference ,
661
+ toManifests distribution.ManifestService ,
662
+ out io.Writer ,
663
+ ) []retrieverError {
664
+ var errs []retrieverError
665
+
666
+ // upload and tag the manifest
667
+ for _ , tag := range dst .tags {
668
+ toDigest , err := toManifests .Put (ctx , srcManifest , distribution .WithTag (tag ))
669
+ if err != nil {
670
+ errs = append (errs , retrieverError {src : srcRef , dst : dst .ref , err : fmt .Errorf ("unable to push manifest to %s: %v" , dst .ref , err )})
671
+ continue
672
+ }
673
+ switch dst .t {
674
+ case DestinationS3 :
675
+ fmt .Fprintf (out , "%s s3://%s:%s\n " , toDigest , dst .ref , tag )
676
+ default :
677
+ fmt .Fprintf (out , "%s %s:%s\n " , toDigest , dst .ref , tag )
678
+ }
679
+ }
680
+ if len (dst .tags ) != 0 {
681
+ return errs
682
+ }
683
+
684
+ // this is a pure manifest move, put the manifest by its id
685
+ toDigest , err := toManifests .Put (ctx , srcManifest )
686
+ if err != nil {
687
+ errs = append (errs , retrieverError {src : srcRef , dst : dst .ref , err : fmt .Errorf ("unable to push manifest to %s: %v" , dst .ref , err )})
688
+ return errs
689
+ }
690
+ switch dst .t {
691
+ case DestinationS3 :
692
+ fmt .Fprintf (out , "%s s3://%s\n " , toDigest , dst .ref )
693
+ default :
694
+ fmt .Fprintf (out , "%s %s\n " , toDigest , dst .ref )
695
+ }
696
+ return errs
697
+ }
698
+
660
699
type optionFunc func (interface {}) error
661
700
662
701
func (f optionFunc ) Apply (v interface {}) error {
0 commit comments