-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathcounter.go
127 lines (110 loc) · 2.98 KB
/
counter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package todocounter
import (
"sync"
)
// Counter records things remaining to process. It is needed for complicated
// cases where multiple goroutines are spawned to process items, and they may
// generate more items to process. For example, say a query over a set of nodes
// may yield either a result value, or more nodes to query. Signaling is subtly
// complicated, because the queue may be empty while items are being processed,
// that will end up adding more items to the queue.
//
// Use Counter like this:
//
// todos := make(chan int, 10)
// ctr := todocounter.NewSyncCounter()
//
// process := func(item int) {
// fmt.Printf("processing %d...\n", item)
//
// // this task may randomly generate more tasks
// if rand.Intn(5) == 0 {
// todos<- item + 1
// ctr.Increment(1) // increment counter for new task.
// }
//
// ctr.Decrement(1) // decrement one to signal the task being done.
// }
//
// // add some tasks.
// todos<- 1
// todos<- 2
// todos<- 3
// todos<- 4
// ctr.Increment(4)
//
// for {
// select {
// case item := <- todos:
// go process(item)
// case <-ctr.Done():
// fmt.Println("done processing everything.")
// close(todos)
// return // stop looping; the done channel stays closed, so it would be selected again
// }
// }
type Counter interface {
// Increment adds i todos to track.
// It panics if the counter has already signaled done (its count is below
// zero); use a new counter in that case.
Increment(i uint32)
// Decrement removes i todos from the tracked count.
// If the count drops to zero, signals done and destroys the counter.
// If the count would drop **below** zero, panics. It means you have tried to
// remove more things than you added, i.e. sync issues.
Decrement(i uint32)
// Done returns a channel to wait upon. Use it in selects:
//
// select {
// case <-ctr.Done():
// // done processing all items
// }
//
Done() <-chan struct{}
// Remaining returns the number of todos still being tracked.
// NOTE(review): the todoCounter implementation in this file reports -1 once
// done has been signaled, because Decrement parks the count there.
Remaining() int32
}
// todoCounter is a lock-protected tally of outstanding todos together with the
// channel used to announce that the tally has reached zero.
type todoCounter struct {
	// The embedded lock guards count; its Lock/RLock methods are promoted
	// onto todoCounter and used directly by the methods.
	sync.RWMutex

	// done is closed exactly once, when count reaches zero.
	done chan struct{}

	// count is the number of outstanding todos. A negative value marks a
	// counter that has already signaled done and must not be reused.
	count int32
}
// NewSyncCounter returns a ready-to-use Counter whose count starts at zero and
// whose done channel is open.
func NewSyncCounter() Counter {
	ctr := new(todoCounter)
	ctr.done = make(chan struct{})
	return ctr
}
// Increment adds i todos to the tracked count.
//
// It panics if the counter has already signaled done (count below zero), and
// it panics if adding i would push the count past the largest int32 value;
// previously that case wrapped around silently and corrupted the count.
func (c *todoCounter) Increment(i uint32) {
	c.Lock()
	defer c.Unlock()

	if c.count < 0 {
		panic("counter already signaled done. use a new counter.")
	}

	// Do the sum in int64: a uint32 can exceed the int32 range on its own, and
	// int32(i) would then wrap to a negative number.
	const maxCount = 1<<31 - 1
	if int64(c.count)+int64(i) > maxCount {
		panic("increment amount overflows counter. too many todos.")
	}

	c.count += int32(i)
}
// Decrement removes i todos from the tracked count.
// If the count drops to zero, signals done and destroys the counter.
// If the count would drop **below** zero, panics. It means you have tried to
// remove more things than you added, i.e. sync issues.
// It also panics if the counter has already signaled done.
func (c *todoCounter) Decrement(i uint32) {
	c.Lock()
	defer c.Unlock()

	if c.count < 0 {
		panic("counter already signaled done. probably have sync issues.")
	}

	// count is known to be non-negative here, so converting it to uint32 is
	// lossless. Comparing as int32(i) instead would wrap for i >= 2^31 and let
	// an oversized decrement slip through and *raise* the count.
	if i > uint32(c.count) {
		panic("decrement amount greater than counter. sync issues.")
	}

	// Safe: i <= count, and count fits in int32.
	c.count -= int32(i)

	if c.count == 0 { // done! signal it.
		c.count = -1  // park below zero to prevent reuse
		close(c.done) // receives on a closed channel return immediately
	}
}
// Done exposes the receive-only view of the channel that Decrement closes when
// the count reaches zero.
func (c *todoCounter) Done() <-chan struct{} {
	var signal <-chan struct{} = c.done
	return signal
}
// Remaining reports the current count, read under the read lock.
// The value is negative once the counter has signaled done.
func (c *todoCounter) Remaining() int32 {
	c.RLock()
	left := c.count
	c.RUnlock()
	return left
}