Skip to content

Commit 7429ced

Browse files
committed
Support WithProgress for remote Writes
- supported in WriteLayer, Write, WriteIndex, MultiWrite - chan is closed when write completes - an error is sent along the chan if there are any non-temporary errors uploading - if a layer already exists or is mounted, progress updates immediately to account for that - if layer upload fails and is retried, progress goes backward and goes back up
1 parent 4a92f6c commit 7429ced

File tree

6 files changed

+721
-22
lines changed

6 files changed

+721
-22
lines changed

pkg/v1/remote/multi_write.go

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import (
3333
// Current limitations:
3434
// - All refs must share the same repository.
3535
// - Images cannot consist of stream.Layers.
36-
func MultiWrite(m map[name.Reference]Taggable, options ...Option) error {
36+
func MultiWrite(m map[name.Reference]Taggable, options ...Option) (rerr error) {
3737
// Determine the repository being pushed to; if asked to push to
3838
// multiple repositories, give up.
3939
var repo, zero name.Repository
@@ -86,9 +86,49 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) error {
8686
return err
8787
}
8888
w := writer{
89-
repo: repo,
90-
client: &http.Client{Transport: tr},
91-
context: o.context,
89+
repo: repo,
90+
client: &http.Client{Transport: tr},
91+
context: o.context,
92+
updates: o.updates,
93+
lastUpdate: &v1.Update{},
94+
}
95+
96+
// Collect the total size of blobs and manifests we're about to write.
97+
if o.updates != nil {
98+
defer close(o.updates)
99+
defer func() { sendError(o.updates, rerr) }()
100+
for _, b := range blobs {
101+
size, err := b.Size()
102+
if err != nil {
103+
return err
104+
}
105+
w.lastUpdate.Total += size
106+
}
107+
countManifest := func(t Taggable) error {
108+
b, err := t.RawManifest()
109+
if err != nil {
110+
return err
111+
}
112+
w.lastUpdate.Total += int64(len(b))
113+
return nil
114+
}
115+
for _, i := range images {
116+
if err := countManifest(i); err != nil {
117+
return err
118+
}
119+
}
120+
for _, nm := range newManifests {
121+
for _, i := range nm {
122+
if err := countManifest(i); err != nil {
123+
return err
124+
}
125+
}
126+
}
127+
for _, i := range indexes {
128+
if err := countManifest(i); err != nil {
129+
return err
130+
}
131+
}
92132
}
93133

94134
// Upload individual blobs and collect any errors.
@@ -160,8 +200,8 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) error {
160200
}
161201
// Push originally requested index manifests, which might depend on
162202
// newly discovered manifests.
163-
return commitMany(indexes)
164203

204+
return commitMany(indexes)
165205
}
166206

167207
// addIndexBlobs adds blobs to the set of blobs we intend to upload, and

pkg/v1/remote/multi_write_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ func TestMultiWrite_Deep(t *testing.T) {
156156
if err != nil {
157157
t.Fatal("random.Image:", err)
158158
}
159-
for i := 0; i < 10; i++ {
159+
for i := 0; i < 4; i++ {
160160
idx = mutate.AppendManifests(idx, mutate.IndexAddendum{Add: idx})
161161
}
162162

pkg/v1/remote/options.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type options struct {
3737
jobs int
3838
userAgent string
3939
allowNondistributableArtifacts bool
40+
updates chan<- v1.Update
4041
}
4142

4243
var defaultPlatform = v1.Platform{
@@ -184,3 +185,14 @@ func WithNondistributable(o *options) error {
184185
o.allowNondistributableArtifacts = true
185186
return nil
186187
}
188+
189+
// WithProgress takes a channel that will receive progress updates as bytes are written.
190+
//
191+
// Sending updates to an unbuffered channel will block writes, so callers
192+
// should provide a buffered channel to avoid potential deadlocks.
193+
func WithProgress(updates chan<- v1.Update) Option {
194+
return func(o *options) error {
195+
o.updates = updates
196+
return nil
197+
}
198+
}

0 commit comments

Comments
 (0)