Skip to content

feat: optimize checking if a new task is "better" #19

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 26, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 36 additions & 20 deletions peertracker/peertracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type TaskMerger interface {
// HasNewInfo indicates whether the given task has more information than
// the existing group of tasks (which have the same Topic), and thus should
// be merged.
HasNewInfo(task peertask.Task, existing []peertask.Task) bool
HasNewInfo(task peertask.Task, existing []*peertask.Task) bool
// Merge copies relevant fields from a new task to an existing task.
Merge(task peertask.Task, existing *peertask.Task)
}
Expand All @@ -26,7 +26,7 @@ type TaskMerger interface {
// existing task (with the same Topic).
type DefaultTaskMerger struct{}

func (*DefaultTaskMerger) HasNewInfo(task peertask.Task, existing []peertask.Task) bool {
func (*DefaultTaskMerger) HasNewInfo(task peertask.Task, existing []*peertask.Task) bool {
return false
}

Expand All @@ -40,12 +40,12 @@ type PeerTracker struct {

// Tasks that are pending being made active
pendingTasks map[peertask.Topic]*peertask.QueueTask
// Tasks that have been made active
activeTasks map[*peertask.Task]struct{}

// activeWork must be locked around as it will be updated externally
activelk sync.Mutex
activeWork int
activelk sync.Mutex
// Tasks that have been made active. Unfortuantely, we can have multiple for the same topic
// as we might get a "supperior" request after starting to handle the initial one.
activeTasks map[peertask.Topic][]*peertask.Task
activeWork int

maxActiveWorkPerPeer int

Expand Down Expand Up @@ -79,7 +79,7 @@ func New(target peer.ID, taskMerger TaskMerger, maxActiveWorkPerPeer int, opts .
target: target,
queueTaskComparator: peertask.PriorityCompare,
pendingTasks: make(map[peertask.Topic]*peertask.QueueTask),
activeTasks: make(map[*peertask.Task]struct{}),
activeTasks: make(map[peertask.Topic][]*peertask.Task),
taskMerger: taskMerger,
maxActiveWorkPerPeer: maxActiveWorkPerPeer,
}
Expand Down Expand Up @@ -260,7 +260,7 @@ func (p *PeerTracker) startTask(task *peertask.Task) {

// Add task to active queue
if _, ok := p.activeTasks[task]; !ok {
p.activeTasks[task] = struct{}{}
p.activeTasks[task.Topic] = append(p.activeTasks[task.Topic], task)
p.activeWork += task.Work
}
}
Expand All @@ -279,12 +279,33 @@ func (p *PeerTracker) TaskDone(task *peertask.Task) {
defer p.activelk.Unlock()

// Remove task from active queue
if _, ok := p.activeTasks[task]; ok {
delete(p.activeTasks, task)
p.activeWork -= task.Work
if p.activeWork < 0 {
panic("more tasks finished than started!")
activeTasks, ok := p.activeTasks[task.Topic]
if !ok {
return
}
// There will usually be 0 through 2 of these, so this should always be fast.
newTasks := activeTasks[:0]
for _, t := range activeTasks {
if task == t {
p.activeWork -= t.Work
continue
}
newTasks = append(newTasks, t)
}

if p.activeWork < 0 {
panic("more tasks finished than started!")
}

if len(newTasks) == 0 {
delete(p.activeTasks, task.Topic)
} else {
// Garbage collection.
for i := len(newTasks); i < len(activeTasks); i++ {
activeTasks[i] = nil
}

p.activeTasks[task.Topic] = newTasks
}
}

Expand Down Expand Up @@ -324,12 +345,7 @@ func (p *PeerTracker) IsFrozen() bool {
// Indicates whether the new task adds any more information over tasks that are
// already in the active task queue
func (p *PeerTracker) taskHasMoreInfoThanActiveTasks(task peertask.Task) bool {
var tasksWithTopic []peertask.Task
for at := range p.activeTasks {
if task.Topic == at.Topic {
tasksWithTopic = append(tasksWithTopic, *at)
}
}
tasksWithTopic := p.activeTasks[task.Topic]

// No tasks with that topic, so the new task adds information
if len(tasksWithTopic) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion peertracker/peertracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func TestTaskDone(t *testing.T) {

type permissiveTaskMerger struct{}

func (*permissiveTaskMerger) HasNewInfo(task peertask.Task, existing []peertask.Task) bool {
func (*permissiveTaskMerger) HasNewInfo(task peertask.Task, existing []*peertask.Task) bool {
return true
}
func (*permissiveTaskMerger) Merge(task peertask.Task, existing *peertask.Task) {
Expand Down