Skip to content

Expose task queue diagnostics #302

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 9, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-log/v2 v2.3.0
github.com/ipfs/go-merkledag v0.5.1
github.com/ipfs/go-peertaskqueue v0.7.0
github.com/ipfs/go-peertaskqueue v0.7.1
github.com/ipfs/go-unixfs v0.2.4
github.com/ipld/go-codec-dagpb v1.3.0
github.com/ipld/go-ipld-prime v0.12.3
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,9 @@ github.com/ipfs/go-merkledag v0.5.1/go.mod h1:cLMZXx8J08idkp5+id62iVftUQV+HlYJ3P
github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg=
github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY=
github.com/ipfs/go-peertaskqueue v0.1.0/go.mod h1:Jmk3IyCcfl1W3jTW3YpghSwSEC6IJ3Vzz/jUmWw8Z0U=
github.com/ipfs/go-peertaskqueue v0.7.0 h1:VyO6G4sbzX80K58N60cCaHsSsypbUNs1GjO5seGNsQ0=
github.com/ipfs/go-peertaskqueue v0.7.0/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU=
github.com/ipfs/go-peertaskqueue v0.7.1 h1:7PLjon3RZwRQMgOTvYccZ+mjzkmds/7YzSWKFlBAypE=
github.com/ipfs/go-peertaskqueue v0.7.1/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU=
github.com/ipfs/go-unixfs v0.2.4 h1:6NwppOXefWIyysZ4LR/qUBPvXd5//8J3jiMdvpbw6Lo=
github.com/ipfs/go-unixfs v0.2.4/go.mod h1:SUdisfUjNoSDzzhGVxvCL9QO/nKdwXdr+gbMUdqcbYw=
github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2E=
Expand Down
13 changes: 13 additions & 0 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,19 @@ const (
Paused
)

// String returns the lowercase name of the request state, or a fixed
// placeholder when the value is not one of the known states.
func (rs RequestState) String() string {
	name := "unrecognized request state"
	switch rs {
	case Queued:
		name = "queued"
	case Running:
		name = "running"
	case Paused:
		name = "paused"
	}
	return name
}

// GraphExchange is a protocol that can exchange IPLD graphs based on a selector
type GraphExchange interface {
// Request initiates a new GraphSync request to the given peer using the given selector spec.
Expand Down
86 changes: 84 additions & 2 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package graphsync

import (
"context"
"fmt"
"time"

logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-peertaskqueue"
"github.com/ipfs/go-peertaskqueue/peertask"
"github.com/ipfs/go-peertaskqueue/peertracker"
ipld "github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/peer"
"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -458,20 +461,99 @@ func (gs *GraphSync) Stats() graphsync.Stats {
}
}

// TaskQueueStatus describes the set of requests for a given peer in a task queue
type TaskQueueStatus struct {
// Active holds the request IDs derived from the peer's active task topics
Active []graphsync.RequestID
// Pending holds the request IDs derived from the peer's pending task topics
Pending []graphsync.RequestID
}

// fromPeerTopics converts the active and pending topics tracked for a peer into
// a TaskQueueStatus, mapping every topic through toRequestID. A nil pt yields
// the zero TaskQueueStatus.
func fromPeerTopics(pt *peertracker.PeerTrackerTopics, toRequestID func(peertask.Topic) graphsync.RequestID) TaskQueueStatus {
	var status TaskQueueStatus
	if pt == nil {
		return status
	}

	status.Active = make([]graphsync.RequestID, 0, len(pt.Active))
	for i := range pt.Active {
		status.Active = append(status.Active, toRequestID(pt.Active[i]))
	}

	status.Pending = make([]graphsync.RequestID, 0, len(pt.Pending))
	for i := range pt.Pending {
		status.Pending = append(status.Pending, toRequestID(pt.Pending[i]))
	}

	return status
}

// PeerStats describes the state of graphsync for a given peer
type PeerStats struct {
// OutgoingRequests holds the request states reported by the request manager for this peer
OutgoingRequests graphsync.RequestStates
// OutgoingTaskQueue is set of requests for this peer in the outgoing task queue
OutgoingTaskQueue TaskQueueStatus
// IncomingRequests holds the request states reported by the response manager for this peer
IncomingRequests graphsync.RequestStates
// IncomingTaskQueue is set of requests for this peer in the incoming task queue
IncomingTaskQueue TaskQueueStatus
}

// PeerStats produces insight on the current state of a given peer.
// It combines the request states reported by the request manager and the
// response manager for p with the request IDs found for p in the request
// (outgoing) and response (incoming) task queues.
func (gs *GraphSync) PeerStats(p peer.ID) PeerStats {
return PeerStats{
OutgoingRequests: gs.requestManager.PeerStats(p),
IncomingRequests: gs.responseManager.PeerStats(p),
OutgoingRequests: gs.requestManager.PeerStats(p),
OutgoingTaskQueue: fromPeerTopics(gs.requestQueue.PeerTopics(p), requestmanager.RequestIDFromTaskTopic),
IncomingRequests: gs.responseManager.PeerStats(p),
IncomingTaskQueue: fromPeerTopics(gs.responseQueue.PeerTopics(p), responsemanager.RequestIDFromTaskTopic),
}
}

// QueueDiagnostics cross-checks tracked request states against the current
// contents of a task queue and returns one message per request ID for every
// unexpected state or inconsistency it finds:
//
//   - a request in the active queue that is not tracked, or is not Running
//   - a request in the pending queue that is not tracked, or is not Queued
//   - a Running request missing from the active queue
//   - a Queued request missing from the pending queue
//
// A message about an incorrect state takes precedence over a message about a
// request being missing from a queue. The returned map is empty when nothing
// is wrong.
func QueueDiagnostics(requestStates graphsync.RequestStates, taskQueueStatus TaskQueueStatus) map[graphsync.RequestID]string {
	report := make(map[graphsync.RequestID]string)
	seenActive := make(map[graphsync.RequestID]struct{}, len(requestStates))
	seenPending := make(map[graphsync.RequestID]struct{}, len(requestStates))

	// Everything in the active queue should be tracked and Running.
	for _, id := range taskQueueStatus.Active {
		state, tracked := requestStates[id]
		if !tracked {
			report[id] = fmt.Sprintf("request with id %d in active task queue but appears to have no tracked state", id)
			continue
		}
		seenActive[id] = struct{}{}
		if state != graphsync.Running {
			report[id] = fmt.Sprintf("expected request with id %d in active task queue to be in running state, but was %s", id, state)
		}
	}

	// Everything in the pending queue should be tracked and Queued.
	for _, id := range taskQueueStatus.Pending {
		state, tracked := requestStates[id]
		if !tracked {
			report[id] = fmt.Sprintf("request with id %d in pending task queue but appears to have no tracked state", id)
			continue
		}
		seenPending[id] = struct{}{}
		if state != graphsync.Queued {
			report[id] = fmt.Sprintf("expected request with id %d in pending task queue to be in queued state, but was %s", id, state)
		}
	}

	// Finally, look for tracked requests that should be in a queue but are not.
	for id, state := range requestStates {
		// An incorrect-state message recorded above wins over a missing-from-queue message.
		if _, reported := report[id]; reported {
			continue
		}
		switch state {
		case graphsync.Running:
			if _, found := seenActive[id]; !found {
				report[id] = fmt.Sprintf("request with id %d in running state is not in the active task queue", id)
			}
		case graphsync.Queued:
			if _, found := seenPending[id]; !found {
				report[id] = fmt.Sprintf("request with id %d in queued state is not in the pending task queue", id)
			}
		}
	}
	return report
}

type graphSyncReceiver GraphSync
Expand Down
141 changes: 140 additions & 1 deletion impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,10 +591,21 @@ func TestPauseResume(t *testing.T) {
requestorPeerStats := requestor.(*GraphSync).PeerStats(td.host2.ID())
require.Len(t, requestorPeerStats.OutgoingRequests, 1)
require.Len(t, requestorPeerStats.IncomingRequests, 0)

require.Len(t, requestorPeerStats.OutgoingTaskQueue.Active, 1)
require.Contains(t, requestorPeerStats.OutgoingRequests, requestorPeerStats.OutgoingTaskQueue.Active[0])
require.Len(t, requestorPeerStats.OutgoingTaskQueue.Pending, 0)
require.Len(t, requestorPeerStats.IncomingTaskQueue.Active, 0)
require.Len(t, requestorPeerStats.IncomingTaskQueue.Pending, 0)
require.Len(t, QueueDiagnostics(requestorPeerStats.OutgoingRequests, requestorPeerStats.OutgoingTaskQueue), 0)
responderPeerStats := responder.(*GraphSync).PeerStats(td.host1.ID())
require.Len(t, responderPeerStats.IncomingRequests, 1)
require.Len(t, responderPeerStats.OutgoingRequests, 0)
// no tasks as response is paused by responder
require.Len(t, responderPeerStats.IncomingTaskQueue.Active, 0)
require.Len(t, responderPeerStats.IncomingTaskQueue.Pending, 0)
require.Len(t, responderPeerStats.OutgoingTaskQueue.Active, 0)
require.Len(t, responderPeerStats.OutgoingTaskQueue.Pending, 0)
require.Len(t, QueueDiagnostics(responderPeerStats.IncomingRequests, responderPeerStats.IncomingTaskQueue), 0)

requestID := <-requestIDChan
err := responder.UnpauseResponse(td.host1.ID(), requestID)
Expand Down Expand Up @@ -1379,6 +1390,134 @@ func TestGraphsyncBlockListeners(t *testing.T) {
}, tracing.TracesToStrings())
}

// TestQueueDiagnostics runs QueueDiagnostics over a table of request-state /
// task-queue combinations built from five random request IDs and checks that
// the returned diagnostics map equals the expected messages for each case.
func TestQueueDiagnostics(t *testing.T) {
requestIDs := make([]graphsync.RequestID, 0, 5)
for i := 0; i < 5; i++ {
requestIDs = append(requestIDs, graphsync.RequestID(rand.Int31()))
}
testCases := map[string]struct {
requestStates graphsync.RequestStates
queueStats TaskQueueStatus
expectedDiagnostics map[graphsync.RequestID]string
}{
// Running IDs are active, Queued IDs are pending, and the Paused ID is in
// neither queue: nothing to report.
"all requests and queue match": {
requestStates: graphsync.RequestStates{
requestIDs[0]: graphsync.Running,
requestIDs[1]: graphsync.Running,
requestIDs[2]: graphsync.Queued,
requestIDs[3]: graphsync.Queued,
requestIDs[4]: graphsync.Paused,
},
queueStats: TaskQueueStatus{
Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]},
Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]},
},
expectedDiagnostics: map[graphsync.RequestID]string{},
},
// requestIDs[1] (Queued) and requestIDs[4] (Paused) sit in the active queue.
// requestIDs[1] is also Queued without a pending task, but the
// incorrect-state message is the one expected.
"active task with with incorrect state": {
requestStates: graphsync.RequestStates{
requestIDs[0]: graphsync.Running,
requestIDs[1]: graphsync.Queued,
requestIDs[2]: graphsync.Queued,
requestIDs[3]: graphsync.Queued,
requestIDs[4]: graphsync.Paused,
},
queueStats: TaskQueueStatus{
Active: []graphsync.RequestID{requestIDs[0], requestIDs[1], requestIDs[4]},
Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]},
},
expectedDiagnostics: map[graphsync.RequestID]string{
requestIDs[1]: fmt.Sprintf("expected request with id %d in active task queue to be in running state, but was queued", requestIDs[1]),
requestIDs[4]: fmt.Sprintf("expected request with id %d in active task queue to be in running state, but was paused", requestIDs[4]),
},
},
// requestIDs[1] is in the active queue but absent from requestStates.
"active task with no state": {
requestStates: graphsync.RequestStates{
requestIDs[0]: graphsync.Running,
requestIDs[2]: graphsync.Queued,
requestIDs[3]: graphsync.Queued,
requestIDs[4]: graphsync.Paused,
},
queueStats: TaskQueueStatus{
Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]},
Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]},
},
expectedDiagnostics: map[graphsync.RequestID]string{
requestIDs[1]: fmt.Sprintf("request with id %d in active task queue but appears to have no tracked state", requestIDs[1]),
},
},
// requestIDs[3] (Running) and requestIDs[4] (Paused) sit in the pending queue.
// requestIDs[3] is also Running without an active task, but the
// incorrect-state message is the one expected.
"pending task with with incorrect state": {
requestStates: graphsync.RequestStates{
requestIDs[0]: graphsync.Running,
requestIDs[1]: graphsync.Running,
requestIDs[2]: graphsync.Queued,
requestIDs[3]: graphsync.Running,
requestIDs[4]: graphsync.Paused,
},
queueStats: TaskQueueStatus{
Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]},
Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3], requestIDs[4]},
},
expectedDiagnostics: map[graphsync.RequestID]string{
requestIDs[3]: fmt.Sprintf("expected request with id %d in pending task queue to be in queued state, but was running", requestIDs[3]),
requestIDs[4]: fmt.Sprintf("expected request with id %d in pending task queue to be in queued state, but was paused", requestIDs[4]),
},
},
// requestIDs[3] is in the pending queue but absent from requestStates.
"pending task with no state": {
requestStates: graphsync.RequestStates{
requestIDs[0]: graphsync.Running,
requestIDs[1]: graphsync.Running,
requestIDs[2]: graphsync.Queued,
requestIDs[4]: graphsync.Paused,
},
queueStats: TaskQueueStatus{
Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]},
Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]},
},
expectedDiagnostics: map[graphsync.RequestID]string{
requestIDs[3]: fmt.Sprintf("request with id %d in pending task queue but appears to have no tracked state", requestIDs[3]),
},
},
// requestIDs[1] is Running but missing from the active queue.
"request state running with no active task": {
requestStates: graphsync.RequestStates{
requestIDs[0]: graphsync.Running,
requestIDs[1]: graphsync.Running,
requestIDs[2]: graphsync.Queued,
requestIDs[3]: graphsync.Queued,
requestIDs[4]: graphsync.Paused,
},
queueStats: TaskQueueStatus{
Active: []graphsync.RequestID{requestIDs[0]},
Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]},
},
expectedDiagnostics: map[graphsync.RequestID]string{
requestIDs[1]: fmt.Sprintf("request with id %d in running state is not in the active task queue", requestIDs[1]),
},
},
// requestIDs[3] is Queued but missing from the pending queue.
"request state queued with no pending task": {
requestStates: graphsync.RequestStates{
requestIDs[0]: graphsync.Running,
requestIDs[1]: graphsync.Running,
requestIDs[2]: graphsync.Queued,
requestIDs[3]: graphsync.Queued,
requestIDs[4]: graphsync.Paused,
},
queueStats: TaskQueueStatus{
Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]},
Pending: []graphsync.RequestID{requestIDs[2]},
},
expectedDiagnostics: map[graphsync.RequestID]string{
requestIDs[3]: fmt.Sprintf("request with id %d in queued state is not in the pending task queue", requestIDs[3]),
},
},
}
// Each table entry runs as its own named subtest.
for testCase, data := range testCases {
t.Run(testCase, func(t *testing.T) {
require.Equal(t, data.expectedDiagnostics, QueueDiagnostics(data.requestStates, data.queueStats))
})
}
}

type gsTestData struct {
mn mocknet.Mocknet
ctx context.Context
Expand Down
6 changes: 6 additions & 0 deletions requestmanager/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/ipfs/go-graphsync"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/metadata"
"github.com/ipfs/go-peertaskqueue/peertask"
)

func metadataForResponses(responses []gsmsg.GraphSyncResponse) map[graphsync.RequestID]metadata.Metadata {
Expand All @@ -23,3 +24,8 @@ func metadataForResponses(responses []gsmsg.GraphSyncResponse) map[graphsync.Req
}
return responseMetadata
}

// RequestIDFromTaskTopic converts a peer task topic back into the request ID
// it carries. The topic must hold a graphsync.RequestID; the unchecked type
// assertion panics for any other dynamic type.
func RequestIDFromTaskTopic(topic peertask.Topic) graphsync.RequestID {
	requestID := topic.(graphsync.RequestID)
	return requestID
}
10 changes: 6 additions & 4 deletions responsemanager/responsemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-peertaskqueue/peertask"
"github.com/ipfs/go-peertaskqueue/peertracker"
ipld "github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/node/basicnode"
Expand Down Expand Up @@ -1286,9 +1287,10 @@ func (td *testData) assertHasNetworkErrors(err error) {

// nullTaskQueue is a no-op task queue used by the tests: PushTask, TaskDone and
// Remove do nothing, Stats returns an empty RequestStats and PeerTopics returns nil.
type nullTaskQueue struct{}

func (ntq nullTaskQueue) PushTask(p peer.ID, task peertask.Task) {}
func (ntq nullTaskQueue) TaskDone(p peer.ID, task *peertask.Task) {}
func (ntq nullTaskQueue) Remove(t peertask.Topic, p peer.ID) {}
func (ntq nullTaskQueue) Stats() graphsync.RequestStats { return graphsync.RequestStats{} }
func (ntq nullTaskQueue) PushTask(p peer.ID, task peertask.Task) {}
func (ntq nullTaskQueue) TaskDone(p peer.ID, task *peertask.Task) {}
func (ntq nullTaskQueue) Remove(t peertask.Topic, p peer.ID) {}
func (ntq nullTaskQueue) Stats() graphsync.RequestStats { return graphsync.RequestStats{} }
func (ntq nullTaskQueue) PeerTopics(p peer.ID) *peertracker.PeerTrackerTopics { return nil }

// Compile-time check that nullTaskQueue satisfies taskqueue.TaskQueue.
var _ taskqueue.TaskQueue = nullTaskQueue{}
11 changes: 11 additions & 0 deletions responsemanager/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package responsemanager

import (
"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-peertaskqueue/peertask"
)

// RequestIDFromTaskTopic returns the request ID stored in a response task
// topic. The topic must hold a responseKey; the unchecked type assertion
// panics for any other dynamic type.
func RequestIDFromTaskTopic(topic peertask.Topic) graphsync.RequestID {
	key := topic.(responseKey)
	return key.requestID
}
Loading