Skip to content

Commit 2d21b37

Browse files
authored
Merge pull request #16 from ipfs/feat/fifo-equal-priority
When priority is equal, use FIFO
2 parents e5f3db1 + 10c69a2 commit 2d21b37

File tree

5 files changed

+67
-3
lines changed

5 files changed

+67
-3
lines changed

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/ipfs/go-peertaskqueue
33
go 1.16
44

55
require (
6+
github.com/benbjohnson/clock v1.1.0
67
github.com/ipfs/go-ipfs-pq v0.0.2
78
github.com/libp2p/go-libp2p-core v0.0.1
89
github.com/multiformats/go-multihash v0.0.5 // indirect

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
2+
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
3+
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
24
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32 h1:qkOC5Gd33k54tobS36cXdAzJbeHaduLtnLQQwNoIi78=
35
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
46
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=

peertask/peertask.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ var FIFOCompare = func(a, b *QueueTask) bool {
1515
// PriorityCompare respects the target peer's task priority. For tasks involving
1616
// different peers, the oldest task is prioritized.
1717
var PriorityCompare = func(a, b *QueueTask) bool {
18-
if a.Target == b.Target {
18+
if a.Target == b.Target && a.Priority != b.Priority {
1919
return a.Priority > b.Priority
2020
}
2121
return FIFOCompare(a, b)

peertracker/peertracker.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@ package peertracker
22

33
import (
44
"sync"
5-
"time"
65

6+
"github.com/benbjohnson/clock"
77
pq "github.com/ipfs/go-ipfs-pq"
88
"github.com/ipfs/go-peertaskqueue/peertask"
99
peer "github.com/libp2p/go-libp2p-core/peer"
1010
)
1111

12+
var clockInstance = clock.New()
13+
1214
// TaskMerger is an interface that is used to merge new tasks into the active
1315
// and pending queues
1416
type TaskMerger interface {
@@ -144,7 +146,7 @@ func (p *PeerTracker) SetIndex(i int) {
144146

145147
// PushTasks adds a group of tasks onto a peer's queue
146148
func (p *PeerTracker) PushTasks(tasks ...peertask.Task) {
147-
now := time.Now()
149+
now := clockInstance.Now()
148150

149151
p.activelk.Lock()
150152
defer p.activelk.Unlock()

peertracker/peertracker_test.go

+59
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package peertracker
22

33
import (
44
"testing"
5+
"time"
56

7+
"github.com/benbjohnson/clock"
68
"github.com/ipfs/go-peertaskqueue/peertask"
79
"github.com/ipfs/go-peertaskqueue/testutil"
810
)
@@ -568,3 +570,60 @@ func TestRemoveActive(t *testing.T) {
568570
t.Fatal("Expected tasks in order")
569571
}
570572
}
573+
574+
func TestPushPopEqualTaskPriorities(t *testing.T) {
575+
partner := testutil.GeneratePeers(1)[0]
576+
clock := clock.NewMock()
577+
oldClock := clockInstance
578+
clockInstance = clock
579+
t.Cleanup(func() {
580+
clockInstance = oldClock
581+
})
582+
tracker := New(partner, &DefaultTaskMerger{}, 1)
583+
584+
tasks := []peertask.Task{
585+
{
586+
Topic: "1",
587+
Priority: 10,
588+
Work: 1,
589+
},
590+
{
591+
Topic: "2",
592+
Priority: 10,
593+
Work: 1,
594+
},
595+
{
596+
Topic: "3",
597+
Priority: 10,
598+
Work: 1,
599+
},
600+
}
601+
tracker.PushTasks(tasks[0])
602+
clock.Add(10 * time.Millisecond)
603+
tracker.PushTasks(tasks[1])
604+
clock.Add(10 * time.Millisecond)
605+
tracker.PushTasks(tasks[2])
606+
popped, _ := tracker.PopTasks(1)
607+
if len(popped) != 1 {
608+
t.Fatal("Expected 1 task")
609+
}
610+
if popped[0].Topic != "1" {
611+
t.Fatal("Expected first task")
612+
}
613+
tracker.TaskDone(popped[0])
614+
popped, _ = tracker.PopTasks(1)
615+
if len(popped) != 1 {
616+
t.Fatal("Expected 1 task")
617+
}
618+
if popped[0].Topic != "2" {
619+
t.Fatal("Expected second task")
620+
}
621+
tracker.TaskDone(popped[0])
622+
popped, _ = tracker.PopTasks(1)
623+
if len(popped) != 1 {
624+
t.Fatal("Expected 1 task")
625+
}
626+
if popped[0].Topic != "3" {
627+
t.Fatal("Expected third task")
628+
}
629+
}

0 commit comments

Comments
 (0)