Skip to content

Support WithProgress for remote Writes #967

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 45 additions & 5 deletions pkg/v1/remote/multi_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
// Current limitations:
// - All refs must share the same repository.
// - Images cannot consist of stream.Layers.
func MultiWrite(m map[name.Reference]Taggable, options ...Option) error {
func MultiWrite(m map[name.Reference]Taggable, options ...Option) (rerr error) {
// Determine the repository being pushed to; if asked to push to
// multiple repositories, give up.
var repo, zero name.Repository
Expand Down Expand Up @@ -86,9 +86,49 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) error {
return err
}
w := writer{
repo: repo,
client: &http.Client{Transport: tr},
context: o.context,
repo: repo,
client: &http.Client{Transport: tr},
context: o.context,
updates: o.updates,
lastUpdate: &v1.Update{},
}

// Collect the total size of blobs and manifests we're about to write.
if o.updates != nil {
defer close(o.updates)
defer func() { sendError(o.updates, rerr) }()
for _, b := range blobs {
size, err := b.Size()
if err != nil {
return err
}
w.lastUpdate.Total += size
}
countManifest := func(t Taggable) error {
b, err := t.RawManifest()
if err != nil {
return err
}
w.lastUpdate.Total += int64(len(b))
return nil
}
for _, i := range images {
if err := countManifest(i); err != nil {
return err
}
}
for _, nm := range newManifests {
for _, i := range nm {
if err := countManifest(i); err != nil {
return err
}
}
}
for _, i := range indexes {
if err := countManifest(i); err != nil {
return err
}
}
}

// Upload individual blobs and collect any errors.
Expand Down Expand Up @@ -160,8 +200,8 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) error {
}
// Push originally requested index manifests, which might depend on
// newly discovered manifests.
return commitMany(indexes)

return commitMany(indexes)
}

// addIndexBlobs adds blobs to the set of blobs we intend to upload, and
Expand Down
2 changes: 1 addition & 1 deletion pkg/v1/remote/multi_write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func TestMultiWrite_Deep(t *testing.T) {
if err != nil {
t.Fatal("random.Image:", err)
}
for i := 0; i < 10; i++ {
for i := 0; i < 4; i++ {
idx = mutate.AppendManifests(idx, mutate.IndexAddendum{Add: idx})
}

Expand Down
12 changes: 12 additions & 0 deletions pkg/v1/remote/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type options struct {
jobs int
userAgent string
allowNondistributableArtifacts bool
updates chan<- v1.Update
}

var defaultPlatform = v1.Platform{
Expand Down Expand Up @@ -184,3 +185,14 @@ func WithNondistributable(o *options) error {
o.allowNondistributableArtifacts = true
return nil
}

// WithProgress takes a channel that will receive progress updates as bytes are written.
//
// Sending updates to an unbuffered channel will block writes, so callers
// should provide a buffered channel to avoid potential deadlocks.
func WithProgress(updates chan<- v1.Update) Option {
return func(o *options) error {
o.updates = updates
return nil
}
}
Loading