Skip to content

Commit 9bd9ea9

Browse files
authored
feat(taskqueue): fix race on peer state gather (#303)
1 parent 3bd6478 commit 9bd9ea9

File tree

4 files changed, with 43 additions (+43) and 17 deletions (-17).

Diff for: requestmanager/server.go

+10 -7 (10 additions, 7 deletions)
Original file line number | Diff line number | Diff line change
@@ -395,14 +395,17 @@ func (rm *RequestManager) pause(id graphsync.RequestID) error {
395395
}
396396

397397
func (rm *RequestManager) peerStats(p peer.ID) peerstate.PeerState {
398-
requestStates := make(graphsync.RequestStates)
399-
for id, ipr := range rm.inProgressRequestStatuses {
400-
if ipr.p == p {
401-
requestStates[id] = graphsync.RequestState(ipr.state)
398+
var peerState peerstate.PeerState
399+
rm.requestQueue.WithPeerTopics(p, func(peerTopics *peertracker.PeerTrackerTopics) {
400+
requestStates := make(graphsync.RequestStates)
401+
for id, ipr := range rm.inProgressRequestStatuses {
402+
if ipr.p == p {
403+
requestStates[id] = graphsync.RequestState(ipr.state)
404+
}
402405
}
403-
}
404-
peerTopics := rm.requestQueue.PeerTopics(p)
405-
return peerstate.PeerState{RequestStates: requestStates, TaskQueueState: fromPeerTopics(peerTopics)}
406+
peerState = peerstate.PeerState{RequestStates: requestStates, TaskQueueState: fromPeerTopics(peerTopics)}
407+
})
408+
return peerState
406409
}
407410

408411
func fromPeerTopics(pt *peertracker.PeerTrackerTopics) peerstate.TaskQueueState {

Diff for: responsemanager/responsemanager_test.go

+2 -2 (2 additions, 2 deletions)
Original file line number | Diff line number | Diff line change
@@ -1306,8 +1306,8 @@ func (ntq nullTaskQueue) PushTask(p peer.ID, task peertask.Task) {
13061306
func (ntq nullTaskQueue) TaskDone(p peer.ID, task *peertask.Task) {}
13071307
func (ntq nullTaskQueue) Remove(t peertask.Topic, p peer.ID) {}
13081308
func (ntq nullTaskQueue) Stats() graphsync.RequestStats { return graphsync.RequestStats{} }
1309-
func (ntq nullTaskQueue) PeerTopics(p peer.ID) *peertracker.PeerTrackerTopics {
1310-
return &peertracker.PeerTrackerTopics{Pending: ntq.tasksQueued[p]}
1309+
func (ntq nullTaskQueue) WithPeerTopics(p peer.ID, f func(*peertracker.PeerTrackerTopics)) {
1310+
f(&peertracker.PeerTrackerTopics{Pending: ntq.tasksQueued[p]})
13111311
}
13121312

13131313
var _ taskqueue.TaskQueue = nullTaskQueue{}

Diff for: responsemanager/server.go

+10 -7 (10 additions, 7 deletions)
Original file line number | Diff line number | Diff line change
@@ -293,14 +293,17 @@ func (rm *ResponseManager) pauseRequest(p peer.ID, requestID graphsync.RequestID
293293
}
294294

295295
func (rm *ResponseManager) peerState(p peer.ID) peerstate.PeerState {
296-
requestStates := make(graphsync.RequestStates)
297-
for key, ipr := range rm.inProgressResponses {
298-
if key.p == p {
299-
requestStates[key.requestID] = ipr.state
296+
var peerState peerstate.PeerState
297+
rm.responseQueue.WithPeerTopics(p, func(peerTopics *peertracker.PeerTrackerTopics) {
298+
requestStates := make(graphsync.RequestStates)
299+
for key, ipr := range rm.inProgressResponses {
300+
if key.p == p {
301+
requestStates[key.requestID] = ipr.state
302+
}
300303
}
301-
}
302-
peerTopics := rm.responseQueue.PeerTopics(p)
303-
return peerstate.PeerState{RequestStates: requestStates, TaskQueueState: fromPeerTopics(peerTopics)}
304+
peerState = peerstate.PeerState{RequestStates: requestStates, TaskQueueState: fromPeerTopics(peerTopics)}
305+
})
306+
return peerState
304307
}
305308

306309
func fromPeerTopics(pt *peertracker.PeerTrackerTopics) peerstate.TaskQueueState {

Diff for: taskqueue/taskqueue.go

+21 -1 (21 additions, 1 deletion)
Original file line number | Diff line number | Diff line change
@@ -25,12 +25,13 @@ type TaskQueue interface {
2525
TaskDone(p peer.ID, task *peertask.Task)
2626
Remove(t peertask.Topic, p peer.ID)
2727
Stats() graphsync.RequestStats
28-
PeerTopics(p peer.ID) *peertracker.PeerTrackerTopics
28+
WithPeerTopics(p peer.ID, f func(*peertracker.PeerTrackerTopics))
2929
}
3030

3131
// WorkerTaskQueue is a wrapper around peertaskqueue.PeerTaskQueue that manages running workers
3232
// that pop tasks and execute them
3333
type WorkerTaskQueue struct {
34+
lockTopics sync.Mutex
3435
*peertaskqueue.PeerTaskQueue
3536
ctx context.Context
3637
cancelFn func()
@@ -55,7 +56,9 @@ func NewTaskQueue(ctx context.Context) *WorkerTaskQueue {
5556

5657
// PushTask pushes a new task on to the queue
5758
func (tq *WorkerTaskQueue) PushTask(p peer.ID, task peertask.Task) {
59+
tq.lockTopics.Lock()
5860
tq.PeerTaskQueue.PushTasks(p, task)
61+
tq.lockTopics.Unlock()
5962
select {
6063
case tq.workSignal <- struct{}{}:
6164
default:
@@ -64,19 +67,30 @@ func (tq *WorkerTaskQueue) PushTask(p peer.ID, task peertask.Task) {
6467

6568
// TaskDone marks a task as completed so further tasks can be executed
6669
func (tq *WorkerTaskQueue) TaskDone(p peer.ID, task *peertask.Task) {
70+
tq.lockTopics.Lock()
6771
tq.PeerTaskQueue.TasksDone(p, task)
72+
tq.lockTopics.Unlock()
6873
}
6974

7075
// Stats returns statistics about a task queue
7176
func (tq *WorkerTaskQueue) Stats() graphsync.RequestStats {
77+
tq.lockTopics.Lock()
7278
ptqstats := tq.PeerTaskQueue.Stats()
79+
tq.lockTopics.Unlock()
7380
return graphsync.RequestStats{
7481
TotalPeers: uint64(ptqstats.NumPeers),
7582
Active: uint64(ptqstats.NumActive),
7683
Pending: uint64(ptqstats.NumPending),
7784
}
7885
}
7986

87+
func (tq *WorkerTaskQueue) WithPeerTopics(p peer.ID, withPeerTopics func(*peertracker.PeerTrackerTopics)) {
88+
tq.lockTopics.Lock()
89+
peerTopics := tq.PeerTaskQueue.PeerTopics(p)
90+
withPeerTopics(peerTopics)
91+
tq.lockTopics.Unlock()
92+
}
93+
8094
// Startup runs the given number of task workers with the given executor
8195
func (tq *WorkerTaskQueue) Startup(workerCount uint64, executor Executor) {
8296
for i := uint64(0); i < workerCount; i++ {
@@ -100,16 +114,22 @@ func (tq *WorkerTaskQueue) WaitForNoActiveTasks() {
100114
func (tq *WorkerTaskQueue) worker(executor Executor) {
101115
targetWork := 1
102116
for {
117+
tq.lockTopics.Lock()
103118
pid, tasks, _ := tq.PeerTaskQueue.PopTasks(targetWork)
119+
tq.lockTopics.Unlock()
104120
for len(tasks) == 0 {
105121
select {
106122
case <-tq.ctx.Done():
107123
return
108124
case <-tq.workSignal:
125+
tq.lockTopics.Lock()
109126
pid, tasks, _ = tq.PeerTaskQueue.PopTasks(targetWork)
127+
tq.lockTopics.Unlock()
110128
case <-tq.ticker.C:
129+
tq.lockTopics.Lock()
111130
tq.PeerTaskQueue.ThawRound()
112131
pid, tasks, _ = tq.PeerTaskQueue.PopTasks(targetWork)
132+
tq.lockTopics.Unlock()
113133
}
114134
}
115135
for _, task := range tasks {

0 commit comments

Comments
 (0)