Skip to content

Commit 1985ff4

Browse files
committed
Support WithProgress for remote Writes
- supported in WriteLayer, Write, WriteIndex, MultiWrite - chan is closed when write completes - an error is sent along the chan if there are any non-temporary errors uploading - if a layer already exists or is mounted, progress updates immediately to account for that - if layer upload fails and is retried, progress goes backward and goes back up
1 parent 70c58c0 commit 1985ff4

File tree

6 files changed

+708
-14
lines changed

6 files changed

+708
-14
lines changed

pkg/v1/remote/multi_write.go

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import (
3333
// Current limitations:
3434
// - All refs must share the same repository.
3535
// - Images cannot consist of stream.Layers.
36-
func MultiWrite(m map[name.Reference]Taggable, options ...Option) error {
36+
func MultiWrite(m map[name.Reference]Taggable, options ...Option) (err error) {
3737
// Determine the repository being pushed to; if asked to push to
3838
// multiple repositories, give up.
3939
var repo, zero name.Repository
@@ -50,6 +50,8 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) error {
5050
return err
5151
}
5252

53+
defer func() { sendError(o.updates, err) }()
54+
5355
// Collect unique blobs (layers and config blobs).
5456
blobs := map[v1.Hash]v1.Layer{}
5557
newManifests := []map[name.Reference]Taggable{}
@@ -89,6 +91,45 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) error {
8991
repo: repo,
9092
client: &http.Client{Transport: tr},
9193
context: o.context,
94+
updates: o.updates,
95+
last: &v1.Update{},
96+
}
97+
98+
// Collect the total size of blobs and manifests we're about to write.
99+
if o.updates != nil {
100+
defer func() { close(o.updates) }()
101+
for _, b := range blobs {
102+
size, err := b.Size()
103+
if err != nil {
104+
return err
105+
}
106+
w.last.Total += size
107+
}
108+
countManifest := func(t Taggable) error {
109+
b, err := t.RawManifest()
110+
if err != nil {
111+
return err
112+
}
113+
w.last.Total += int64(len(b))
114+
return nil
115+
}
116+
for _, i := range images {
117+
if err := countManifest(i); err != nil {
118+
return err
119+
}
120+
}
121+
for _, nm := range newManifests {
122+
for _, i := range nm {
123+
if err := countManifest(i); err != nil {
124+
return err
125+
}
126+
}
127+
}
128+
for _, i := range indexes {
129+
if err := countManifest(i); err != nil {
130+
return err
131+
}
132+
}
92133
}
93134

94135
// Upload individual blobs and collect any errors.
@@ -160,8 +201,8 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) error {
160201
}
161202
// Push originally requested index manifests, which might depend on
162203
// newly discovered manifests.
163-
return commitMany(indexes)
164204

205+
return commitMany(indexes)
165206
}
166207

167208
// addIndexBlobs adds blobs to the set of blobs we intend to upload, and

pkg/v1/remote/multi_write_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ func TestMultiWrite_Deep(t *testing.T) {
156156
if err != nil {
157157
t.Fatal("random.Image:", err)
158158
}
159-
for i := 0; i < 10; i++ {
159+
for i := 0; i < 4; i++ {
160160
idx = mutate.AppendManifests(idx, mutate.IndexAddendum{Add: idx})
161161
}
162162

pkg/v1/remote/options.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type options struct {
3737
jobs int
3838
userAgent string
3939
allowNondistributableArtifacts bool
40+
updates chan<- v1.Update
4041
}
4142

4243
var defaultPlatform = v1.Platform{
@@ -184,3 +185,10 @@ func WithNondistributable(o *options) error {
184185
o.allowNondistributableArtifacts = true
185186
return nil
186187
}
188+
189+
func WithProgress(updates chan<- v1.Update) Option {
190+
return func(o *options) error {
191+
o.updates = updates
192+
return nil
193+
}
194+
}

0 commit comments

Comments
 (0)