From f987ce309ee220e42cf1badd5d51696a2bddeaed Mon Sep 17 00:00:00 2001 From: Markus Walther Date: Tue, 13 Feb 2024 17:21:13 +0000 Subject: [PATCH 1/2] add Group.SetLimit(n int) --- group.go | 31 +++++++++++++++++++- group_test.go | 79 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 109 insertions(+), 1 deletion(-) diff --git a/group.go b/group.go index 48b68fc..f2b0fc0 100644 --- a/group.go +++ b/group.go @@ -3,7 +3,12 @@ package multierror -import "sync" +import ( + "fmt" + "sync" +) + +type token struct{} // Group is a collection of goroutines which return errors that need to be // coalesced. @@ -11,6 +16,22 @@ type Group struct { mutex sync.Mutex err *Error wg sync.WaitGroup + sem chan token +} + +// SetLimit limits the number of goroutines that can be concurrently active. +// A negative value indicates no limit. +func (g *Group) SetLimit(n int) { + if n < 0 { + g.sem = nil + return + } + + if g.sem != nil && len(g.sem) > 0 { + panic(fmt.Errorf("multierror: modify limit while %v goroutines in the group are still active", len(g.sem))) + } + + g.sem = make(chan token, n) } // Go calls the given function in a new goroutine. @@ -18,11 +39,19 @@ type Group struct { // If the function returns an error it is added to the group multierror which // is returned by Wait. func (g *Group) Go(f func() error) { + if g.sem != nil { + g.sem <- token{} + } + g.wg.Add(1) go func() { defer g.wg.Done() + if g.sem != nil { + defer func() { <-g.sem }() + } + if err := f(); err != nil { g.mutex.Lock() g.err = Append(g.err, err) diff --git a/group_test.go b/group_test.go index 88795bf..21619ed 100644 --- a/group_test.go +++ b/group_test.go @@ -6,7 +6,9 @@ package multierror import ( "errors" "strings" + "sync/atomic" "testing" + "time" ) func TestGroup(t *testing.T) { @@ -45,3 +47,80 @@ func TestGroup(t *testing.T) { } } } + +func TestGroupSetLimit(t *testing.T) { + var ( + active int32 + maxActive int32 + ) + + g := &Group{} + g.SetLimit(2) + + work := func() error { + atomic.AddInt32(&active, 1) + + for { + currentMax := atomic.LoadInt32(&maxActive) + currentActive := atomic.LoadInt32(&active) + if currentActive > currentMax { + if atomic.CompareAndSwapInt32(&maxActive, currentMax, currentActive) { + break + } + } else { + break + } + } + + time.Sleep(200 * time.Millisecond) + + atomic.AddInt32(&active, -1) + + return nil + } + + // Start more goroutines than the limit + for i := 0; i < 5; i++ { + g.Go(work) + } + + err := g.Wait() + if err != nil { + t.Fatalf("expected no error, got: %v", err) + } + + if maxActive != 2 { + t.Errorf("expected max 2 active goroutines, got %d", maxActive) + } + + g = &Group{} + g.SetLimit(-1) + + // Test unlimited + for i := 0; i < 10; i++ { + g.Go(work) + } + + err = g.Wait() + if err != nil { + t.Fatalf("expected no error, got: %v", err) + } + + if maxActive != 10 { + t.Errorf("expected max 2 active goroutines, got %d", maxActive) + } + + defer func() { + if r := recover(); r == nil { + t.Errorf("expected panic when modifying limit, got none") + } + }() + + g = &Group{} + + g.SetLimit(2) + + g.Go(work) + + g.SetLimit(3) // attempt to modify limit while goroutine is active +} From 63bec963123128fd3c4de03f38754b8181c8e3a2 Mon Sep 17 00:00:00 2001 From: Markus Date: Wed, 22 May 2024 15:48:25 +0100 Subject: [PATCH 2/2] fix potential nil deref in Group.Go() defer Co-authored-by: Chris Campo --- group.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/group.go b/group.go index f2b0fc0..2ce6685 100644 --- a/group.go +++ b/group.go @@ -46,11 +46,12 @@ func (g *Group) Go(f func() error) { g.wg.Add(1) go func() { - defer g.wg.Done() - - if g.sem != nil { - defer func() { <-g.sem }() - } + defer func() { + if g.sem != nil { + <-g.sem + } + g.wg.Done() + }() if err := f(); err != nil { g.mutex.Lock()