From e97485a44f106d68239d9d6acdae19b0e408e90c Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 7 Dec 2021 19:58:14 -0800 Subject: [PATCH 1/3] feat(impl): expose task queue diagnostics --- go.mod | 2 +- go.sum | 3 +- graphsync.go | 13 +++ impl/graphsync.go | 86 ++++++++++++++- impl/graphsync_test.go | 141 +++++++++++++++++++++++- requestmanager/utils.go | 6 + responsemanager/responsemanager_test.go | 10 +- responsemanager/utils.go | 11 ++ taskqueue/taskqueue.go | 39 +++---- 9 files changed, 281 insertions(+), 30 deletions(-) create mode 100644 responsemanager/utils.go diff --git a/go.mod b/go.mod index a9588c2e..33447445 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/ipfs/go-ipld-format v0.2.0 github.com/ipfs/go-log/v2 v2.3.0 github.com/ipfs/go-merkledag v0.5.1 - github.com/ipfs/go-peertaskqueue v0.7.0 + github.com/ipfs/go-peertaskqueue v0.7.1 github.com/ipfs/go-unixfs v0.2.4 github.com/ipld/go-codec-dagpb v1.3.0 github.com/ipld/go-ipld-prime v0.12.3 diff --git a/go.sum b/go.sum index 85b9a42f..1bfc2705 100644 --- a/go.sum +++ b/go.sum @@ -444,8 +444,9 @@ github.com/ipfs/go-merkledag v0.5.1/go.mod h1:cLMZXx8J08idkp5+id62iVftUQV+HlYJ3P github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg= github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY= github.com/ipfs/go-peertaskqueue v0.1.0/go.mod h1:Jmk3IyCcfl1W3jTW3YpghSwSEC6IJ3Vzz/jUmWw8Z0U= -github.com/ipfs/go-peertaskqueue v0.7.0 h1:VyO6G4sbzX80K58N60cCaHsSsypbUNs1GjO5seGNsQ0= github.com/ipfs/go-peertaskqueue v0.7.0/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU= +github.com/ipfs/go-peertaskqueue v0.7.1 h1:7PLjon3RZwRQMgOTvYccZ+mjzkmds/7YzSWKFlBAypE= +github.com/ipfs/go-peertaskqueue v0.7.1/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU= github.com/ipfs/go-unixfs v0.2.4 h1:6NwppOXefWIyysZ4LR/qUBPvXd5//8J3jiMdvpbw6Lo= github.com/ipfs/go-unixfs v0.2.4/go.mod h1:SUdisfUjNoSDzzhGVxvCL9QO/nKdwXdr+gbMUdqcbYw= github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2E= diff --git a/graphsync.go b/graphsync.go index 20b69266..a23d9b17 100644 --- a/graphsync.go +++ b/graphsync.go @@ -347,6 +347,19 @@ const ( Paused ) +func (rs RequestState) String() string { + switch rs { + case Queued: + return "queued" + case Running: + return "running" + case Paused: + return "paused" + default: + return "unrecognized request state" + } +} + // GraphExchange is a protocol that can exchange IPLD graphs based on a selector type GraphExchange interface { // Request initiates a new GraphSync request to the given peer using the given selector spec. diff --git a/impl/graphsync.go b/impl/graphsync.go index f33e7246..ef7ffbed 100644 --- a/impl/graphsync.go +++ b/impl/graphsync.go @@ -2,10 +2,13 @@ package graphsync import ( "context" + "fmt" "time" logging "github.com/ipfs/go-log/v2" "github.com/ipfs/go-peertaskqueue" + "github.com/ipfs/go-peertaskqueue/peertask" + "github.com/ipfs/go-peertaskqueue/peertracker" ipld "github.com/ipld/go-ipld-prime" "github.com/libp2p/go-libp2p-core/peer" "go.opentelemetry.io/otel" @@ -458,20 +461,99 @@ func (gs *GraphSync) Stats() graphsync.Stats { } } +// TaskQueueStatus describes the the set of requests for a given peer in a task queue +type TaskQueueStatus struct { + Active []graphsync.RequestID + Pending []graphsync.RequestID +} + +func fromPeerTopics(pt *peertracker.PeerTrackerTopics, toRequestID func(peertask.Topic) graphsync.RequestID) TaskQueueStatus { + if pt == nil { + return TaskQueueStatus{} + } + active := make([]graphsync.RequestID, 0, len(pt.Active)) + for _, topic := range pt.Active { + active = append(active, toRequestID(topic)) + } + pending := make([]graphsync.RequestID, 0, len(pt.Pending)) + for _, topic := range pt.Pending { + pending = append(pending, toRequestID(topic)) + } + return TaskQueueStatus{ + Active: active, + Pending: pending, + } +} + // PeerStats describes the state of graphsync for a given type PeerStats struct { // OutgoingRequests OutgoingRequests graphsync.RequestStates + // OutgoingTaskQueue is set of requests for this peer in the outgoing task queue + OutgoingTaskQueue TaskQueueStatus // IncomingRequests IncomingRequests graphsync.RequestStates + // IncomingTaskQueue is set of requests for this peer in the incoming task queue + IncomingTaskQueue TaskQueueStatus } // PeerStats produces insight on the current state of a given peer func (gs *GraphSync) PeerStats(p peer.ID) PeerStats { return PeerStats{ - OutgoingRequests: gs.requestManager.PeerStats(p), - IncomingRequests: gs.responseManager.PeerStats(p), + OutgoingRequests: gs.requestManager.PeerStats(p), + OutgoingTaskQueue: fromPeerTopics(gs.requestQueue.PeerTopics(p), requestmanager.RequestIDFromTaskTopic), + IncomingRequests: gs.responseManager.PeerStats(p), + IncomingTaskQueue: fromPeerTopics(gs.responseQueue.PeerTopics(p), responsemanager.RequestIDFromTaskTopic), + } +} + +// QueueDiagnostics compares request states with the current state of the task queue to identify unexpected +// states or inconsistences between the tracked task queue and the tracked requests +func QueueDiagnostics(requestStates graphsync.RequestStates, taskQueueStatus TaskQueueStatus) map[graphsync.RequestID]string { + matchedActiveQueue := make(map[graphsync.RequestID]struct{}, len(requestStates)) + matchedPendingQueue := make(map[graphsync.RequestID]struct{}, len(requestStates)) + diagnostics := make(map[graphsync.RequestID]string) + for _, id := range taskQueueStatus.Active { + status, ok := requestStates[id] + if ok { + matchedActiveQueue[id] = struct{}{} + if status != graphsync.Running { + diagnostics[id] = fmt.Sprintf("expected request with id %d in active task queue to be in running state, but was %s", id, status) + } + } else { + diagnostics[id] = fmt.Sprintf("request with id %d in active task queue but appears to have no tracked state", id) + } + } + for _, id := range taskQueueStatus.Pending { + status, ok := requestStates[id] + if ok { + matchedPendingQueue[id] = struct{}{} + if status != graphsync.Queued { + diagnostics[id] = fmt.Sprintf("expected request with id %d in pending task queue to be in queued state, but was %s", id, status) + } + } else { + diagnostics[id] = fmt.Sprintf("request with id %d in pending task queue but appears to have no tracked state", id) + } + } + for id, state := range requestStates { + if state == graphsync.Running { + if _, ok := matchedActiveQueue[id]; !ok { + // prefer message over being in incorrect state if present over being missing from queue + if _, ok := diagnostics[id]; !ok { + diagnostics[id] = fmt.Sprintf("request with id %d in running state is not in the active task queue", id) + } + } + } + if state == graphsync.Queued { + if _, ok := matchedPendingQueue[id]; !ok { + // prefer message over being in incorrect state if present over being missing from queue + if _, ok := diagnostics[id]; !ok { + diagnostics[id] = fmt.Sprintf("request with id %d in queued state is not in the pending task queue", id) + } + } + } } + return diagnostics } type graphSyncReceiver GraphSync diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go index 3f3470d6..13772b76 100644 --- a/impl/graphsync_test.go +++ b/impl/graphsync_test.go @@ -591,10 +591,21 @@ func TestPauseResume(t *testing.T) { requestorPeerStats := requestor.(*GraphSync).PeerStats(td.host2.ID()) require.Len(t, requestorPeerStats.OutgoingRequests, 1) require.Len(t, requestorPeerStats.IncomingRequests, 0) - + require.Len(t, requestorPeerStats.OutgoingTaskQueue.Active, 1) + require.Contains(t, requestorPeerStats.OutgoingRequests, requestorPeerStats.OutgoingTaskQueue.Active[0]) + require.Len(t, requestorPeerStats.OutgoingTaskQueue.Pending, 0) + require.Len(t, requestorPeerStats.IncomingTaskQueue.Active, 0) + require.Len(t, requestorPeerStats.IncomingTaskQueue.Pending, 0) + require.Len(t, QueueDiagnostics(requestorPeerStats.OutgoingRequests, requestorPeerStats.OutgoingTaskQueue), 0) responderPeerStats := responder.(*GraphSync).PeerStats(td.host1.ID()) require.Len(t, responderPeerStats.IncomingRequests, 1) require.Len(t, responderPeerStats.OutgoingRequests, 0) + // no tasks as response is paused by responder + require.Len(t, responderPeerStats.IncomingTaskQueue.Active, 0) + require.Len(t, responderPeerStats.IncomingTaskQueue.Pending, 0) + require.Len(t, responderPeerStats.OutgoingTaskQueue.Active, 0) + require.Len(t, responderPeerStats.OutgoingTaskQueue.Pending, 0) + require.Len(t, QueueDiagnostics(responderPeerStats.IncomingRequests, responderPeerStats.IncomingTaskQueue), 0) requestID := <-requestIDChan err := responder.UnpauseResponse(td.host1.ID(), requestID) @@ -1379,6 +1390,134 @@ func TestGraphsyncBlockListeners(t *testing.T) { }, tracing.TracesToStrings()) } +func TestQueueDiagnostics(t *testing.T) { + requestIDs := make([]graphsync.RequestID, 0, 5) + for i := 0; i < 5; i++ { + requestIDs = append(requestIDs, graphsync.RequestID(rand.Int31())) + } + testCases := map[string]struct { + requestStates graphsync.RequestStates + queueStats TaskQueueStatus + expectedDiagnostics map[graphsync.RequestID]string + }{ + "all requests and queue match": { + requestStates: graphsync.RequestStates{ + requestIDs[0]: graphsync.Running, + requestIDs[1]: graphsync.Running, + requestIDs[2]: graphsync.Queued, + requestIDs[3]: graphsync.Queued, + requestIDs[4]: graphsync.Paused, + }, + queueStats: TaskQueueStatus{ + Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]}, + Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]}, + }, + expectedDiagnostics: map[graphsync.RequestID]string{}, + }, + "active task with with incorrect state": { + requestStates: graphsync.RequestStates{ + requestIDs[0]: graphsync.Running, + requestIDs[1]: graphsync.Queued, + requestIDs[2]: graphsync.Queued, + requestIDs[3]: graphsync.Queued, + requestIDs[4]: graphsync.Paused, + }, + queueStats: TaskQueueStatus{ + Active: []graphsync.RequestID{requestIDs[0], requestIDs[1], requestIDs[4]}, + Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]}, + }, + expectedDiagnostics: map[graphsync.RequestID]string{ + requestIDs[1]: fmt.Sprintf("expected request with id %d in active task queue to be in running state, but was queued", requestIDs[1]), + requestIDs[4]: fmt.Sprintf("expected request with id %d in active task queue to be in running state, but was paused", requestIDs[4]), + }, + }, + "active task with no state": { + requestStates: graphsync.RequestStates{ + requestIDs[0]: graphsync.Running, + requestIDs[2]: graphsync.Queued, + requestIDs[3]: graphsync.Queued, + requestIDs[4]: graphsync.Paused, + }, + queueStats: TaskQueueStatus{ + Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]}, + Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]}, + }, + expectedDiagnostics: map[graphsync.RequestID]string{ + requestIDs[1]: fmt.Sprintf("request with id %d in active task queue but appears to have no tracked state", requestIDs[1]), + }, + }, + "pending task with with incorrect state": { + requestStates: graphsync.RequestStates{ + requestIDs[0]: graphsync.Running, + requestIDs[1]: graphsync.Running, + requestIDs[2]: graphsync.Queued, + requestIDs[3]: graphsync.Running, + requestIDs[4]: graphsync.Paused, + }, + queueStats: TaskQueueStatus{ + Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]}, + Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3], requestIDs[4]}, + }, + expectedDiagnostics: map[graphsync.RequestID]string{ + requestIDs[3]: fmt.Sprintf("expected request with id %d in pending task queue to be in queued state, but was running", requestIDs[3]), + requestIDs[4]: fmt.Sprintf("expected request with id %d in pending task queue to be in queued state, but was paused", requestIDs[4]), + }, + }, + "pending task with no state": { + requestStates: graphsync.RequestStates{ + requestIDs[0]: graphsync.Running, + requestIDs[1]: graphsync.Running, + requestIDs[2]: graphsync.Queued, + requestIDs[4]: graphsync.Paused, + }, + queueStats: TaskQueueStatus{ + Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]}, + Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]}, + }, + expectedDiagnostics: map[graphsync.RequestID]string{ + requestIDs[3]: fmt.Sprintf("request with id %d in pending task queue but appears to have no tracked state", requestIDs[3]), + }, + }, + "request state running with no active task": { + requestStates: graphsync.RequestStates{ + requestIDs[0]: graphsync.Running, + requestIDs[1]: graphsync.Running, + requestIDs[2]: graphsync.Queued, + requestIDs[3]: graphsync.Queued, + requestIDs[4]: graphsync.Paused, + }, + queueStats: TaskQueueStatus{ + Active: []graphsync.RequestID{requestIDs[0]}, + Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]}, + }, + expectedDiagnostics: map[graphsync.RequestID]string{ + requestIDs[1]: fmt.Sprintf("request with id %d in running state is not in the active task queue", requestIDs[1]), + }, + }, + "request state queued with no pending task": { + requestStates: graphsync.RequestStates{ + requestIDs[0]: graphsync.Running, + requestIDs[1]: graphsync.Running, + requestIDs[2]: graphsync.Queued, + requestIDs[3]: graphsync.Queued, + requestIDs[4]: graphsync.Paused, + }, + queueStats: TaskQueueStatus{ + Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]}, + Pending: []graphsync.RequestID{requestIDs[2]}, + }, + expectedDiagnostics: map[graphsync.RequestID]string{ + requestIDs[3]: fmt.Sprintf("request with id %d in queued state is not in the pending task queue", requestIDs[3]), + }, + }, + } + for testCase, data := range testCases { + t.Run(testCase, func(t *testing.T) { + require.Equal(t, data.expectedDiagnostics, QueueDiagnostics(data.requestStates, data.queueStats)) + }) + } +} + type gsTestData struct { mn mocknet.Mocknet ctx context.Context diff --git a/requestmanager/utils.go b/requestmanager/utils.go index a4f0c758..4f875deb 100644 --- a/requestmanager/utils.go +++ b/requestmanager/utils.go @@ -4,6 +4,7 @@ import ( "github.com/ipfs/go-graphsync" gsmsg "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/metadata" + "github.com/ipfs/go-peertaskqueue/peertask" ) func metadataForResponses(responses []gsmsg.GraphSyncResponse) map[graphsync.RequestID]metadata.Metadata { @@ -23,3 +24,8 @@ func metadataForResponses(responses []gsmsg.GraphSyncResponse) map[graphsync.Req } return responseMetadata } + +// RequestIDFromTaskTopic extracts a request ID from a given peer task topic +func RequestIDFromTaskTopic(topic peertask.Topic) graphsync.RequestID { + return topic.(graphsync.RequestID) +} diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index 204f786d..4adb68d7 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -11,6 +11,7 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" "github.com/ipfs/go-peertaskqueue/peertask" + "github.com/ipfs/go-peertaskqueue/peertracker" ipld "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipld/go-ipld-prime/node/basicnode" @@ -1286,9 +1287,10 @@ func (td *testData) assertHasNetworkErrors(err error) { type nullTaskQueue struct{} -func (ntq nullTaskQueue) PushTask(p peer.ID, task peertask.Task) {} -func (ntq nullTaskQueue) TaskDone(p peer.ID, task *peertask.Task) {} -func (ntq nullTaskQueue) Remove(t peertask.Topic, p peer.ID) {} -func (ntq nullTaskQueue) Stats() graphsync.RequestStats { return graphsync.RequestStats{} } +func (ntq nullTaskQueue) PushTask(p peer.ID, task peertask.Task) {} +func (ntq nullTaskQueue) TaskDone(p peer.ID, task *peertask.Task) {} +func (ntq nullTaskQueue) Remove(t peertask.Topic, p peer.ID) {} +func (ntq nullTaskQueue) Stats() graphsync.RequestStats { return graphsync.RequestStats{} } +func (ntq nullTaskQueue) PeerTopics(p peer.ID) *peertracker.PeerTrackerTopics { return nil } var _ taskqueue.TaskQueue = nullTaskQueue{} diff --git a/responsemanager/utils.go b/responsemanager/utils.go new file mode 100644 index 00000000..fcffb258 --- /dev/null +++ b/responsemanager/utils.go @@ -0,0 +1,11 @@ +package responsemanager + +import ( + "github.com/ipfs/go-graphsync" + "github.com/ipfs/go-peertaskqueue/peertask" +) + +// RequestIDFromTaskTopic extracts a request ID from a given peer task topic +func RequestIDFromTaskTopic(topic peertask.Topic) graphsync.RequestID { + return topic.(responseKey).requestID +} diff --git a/taskqueue/taskqueue.go b/taskqueue/taskqueue.go index a1414735..7a667a29 100644 --- a/taskqueue/taskqueue.go +++ b/taskqueue/taskqueue.go @@ -7,6 +7,7 @@ import ( "github.com/ipfs/go-peertaskqueue" "github.com/ipfs/go-peertaskqueue/peertask" + "github.com/ipfs/go-peertaskqueue/peertracker" peer "github.com/libp2p/go-libp2p-core/peer" "github.com/ipfs/go-graphsync" @@ -24,18 +25,19 @@ type TaskQueue interface { TaskDone(p peer.ID, task *peertask.Task) Remove(t peertask.Topic, p peer.ID) Stats() graphsync.RequestStats + PeerTopics(p peer.ID) *peertracker.PeerTrackerTopics } -// TaskQueue is a wrapper around peertaskqueue.PeerTaskQueue that manages running workers +// WorkerTaskQueue is a wrapper around peertaskqueue.PeerTaskQueue that manages running workers // that pop tasks and execute them type WorkerTaskQueue struct { - ctx context.Context - cancelFn func() - peerTaskQueue *peertaskqueue.PeerTaskQueue - workSignal chan struct{} - noTaskCond *sync.Cond - ticker *time.Ticker - activeTasks int32 + *peertaskqueue.PeerTaskQueue + ctx context.Context + cancelFn func() + workSignal chan struct{} + noTaskCond *sync.Cond + ticker *time.Ticker + activeTasks int32 } // NewTaskQueue initializes a new queue @@ -44,7 +46,7 @@ func NewTaskQueue(ctx context.Context) *WorkerTaskQueue { return &WorkerTaskQueue{ ctx: ctx, cancelFn: cancelFn, - peerTaskQueue: peertaskqueue.New(), + PeerTaskQueue: peertaskqueue.New(), workSignal: make(chan struct{}, 1), noTaskCond: sync.NewCond(&sync.Mutex{}), ticker: time.NewTicker(thawSpeed), @@ -53,7 +55,7 @@ func NewTaskQueue(ctx context.Context) *WorkerTaskQueue { // PushTask pushes a new task on to the queue func (tq *WorkerTaskQueue) PushTask(p peer.ID, task peertask.Task) { - tq.peerTaskQueue.PushTasks(p, task) + tq.PeerTaskQueue.PushTasks(p, task) select { case tq.workSignal <- struct{}{}: default: @@ -62,12 +64,12 @@ func (tq *WorkerTaskQueue) PushTask(p peer.ID, task peertask.Task) { // TaskDone marks a task as completed so further tasks can be executed func (tq *WorkerTaskQueue) TaskDone(p peer.ID, task *peertask.Task) { - tq.peerTaskQueue.TasksDone(p, task) + tq.PeerTaskQueue.TasksDone(p, task) } // Stats returns statistics about a task queue func (tq *WorkerTaskQueue) Stats() graphsync.RequestStats { - ptqstats := tq.peerTaskQueue.Stats() + ptqstats := tq.PeerTaskQueue.Stats() return graphsync.RequestStats{ TotalPeers: uint64(ptqstats.NumPeers), Active: uint64(ptqstats.NumActive), @@ -75,11 +77,6 @@ func (tq *WorkerTaskQueue) Stats() graphsync.RequestStats { } } -// Remove removes a task from the execution queue -func (tq *WorkerTaskQueue) Remove(topic peertask.Topic, p peer.ID) { - tq.peerTaskQueue.Remove(topic, p) -} - // Startup runs the given number of task workers with the given executor func (tq *WorkerTaskQueue) Startup(workerCount uint64, executor Executor) { for i := uint64(0); i < workerCount; i++ { @@ -103,16 +100,16 @@ func (tq *WorkerTaskQueue) WaitForNoActiveTasks() { func (tq *WorkerTaskQueue) worker(executor Executor) { targetWork := 1 for { - pid, tasks, _ := tq.peerTaskQueue.PopTasks(targetWork) + pid, tasks, _ := tq.PeerTaskQueue.PopTasks(targetWork) for len(tasks) == 0 { select { case <-tq.ctx.Done(): return case <-tq.workSignal: - pid, tasks, _ = tq.peerTaskQueue.PopTasks(targetWork) + pid, tasks, _ = tq.PeerTaskQueue.PopTasks(targetWork) case <-tq.ticker.C: - tq.peerTaskQueue.ThawRound() - pid, tasks, _ = tq.peerTaskQueue.PopTasks(targetWork) + tq.PeerTaskQueue.ThawRound() + pid, tasks, _ = tq.PeerTaskQueue.PopTasks(targetWork) } } for _, task := range tasks { From 49b47dde25c97c957a629982da7bc9c0199cafc1 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 8 Dec 2021 15:58:10 -0800 Subject: [PATCH 2/3] refactor(peerstate): put peerstate in its own module --- impl/graphsync.go | 103 ++------------- impl/graphsync_test.go | 162 +++--------------------- peerstate/peerstate.go | 69 ++++++++++ peerstate/peerstate_test.go | 139 ++++++++++++++++++++ requestmanager/client.go | 15 ++- requestmanager/messages.go | 7 +- requestmanager/requestmanager_test.go | 12 +- requestmanager/server.go | 29 ++++- responsemanager/client.go | 15 ++- responsemanager/messages.go | 11 +- responsemanager/responsemanager_test.go | 43 +++++-- responsemanager/server.go | 29 ++++- responsemanager/utils.go | 11 -- 13 files changed, 349 insertions(+), 296 deletions(-) create mode 100644 peerstate/peerstate.go create mode 100644 peerstate/peerstate_test.go delete mode 100644 responsemanager/utils.go diff --git a/impl/graphsync.go b/impl/graphsync.go index ef7ffbed..d32d9521 100644 --- a/impl/graphsync.go +++ b/impl/graphsync.go @@ -2,13 +2,10 @@ package graphsync import ( "context" - "fmt" "time" logging "github.com/ipfs/go-log/v2" "github.com/ipfs/go-peertaskqueue" - "github.com/ipfs/go-peertaskqueue/peertask" - "github.com/ipfs/go-peertaskqueue/peertracker" ipld "github.com/ipld/go-ipld-prime" "github.com/libp2p/go-libp2p-core/peer" "go.opentelemetry.io/otel" @@ -22,6 +19,7 @@ import ( "github.com/ipfs/go-graphsync/messagequeue" gsnet "github.com/ipfs/go-graphsync/network" "github.com/ipfs/go-graphsync/peermanager" + "github.com/ipfs/go-graphsync/peerstate" "github.com/ipfs/go-graphsync/requestmanager" "github.com/ipfs/go-graphsync/requestmanager/asyncloader" "github.com/ipfs/go-graphsync/requestmanager/executor" @@ -461,99 +459,18 @@ func (gs *GraphSync) Stats() graphsync.Stats { } } -// TaskQueueStatus describes the the set of requests for a given peer in a task queue -type TaskQueueStatus struct { - Active []graphsync.RequestID - Pending []graphsync.RequestID +// PeerState describes the state of graphsync for a given peer +type PeerState struct { + OutgoingState peerstate.PeerState + IncomingState peerstate.PeerState } -func fromPeerTopics(pt *peertracker.PeerTrackerTopics, toRequestID func(peertask.Topic) graphsync.RequestID) TaskQueueStatus { - if pt == nil { - return TaskQueueStatus{} +// PeerState produces insight on the current state of a given peer +func (gs *GraphSync) PeerState(p peer.ID) PeerState { + return PeerState{ + OutgoingState: gs.requestManager.PeerState(p), + IncomingState: gs.responseManager.PeerState(p), } - active := make([]graphsync.RequestID, 0, len(pt.Active)) - for _, topic := range pt.Active { - active = append(active, toRequestID(topic)) - } - pending := make([]graphsync.RequestID, 0, len(pt.Pending)) - for _, topic := range pt.Pending { - pending = append(pending, toRequestID(topic)) - } - return TaskQueueStatus{ - Active: active, - Pending: pending, - } -} - -// PeerStats describes the state of graphsync for a given -type PeerStats struct { - // OutgoingRequests - OutgoingRequests graphsync.RequestStates - // OutgoingTaskQueue is set of requests for this peer in the outgoing task queue - OutgoingTaskQueue TaskQueueStatus - // IncomingRequests - IncomingRequests graphsync.RequestStates - // IncomingTaskQueue is set of requests for this peer in the incoming task queue - IncomingTaskQueue TaskQueueStatus -} - -// PeerStats produces insight on the current state of a given peer -func (gs *GraphSync) PeerStats(p peer.ID) PeerStats { - return PeerStats{ - OutgoingRequests: gs.requestManager.PeerStats(p), - OutgoingTaskQueue: fromPeerTopics(gs.requestQueue.PeerTopics(p), requestmanager.RequestIDFromTaskTopic), - IncomingRequests: gs.responseManager.PeerStats(p), - IncomingTaskQueue: fromPeerTopics(gs.responseQueue.PeerTopics(p), responsemanager.RequestIDFromTaskTopic), - } -} - -// QueueDiagnostics compares request states with the current state of the task queue to identify unexpected -// states or inconsistences between the tracked task queue and the tracked requests -func QueueDiagnostics(requestStates graphsync.RequestStates, taskQueueStatus TaskQueueStatus) map[graphsync.RequestID]string { - matchedActiveQueue := make(map[graphsync.RequestID]struct{}, len(requestStates)) - matchedPendingQueue := make(map[graphsync.RequestID]struct{}, len(requestStates)) - diagnostics := make(map[graphsync.RequestID]string) - for _, id := range taskQueueStatus.Active { - status, ok := requestStates[id] - if ok { - matchedActiveQueue[id] = struct{}{} - if status != graphsync.Running { - diagnostics[id] = fmt.Sprintf("expected request with id %d in active task queue to be in running state, but was %s", id, status) - } - } else { - diagnostics[id] = fmt.Sprintf("request with id %d in active task queue but appears to have no tracked state", id) - } - } - for _, id := range taskQueueStatus.Pending { - status, ok := requestStates[id] - if ok { - matchedPendingQueue[id] = struct{}{} - if status != graphsync.Queued { - diagnostics[id] = fmt.Sprintf("expected request with id %d in pending task queue to be in queued state, but was %s", id, status) - } - } else { - diagnostics[id] = fmt.Sprintf("request with id %d in pending task queue but appears to have no tracked state", id) - } - } - for id, state := range requestStates { - if state == graphsync.Running { - if _, ok := matchedActiveQueue[id]; !ok { - // prefer message over being in incorrect state if present over being missing from queue - if _, ok := diagnostics[id]; !ok { - diagnostics[id] = fmt.Sprintf("request with id %d in running state is not in the active task queue", id) - } - } - } - if state == graphsync.Queued { - if _, ok := matchedPendingQueue[id]; !ok { - // prefer message over being in incorrect state if present over being missing from queue - if _, ok := diagnostics[id]; !ok { - diagnostics[id] = fmt.Sprintf("request with id %d in queued state is not in the pending task queue", id) - } - } - } - } - return diagnostics } type graphSyncReceiver GraphSync diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go index 13772b76..af03aa51 100644 --- a/impl/graphsync_test.go +++ b/impl/graphsync_test.go @@ -588,24 +588,24 @@ func TestPauseResume(t *testing.T) { timer := time.NewTimer(100 * time.Millisecond) testutil.AssertDoesReceiveFirst(t, timer.C, "should pause request", progressChan) - requestorPeerStats := requestor.(*GraphSync).PeerStats(td.host2.ID()) - require.Len(t, requestorPeerStats.OutgoingRequests, 1) - require.Len(t, requestorPeerStats.IncomingRequests, 0) - require.Len(t, requestorPeerStats.OutgoingTaskQueue.Active, 1) - require.Contains(t, requestorPeerStats.OutgoingRequests, requestorPeerStats.OutgoingTaskQueue.Active[0]) - require.Len(t, requestorPeerStats.OutgoingTaskQueue.Pending, 0) - require.Len(t, requestorPeerStats.IncomingTaskQueue.Active, 0) - require.Len(t, requestorPeerStats.IncomingTaskQueue.Pending, 0) - require.Len(t, QueueDiagnostics(requestorPeerStats.OutgoingRequests, requestorPeerStats.OutgoingTaskQueue), 0) - responderPeerStats := responder.(*GraphSync).PeerStats(td.host1.ID()) - require.Len(t, responderPeerStats.IncomingRequests, 1) - require.Len(t, responderPeerStats.OutgoingRequests, 0) + requestorPeerState := requestor.(*GraphSync).PeerState(td.host2.ID()) + require.Len(t, requestorPeerState.OutgoingState.RequestStates, 1) + require.Len(t, requestorPeerState.IncomingState.RequestStates, 0) + require.Len(t, requestorPeerState.OutgoingState.Active, 1) + require.Contains(t, requestorPeerState.OutgoingState.RequestStates, requestorPeerState.OutgoingState.Active[0]) + require.Len(t, requestorPeerState.OutgoingState.Pending, 0) + require.Len(t, requestorPeerState.IncomingState.Active, 0) + require.Len(t, requestorPeerState.IncomingState.Pending, 0) + require.Len(t, requestorPeerState.OutgoingState.Diagnostics(), 0) + responderPeerState := responder.(*GraphSync).PeerState(td.host1.ID()) + require.Len(t, responderPeerState.IncomingState.RequestStates, 1) + require.Len(t, responderPeerState.OutgoingState.RequestStates, 0) // no tasks as response is paused by responder - require.Len(t, responderPeerStats.IncomingTaskQueue.Active, 0) - require.Len(t, responderPeerStats.IncomingTaskQueue.Pending, 0) - require.Len(t, responderPeerStats.OutgoingTaskQueue.Active, 0) - require.Len(t, responderPeerStats.OutgoingTaskQueue.Pending, 0) - require.Len(t, QueueDiagnostics(responderPeerStats.IncomingRequests, responderPeerStats.IncomingTaskQueue), 0) + require.Len(t, responderPeerState.IncomingState.Active, 0) + require.Len(t, responderPeerState.IncomingState.Pending, 0) + require.Len(t, responderPeerState.OutgoingState.Active, 0) + require.Len(t, responderPeerState.OutgoingState.Pending, 0) + require.Len(t, responderPeerState.IncomingState.Diagnostics(), 0) requestID := <-requestIDChan err := responder.UnpauseResponse(td.host1.ID(), requestID) @@ -1390,134 +1390,6 @@ func TestGraphsyncBlockListeners(t *testing.T) { }, tracing.TracesToStrings()) } -func TestQueueDiagnostics(t *testing.T) { - requestIDs := make([]graphsync.RequestID, 0, 5) - for i := 0; i < 5; i++ { - requestIDs = append(requestIDs, graphsync.RequestID(rand.Int31())) - } - testCases := map[string]struct { - requestStates graphsync.RequestStates - queueStats TaskQueueStatus - expectedDiagnostics map[graphsync.RequestID]string - }{ - "all requests and queue match": { - requestStates: graphsync.RequestStates{ - requestIDs[0]: graphsync.Running, - requestIDs[1]: graphsync.Running, - requestIDs[2]: graphsync.Queued, - requestIDs[3]: graphsync.Queued, - requestIDs[4]: graphsync.Paused, - }, - queueStats: TaskQueueStatus{ - Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]}, - Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]}, - }, - expectedDiagnostics: map[graphsync.RequestID]string{}, - }, - "active task with with incorrect state": { - requestStates: graphsync.RequestStates{ - requestIDs[0]: graphsync.Running, - requestIDs[1]: graphsync.Queued, - requestIDs[2]: graphsync.Queued, - requestIDs[3]: graphsync.Queued, - requestIDs[4]: graphsync.Paused, - }, - queueStats: TaskQueueStatus{ - Active: []graphsync.RequestID{requestIDs[0], requestIDs[1], requestIDs[4]}, - Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]}, - }, - expectedDiagnostics: map[graphsync.RequestID]string{ - requestIDs[1]: fmt.Sprintf("expected request with id %d in active task queue to be in running state, but was queued", requestIDs[1]), - requestIDs[4]: fmt.Sprintf("expected request with id %d in active task queue to be in running state, but was paused", requestIDs[4]), - }, - }, - "active task with no state": { - requestStates: graphsync.RequestStates{ - requestIDs[0]: graphsync.Running, - requestIDs[2]: graphsync.Queued, - requestIDs[3]: graphsync.Queued, - requestIDs[4]: graphsync.Paused, - }, - queueStats: TaskQueueStatus{ - Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]}, - Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]}, - }, - expectedDiagnostics: map[graphsync.RequestID]string{ - requestIDs[1]: fmt.Sprintf("request with id %d in active task queue but appears to have no tracked state", requestIDs[1]), - }, - }, - "pending task with with incorrect state": { - requestStates: graphsync.RequestStates{ - requestIDs[0]: graphsync.Running, - requestIDs[1]: graphsync.Running, - requestIDs[2]: graphsync.Queued, - requestIDs[3]: graphsync.Running, - requestIDs[4]: graphsync.Paused, - }, - queueStats: TaskQueueStatus{ - Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]}, - Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3], requestIDs[4]}, - }, - expectedDiagnostics: map[graphsync.RequestID]string{ - requestIDs[3]: fmt.Sprintf("expected request with id %d in pending task queue to be in queued state, but was running", requestIDs[3]), - requestIDs[4]: fmt.Sprintf("expected request with id %d in pending task queue to be in queued state, but was paused", requestIDs[4]), - }, - }, - "pending task with no state": { - requestStates: graphsync.RequestStates{ - requestIDs[0]: graphsync.Running, - requestIDs[1]: graphsync.Running, - requestIDs[2]: graphsync.Queued, - requestIDs[4]: graphsync.Paused, - }, - queueStats: TaskQueueStatus{ - Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]}, - Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]}, - }, - expectedDiagnostics: map[graphsync.RequestID]string{ - requestIDs[3]: fmt.Sprintf("request with id %d in pending task queue but appears to have no tracked state", requestIDs[3]), - }, - }, - "request state running with no active task": { - requestStates: graphsync.RequestStates{ - requestIDs[0]: graphsync.Running, - requestIDs[1]: graphsync.Running, - requestIDs[2]: graphsync.Queued, - requestIDs[3]: graphsync.Queued, - requestIDs[4]: graphsync.Paused, - }, - queueStats: TaskQueueStatus{ - Active: []graphsync.RequestID{requestIDs[0]}, - Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]}, - }, - expectedDiagnostics: map[graphsync.RequestID]string{ - requestIDs[1]: fmt.Sprintf("request with id %d in running state is not in the active task queue", requestIDs[1]), - }, - }, - "request state queued with no pending task": { - requestStates: graphsync.RequestStates{ - requestIDs[0]: graphsync.Running, - requestIDs[1]: graphsync.Running, - requestIDs[2]: graphsync.Queued, - requestIDs[3]: graphsync.Queued, - requestIDs[4]: graphsync.Paused, - }, - queueStats: TaskQueueStatus{ - Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]}, - Pending: []graphsync.RequestID{requestIDs[2]}, - }, - expectedDiagnostics: map[graphsync.RequestID]string{ - requestIDs[3]: fmt.Sprintf("request with id %d in queued state is not in the pending task queue", requestIDs[3]), - }, - }, - } - for testCase, data := range testCases { - t.Run(testCase, func(t *testing.T) { - require.Equal(t, data.expectedDiagnostics, QueueDiagnostics(data.requestStates, data.queueStats)) - }) - } -} - type gsTestData struct { mn mocknet.Mocknet ctx context.Context diff --git a/peerstate/peerstate.go b/peerstate/peerstate.go new file mode 100644 index 00000000..e0a3ebfc --- /dev/null +++ b/peerstate/peerstate.go @@ -0,0 +1,69 @@ +package peerstate + +import ( + "fmt" + + "github.com/ipfs/go-graphsync" +) + +// TaskQueueState describes the the set of requests for a given peer in a task queue +type TaskQueueState struct { + Active []graphsync.RequestID + Pending []graphsync.RequestID +} + +// PeerState tracks the over all state of a given peer for either +// incoming or outgoing requests +type PeerState struct { + graphsync.RequestStates + TaskQueueState +} + +// Diagnostics compares request states with the current state of the task queue to identify unexpected +// states or inconsistences between the tracked task queue and the tracked requests +func (ps PeerState) Diagnostics() map[graphsync.RequestID]string { + matchedActiveQueue := make(map[graphsync.RequestID]struct{}, len(ps.RequestStates)) + matchedPendingQueue := make(map[graphsync.RequestID]struct{}, len(ps.RequestStates)) + diagnostics := make(map[graphsync.RequestID]string) + for _, id := range ps.TaskQueueState.Active { + status, ok := ps.RequestStates[id] + if ok { + matchedActiveQueue[id] = struct{}{} + if status != graphsync.Running { + diagnostics[id] = fmt.Sprintf("expected request with id %d in active task queue to be in running state, but was %s", id, status) + } + } else { + diagnostics[id] = fmt.Sprintf("request with id %d in active task queue but appears to have no tracked state", id) + } + } + for _, id := range ps.TaskQueueState.Pending { + status, ok := ps.RequestStates[id] + if ok { + matchedPendingQueue[id] = struct{}{} + if status != graphsync.Queued { + diagnostics[id] = fmt.Sprintf("expected request with id %d in pending task queue to be in queued state, but was %s", id, status) + } + } else { + diagnostics[id] = fmt.Sprintf("request with id %d in pending task queue but appears to have no tracked state", id) + } + } + for id, state := range ps.RequestStates { + if state == graphsync.Running { + if _, ok := matchedActiveQueue[id]; !ok { + // prefer message over being in incorrect state if present over being missing from queue + if _, ok := diagnostics[id]; !ok { + diagnostics[id] = fmt.Sprintf("request with id %d in running state is not in the active task queue", id) + } + } + } + if state == graphsync.Queued { + if _, ok := matchedPendingQueue[id]; !ok { + // prefer message over being in incorrect state if present over being missing from queue + if _, ok := diagnostics[id]; !ok { + diagnostics[id] = fmt.Sprintf("request with id %d in queued state is not in the pending task queue", id) + } + } + } + } + return diagnostics +} diff --git a/peerstate/peerstate_test.go b/peerstate/peerstate_test.go new file mode 100644 index 00000000..b5887f4e --- /dev/null +++ b/peerstate/peerstate_test.go @@ -0,0 +1,139 @@ +package peerstate_test + +import ( + "fmt" + "math/rand" + "testing" + + "github.com/ipfs/go-graphsync" + "github.com/ipfs/go-graphsync/peerstate" + "github.com/stretchr/testify/require" +) + +func TestDiagnostics(t *testing.T) { + requestIDs := make([]graphsync.RequestID, 0, 5) + for i := 0; i < 5; i++ { + requestIDs = append(requestIDs, graphsync.RequestID(rand.Int31())) + } + testCases := map[string]struct { + requestStates graphsync.RequestStates + queueStats peerstate.TaskQueueState + expectedDiagnostics map[graphsync.RequestID]string + }{ + "all requests and queue match": { + requestStates: graphsync.RequestStates{ + requestIDs[0]: graphsync.Running, + requestIDs[1]: graphsync.Running, + requestIDs[2]: graphsync.Queued, + requestIDs[3]: graphsync.Queued, + requestIDs[4]: graphsync.Paused, + }, + queueStats: peerstate.TaskQueueState{ + Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]}, + Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]}, + }, + expectedDiagnostics: map[graphsync.RequestID]string{}, + }, + "active task with with incorrect state": { + requestStates: graphsync.RequestStates{ + requestIDs[0]: graphsync.Running, + requestIDs[1]: graphsync.Queued, + requestIDs[2]: graphsync.Queued, + requestIDs[3]: graphsync.Queued, + requestIDs[4]: graphsync.Paused, + }, + queueStats: peerstate.TaskQueueState{ + Active: []graphsync.RequestID{requestIDs[0], requestIDs[1], requestIDs[4]}, + Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]}, + }, + expectedDiagnostics: map[graphsync.RequestID]string{ + requestIDs[1]: fmt.Sprintf("expected request with id %d in active task queue to be in running state, but was queued", requestIDs[1]), + requestIDs[4]: fmt.Sprintf("expected request with id %d in active task queue to be in running state, but was paused", requestIDs[4]), + }, + }, + "active task with no state": { + requestStates: graphsync.RequestStates{ + requestIDs[0]: graphsync.Running, + requestIDs[2]: graphsync.Queued, + requestIDs[3]: graphsync.Queued, + requestIDs[4]: graphsync.Paused, + }, + queueStats: peerstate.TaskQueueState{ + Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]}, + Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]}, + }, + expectedDiagnostics: map[graphsync.RequestID]string{ + requestIDs[1]: fmt.Sprintf("request with id %d in active task queue but appears to have no tracked state", requestIDs[1]), + }, + }, + "pending task with with incorrect state": { + requestStates: graphsync.RequestStates{ + requestIDs[0]: graphsync.Running, + requestIDs[1]: graphsync.Running, + requestIDs[2]: graphsync.Queued, + requestIDs[3]: graphsync.Running, + requestIDs[4]: graphsync.Paused, + }, + queueStats: peerstate.TaskQueueState{ + Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]}, + Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3], requestIDs[4]}, + }, + expectedDiagnostics: map[graphsync.RequestID]string{ + requestIDs[3]: fmt.Sprintf("expected request with id %d in pending task queue to be in queued state, but was running", requestIDs[3]), + requestIDs[4]: fmt.Sprintf("expected request with id %d in pending task queue to be in queued state, but was paused", requestIDs[4]), + }, + }, + "pending task with no state": { + requestStates: graphsync.RequestStates{ + requestIDs[0]: graphsync.Running, + requestIDs[1]: graphsync.Running, + requestIDs[2]: graphsync.Queued, + requestIDs[4]: graphsync.Paused, + }, + queueStats: peerstate.TaskQueueState{ + Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]}, + Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]}, + }, + expectedDiagnostics: map[graphsync.RequestID]string{ + requestIDs[3]: fmt.Sprintf("request with id %d in pending task queue but appears to have no tracked state", requestIDs[3]), + }, + }, + "request state running with no active task": { + requestStates: graphsync.RequestStates{ + requestIDs[0]: graphsync.Running, + requestIDs[1]: graphsync.Running, + requestIDs[2]: graphsync.Queued, + requestIDs[3]: graphsync.Queued, + requestIDs[4]: graphsync.Paused, + }, + queueStats: peerstate.TaskQueueState{ + Active: []graphsync.RequestID{requestIDs[0]}, + Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]}, + }, + expectedDiagnostics: map[graphsync.RequestID]string{ + requestIDs[1]: fmt.Sprintf("request with id %d in running state is not in the active task queue", requestIDs[1]), + }, + }, + "request state queued with no pending task": { + requestStates: graphsync.RequestStates{ + requestIDs[0]: graphsync.Running, + requestIDs[1]: graphsync.Running, + requestIDs[2]: graphsync.Queued, + requestIDs[3]: graphsync.Queued, + requestIDs[4]: graphsync.Paused, + }, + queueStats: peerstate.TaskQueueState{ + Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]}, + Pending: []graphsync.RequestID{requestIDs[2]}, + }, + expectedDiagnostics: map[graphsync.RequestID]string{ + requestIDs[3]: fmt.Sprintf("request with id %d in queued state is not in the pending task queue", requestIDs[3]), + }, + }, + } + for testCase, data := range testCases { + t.Run(testCase, func(t *testing.T) { + require.Equal(t, data.expectedDiagnostics, peerstate.PeerState{data.requestStates, data.queueStats}.Diagnostics()) + }) + } +} diff --git a/requestmanager/client.go b/requestmanager/client.go index 578a6961..95081b3d 100644 --- a/requestmanager/client.go +++ b/requestmanager/client.go @@ -27,6 +27,7 @@ import ( "github.com/ipfs/go-graphsync/metadata" "github.com/ipfs/go-graphsync/network" "github.com/ipfs/go-graphsync/notifications" + "github.com/ipfs/go-graphsync/peerstate" "github.com/ipfs/go-graphsync/requestmanager/executor" "github.com/ipfs/go-graphsync/requestmanager/hooks" "github.com/ipfs/go-graphsync/requestmanager/types" @@ -332,15 +333,15 @@ func (rm *RequestManager) ReleaseRequestTask(p peer.ID, task *peertask.Task, err } } -// PeerStats gets stats on all outgoing requests for a given peer -func (rm *RequestManager) PeerStats(p peer.ID) graphsync.RequestStates { - response := make(chan graphsync.RequestStates) - rm.send(&peerStatsMessage{p, response}, nil) +// PeerState gets stats on all outgoing requests for a given peer +func (rm *RequestManager) PeerState(p peer.ID) peerstate.PeerState { + response := make(chan peerstate.PeerState) + rm.send(&peerStateMessage{p, response}, nil) select { case <-rm.ctx.Done(): - return nil - case peerStats := <-response: - return peerStats + return peerstate.PeerState{} + case peerState := <-response: + return peerState } } diff --git a/requestmanager/messages.go b/requestmanager/messages.go index ff99833d..dd5acf3c 100644 --- a/requestmanager/messages.go +++ b/requestmanager/messages.go @@ -9,6 +9,7 @@ import ( "github.com/ipfs/go-graphsync" gsmsg "github.com/ipfs/go-graphsync/message" + "github.com/ipfs/go-graphsync/peerstate" "github.com/ipfs/go-graphsync/requestmanager/executor" ) @@ -109,12 +110,12 @@ func (nrm *newRequestMessage) handle(rm *RequestManager) { } } -type peerStatsMessage struct { +type peerStateMessage struct { p peer.ID - peerStatsChan chan<- graphsync.RequestStates + peerStatsChan chan<- peerstate.PeerState } -func (psm *peerStatsMessage) handle(rm *RequestManager) { +func (psm *peerStateMessage) handle(rm *RequestManager) { peerStats := rm.peerStats(psm.p) select { case psm.peerStatsChan <- peerStats: diff --git a/requestmanager/requestmanager_test.go b/requestmanager/requestmanager_test.go index f5f1bc15..c62d9702 100644 --- a/requestmanager/requestmanager_test.go +++ b/requestmanager/requestmanager_test.go @@ -994,10 +994,14 @@ func TestStats(t *testing.T) { requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 3) - states := td.requestManager.PeerStats(peers[0]) - require.Len(t, states, 2) - require.Equal(t, states[requestRecords[0].gsr.ID()], graphsync.Running) - require.Equal(t, states[requestRecords[1].gsr.ID()], graphsync.Running) + peerState := td.requestManager.PeerState(peers[0]) + require.Len(t, peerState.RequestStates, 2) + require.Equal(t, peerState.RequestStates[requestRecords[0].gsr.ID()], graphsync.Running) + require.Equal(t, peerState.RequestStates[requestRecords[1].gsr.ID()], graphsync.Running) + require.Len(t, peerState.Active, 2) + require.Contains(t, peerState.Active, requestRecords[0].gsr.ID()) + require.Contains(t, peerState.Active, requestRecords[1].gsr.ID()) + require.Len(t, peerState.Pending, 0) } type requestRecord struct { diff --git a/requestmanager/server.go b/requestmanager/server.go index 328d7751..da7fd0f0 100644 --- a/requestmanager/server.go +++ b/requestmanager/server.go @@ -10,6 +10,7 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" "github.com/ipfs/go-peertaskqueue/peertask" + "github.com/ipfs/go-peertaskqueue/peertracker" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipld/go-ipld-prime/traversal" @@ -25,6 +26,7 @@ import ( "github.com/ipfs/go-graphsync/dedupkey" "github.com/ipfs/go-graphsync/ipldutil" gsmsg "github.com/ipfs/go-graphsync/message" + "github.com/ipfs/go-graphsync/peerstate" "github.com/ipfs/go-graphsync/requestmanager/executor" "github.com/ipfs/go-graphsync/requestmanager/hooks" ) @@ -392,12 +394,31 @@ func (rm *RequestManager) pause(id graphsync.RequestID) error { return nil } -func (rm *RequestManager) peerStats(p peer.ID) graphsync.RequestStates { - peerStats := make(graphsync.RequestStates) +func (rm *RequestManager) peerStats(p peer.ID) peerstate.PeerState { + requestStates := make(graphsync.RequestStates) for id, ipr := range rm.inProgressRequestStatuses { if ipr.p == p { - peerStats[id] = graphsync.RequestState(ipr.state) + requestStates[id] = graphsync.RequestState(ipr.state) } } - return peerStats + peerTopics := rm.requestQueue.PeerTopics(p) + return peerstate.PeerState{RequestStates: requestStates, TaskQueueState: fromPeerTopics(peerTopics)} +} + +func fromPeerTopics(pt *peertracker.PeerTrackerTopics) peerstate.TaskQueueState { + if pt == nil { + return peerstate.TaskQueueState{} + } + active := make([]graphsync.RequestID, 0, len(pt.Active)) + for _, topic := range pt.Active { + active = append(active, topic.(graphsync.RequestID)) + } + pending := make([]graphsync.RequestID, 0, len(pt.Pending)) + for _, topic := range pt.Pending { + pending = append(pending, topic.(graphsync.RequestID)) + } + return peerstate.TaskQueueState{ + Active: active, + Pending: pending, + } } diff --git a/responsemanager/client.go b/responsemanager/client.go index 4e847a9e..de284e3a 100644 --- a/responsemanager/client.go +++ b/responsemanager/client.go @@ -15,6 +15,7 @@ import ( gsmsg "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/network" "github.com/ipfs/go-graphsync/notifications" + "github.com/ipfs/go-graphsync/peerstate" "github.com/ipfs/go-graphsync/responsemanager/hooks" "github.com/ipfs/go-graphsync/responsemanager/queryexecutor" "github.com/ipfs/go-graphsync/responsemanager/responseassembler" @@ -231,15 +232,15 @@ func (rm *ResponseManager) CloseWithNetworkError(p peer.ID, requestID graphsync. rm.send(&errorRequestMessage{p, requestID, queryexecutor.ErrNetworkError, make(chan error, 1)}, nil) } -// PeerStats gets stats on all outgoing requests for a given peer -func (rm *ResponseManager) PeerStats(p peer.ID) graphsync.RequestStates { - response := make(chan graphsync.RequestStates) - rm.send(&peerStatsMessage{p, response}, nil) +// PeerState gets current state of the outgoing responses for a given peer +func (rm *ResponseManager) PeerState(p peer.ID) peerstate.PeerState { + response := make(chan peerstate.PeerState) + rm.send(&peerStateMessage{p, response}, nil) select { case <-rm.ctx.Done(): - return nil - case peerStats := <-response: - return peerStats + return peerstate.PeerState{} + case peerState := <-response: + return peerState } } diff --git a/responsemanager/messages.go b/responsemanager/messages.go index 26e319a6..0a05f08e 100644 --- a/responsemanager/messages.go +++ b/responsemanager/messages.go @@ -6,6 +6,7 @@ import ( "github.com/ipfs/go-graphsync" gsmsg "github.com/ipfs/go-graphsync/message" + "github.com/ipfs/go-graphsync/peerstate" "github.com/ipfs/go-graphsync/responsemanager/queryexecutor" ) @@ -114,15 +115,15 @@ func (prm *processRequestMessage) handle(rm *ResponseManager) { rm.processRequests(prm.p, prm.requests) } -type peerStatsMessage struct { +type peerStateMessage struct { p peer.ID - peerStatsChan chan<- graphsync.RequestStates + peerStatsChan chan<- peerstate.PeerState } -func (psm *peerStatsMessage) handle(rm *ResponseManager) { - peerStats := rm.peerStats(psm.p) +func (psm *peerStateMessage) handle(rm *ResponseManager) { + peerState := rm.peerState(psm.p) select { - case psm.peerStatsChan <- peerStats: + case psm.peerStatsChan <- peerState: case <-rm.ctx.Done(): } } diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index 4adb68d7..0745df4f 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -177,12 +177,22 @@ func TestStats(t *testing.T) { responseManager.ProcessRequests(td.ctx, td.p, td.requests) p2 := testutil.GeneratePeers(1)[0] responseManager.ProcessRequests(td.ctx, p2, td.requests) - stats := responseManager.PeerStats(td.p) - require.Len(t, stats, 1) - require.Equal(t, stats[td.requestID], graphsync.Queued) - stats = responseManager.PeerStats(p2) - require.Len(t, stats, 1) - require.Equal(t, stats[td.requestID], graphsync.Queued) + peerState := responseManager.PeerState(td.p) + require.Len(t, peerState.RequestStates, 1) + require.Equal(t, peerState.RequestStates[td.requestID], graphsync.Queued) + require.Len(t, peerState.Pending, 1) + require.Equal(t, peerState.Pending[0], td.requestID) + require.Len(t, peerState.Active, 0) + // no inconsistencies + require.Len(t, peerState.Diagnostics(), 0) + peerState = responseManager.PeerState(p2) + require.Len(t, peerState.RequestStates, 1) + require.Equal(t, peerState.RequestStates[td.requestID], graphsync.Queued) + require.Len(t, peerState.Pending, 1) + require.Equal(t, peerState.Pending[0], td.requestID) + require.Len(t, peerState.Active, 0) + // no inconsistencies + require.Len(t, peerState.Diagnostics(), 0) } func TestMissingContent(t *testing.T) { @@ -1113,7 +1123,7 @@ func (td *testData) newResponseManager() *ResponseManager { } func (td *testData) nullTaskQueueResponseManager() *ResponseManager { - ntq := nullTaskQueue{} + ntq := nullTaskQueue{tasksQueued: make(map[peer.ID][]peertask.Topic)} rm := New(td.ctx, td.persistence, td.responseAssembler, td.requestQueuedHooks, td.requestHooks, td.updateHooks, td.completedListeners, td.cancelledListeners, td.blockSentListeners, td.networkErrorListeners, 6, td.connManager, 0, ntq) return rm } @@ -1285,12 +1295,19 @@ func (td *testData) assertHasNetworkErrors(err error) { require.EqualError(td.t, receivedErr, err.Error()) } -type nullTaskQueue struct{} +type nullTaskQueue struct { + tasksQueued map[peer.ID][]peertask.Topic +} + +func (ntq nullTaskQueue) PushTask(p peer.ID, task peertask.Task) { + ntq.tasksQueued[p] = append(ntq.tasksQueued[p], task.Topic) +} -func (ntq nullTaskQueue) PushTask(p peer.ID, task peertask.Task) {} -func (ntq nullTaskQueue) TaskDone(p peer.ID, task *peertask.Task) {} -func (ntq nullTaskQueue) Remove(t peertask.Topic, p peer.ID) {} -func (ntq nullTaskQueue) Stats() graphsync.RequestStats { return graphsync.RequestStats{} } -func (ntq nullTaskQueue) PeerTopics(p peer.ID) *peertracker.PeerTrackerTopics { return nil } +func (ntq nullTaskQueue) TaskDone(p peer.ID, task *peertask.Task) {} +func (ntq nullTaskQueue) Remove(t peertask.Topic, p peer.ID) {} +func (ntq nullTaskQueue) Stats() graphsync.RequestStats { return graphsync.RequestStats{} } +func (ntq nullTaskQueue) PeerTopics(p peer.ID) *peertracker.PeerTrackerTopics { + return &peertracker.PeerTrackerTopics{Pending: ntq.tasksQueued[p]} +} var _ taskqueue.TaskQueue = nullTaskQueue{} diff --git a/responsemanager/server.go b/responsemanager/server.go index 3def7bea..d754b423 100644 --- a/responsemanager/server.go +++ b/responsemanager/server.go @@ -7,12 +7,14 @@ import ( "time" "github.com/ipfs/go-peertaskqueue/peertask" + "github.com/ipfs/go-peertaskqueue/peertracker" "github.com/libp2p/go-libp2p-core/peer" "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/ipldutil" gsmsg "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/notifications" + "github.com/ipfs/go-graphsync/peerstate" "github.com/ipfs/go-graphsync/responsemanager/hooks" "github.com/ipfs/go-graphsync/responsemanager/queryexecutor" "github.com/ipfs/go-graphsync/responsemanager/responseassembler" @@ -271,12 +273,31 @@ func (rm *ResponseManager) pauseRequest(p peer.ID, requestID graphsync.RequestID return nil } -func (rm *ResponseManager) peerStats(p peer.ID) graphsync.RequestStates { - peerStats := make(graphsync.RequestStates) +func (rm *ResponseManager) peerState(p peer.ID) peerstate.PeerState { + requestStates := make(graphsync.RequestStates) for key, ipr := range rm.inProgressResponses { if key.p == p { - peerStats[key.requestID] = ipr.state + requestStates[key.requestID] = ipr.state } } - return peerStats + peerTopics := rm.responseQueue.PeerTopics(p) + return peerstate.PeerState{RequestStates: requestStates, TaskQueueState: fromPeerTopics(peerTopics)} +} + +func fromPeerTopics(pt *peertracker.PeerTrackerTopics) peerstate.TaskQueueState { + if pt == nil { + return peerstate.TaskQueueState{} + } + active := make([]graphsync.RequestID, 0, len(pt.Active)) + for _, topic := range pt.Active { + active = append(active, topic.(responseKey).requestID) + } + pending := make([]graphsync.RequestID, 0, len(pt.Pending)) + for _, topic := range pt.Pending { + pending = append(pending, topic.(responseKey).requestID) + } + return peerstate.TaskQueueState{ + Active: active, + Pending: pending, + } } diff --git a/responsemanager/utils.go b/responsemanager/utils.go deleted file mode 100644 index fcffb258..00000000 --- a/responsemanager/utils.go +++ /dev/null @@ -1,11 +0,0 @@ -package responsemanager - -import ( - "github.com/ipfs/go-graphsync" - "github.com/ipfs/go-peertaskqueue/peertask" -) - -// RequestIDFromTaskTopic extracts a request ID from a given peer task topic -func RequestIDFromTaskTopic(topic peertask.Topic) graphsync.RequestID { - return topic.(responseKey).requestID -} From 38821816349512360e2e332247205cc2d71f9b36 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 8 Dec 2021 16:05:53 -0800 Subject: [PATCH 3/3] refactor(peerstate): make diagnostics return array --- peerstate/peerstate.go | 22 ++++++++-------------- peerstate/peerstate_test.go | 32 ++++++++++++++++---------------- 2 files changed, 24 insertions(+), 30 deletions(-) diff --git a/peerstate/peerstate.go b/peerstate/peerstate.go index e0a3ebfc..17c4f9d7 100644 --- a/peerstate/peerstate.go +++ b/peerstate/peerstate.go @@ -21,19 +21,19 @@ type PeerState struct { // Diagnostics compares request states with the current state of the task queue to identify unexpected // states or inconsistences between the tracked task queue and the tracked requests -func (ps PeerState) Diagnostics() map[graphsync.RequestID]string { +func (ps PeerState) Diagnostics() map[graphsync.RequestID][]string { matchedActiveQueue := make(map[graphsync.RequestID]struct{}, len(ps.RequestStates)) matchedPendingQueue := make(map[graphsync.RequestID]struct{}, len(ps.RequestStates)) - diagnostics := make(map[graphsync.RequestID]string) + diagnostics := make(map[graphsync.RequestID][]string) for _, id := range ps.TaskQueueState.Active { status, ok := ps.RequestStates[id] if ok { matchedActiveQueue[id] = struct{}{} if status != graphsync.Running { - diagnostics[id] = fmt.Sprintf("expected request with id %d in active task queue to be in running state, but was %s", id, status) + diagnostics[id] = append(diagnostics[id], fmt.Sprintf("expected request with id %d in active task queue to be in running state, but was %s", id, status)) } } else { - diagnostics[id] = fmt.Sprintf("request with id %d in active task queue but appears to have no tracked state", id) + diagnostics[id] = append(diagnostics[id], fmt.Sprintf("request with id %d in active task queue but appears to have no tracked state", id)) } } for _, id := range ps.TaskQueueState.Pending { @@ -41,27 +41,21 @@ func (ps PeerState) Diagnostics() map[graphsync.RequestID]string { if ok { matchedPendingQueue[id] = struct{}{} if status != graphsync.Queued { - diagnostics[id] = fmt.Sprintf("expected request with id %d in pending task queue to be in queued state, but was %s", id, status) + diagnostics[id] = append(diagnostics[id], fmt.Sprintf("expected request with id %d in pending task queue to be in queued state, but was %s", id, status)) } } else { - diagnostics[id] = fmt.Sprintf("request with id %d in pending task queue but appears to have no tracked state", id) + diagnostics[id] = append(diagnostics[id], fmt.Sprintf("request with id %d in pending task queue but appears to have no tracked state", id)) } } for id, state := range ps.RequestStates { if state == graphsync.Running { if _, ok := matchedActiveQueue[id]; !ok { - // prefer message over being in incorrect state if present over being missing from queue - if _, ok := diagnostics[id]; !ok { - diagnostics[id] = fmt.Sprintf("request with id %d in running state is not in the active task queue", id) - } + diagnostics[id] = append(diagnostics[id], fmt.Sprintf("request with id %d in running state is not in the active task queue", id)) } } if state == graphsync.Queued { if _, ok := matchedPendingQueue[id]; !ok { - // prefer message over being in incorrect state if present over being missing from queue - if _, ok := diagnostics[id]; !ok { - diagnostics[id] = fmt.Sprintf("request with id %d in queued state is not in the pending task queue", id) - } + diagnostics[id] = append(diagnostics[id], fmt.Sprintf("request with id %d in queued state is not in the pending task queue", id)) } } } diff --git a/peerstate/peerstate_test.go b/peerstate/peerstate_test.go index b5887f4e..01e94ab0 100644 --- a/peerstate/peerstate_test.go +++ b/peerstate/peerstate_test.go @@ -18,7 +18,7 @@ func TestDiagnostics(t *testing.T) { testCases := map[string]struct { requestStates graphsync.RequestStates queueStats peerstate.TaskQueueState - expectedDiagnostics map[graphsync.RequestID]string + expectedDiagnostics map[graphsync.RequestID][]string }{ "all requests and queue match": { requestStates: graphsync.RequestStates{ @@ -32,7 +32,7 @@ func TestDiagnostics(t *testing.T) { Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]}, Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]}, }, - expectedDiagnostics: map[graphsync.RequestID]string{}, + expectedDiagnostics: map[graphsync.RequestID][]string{}, }, "active task with with incorrect state": { requestStates: graphsync.RequestStates{ @@ -46,9 +46,9 @@ func TestDiagnostics(t *testing.T) { Active: []graphsync.RequestID{requestIDs[0], requestIDs[1], requestIDs[4]}, Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]}, }, - expectedDiagnostics: map[graphsync.RequestID]string{ - requestIDs[1]: fmt.Sprintf("expected request with id %d in active task queue to be in running state, but was queued", requestIDs[1]), - requestIDs[4]: fmt.Sprintf("expected request with id %d in active task queue to be in running state, but was paused", requestIDs[4]), + expectedDiagnostics: map[graphsync.RequestID][]string{ + requestIDs[1]: {fmt.Sprintf("expected request with id %d in active task queue to be in running state, but was queued", requestIDs[1]), fmt.Sprintf("request with id %d in queued state is not in the pending task queue", requestIDs[1])}, + requestIDs[4]: {fmt.Sprintf("expected request with id %d in active task queue to be in running state, but was paused", requestIDs[4])}, }, }, "active task with no state": { @@ -62,8 +62,8 @@ func TestDiagnostics(t *testing.T) { Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]}, Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]}, }, - expectedDiagnostics: map[graphsync.RequestID]string{ - requestIDs[1]: fmt.Sprintf("request with id %d in active task queue but appears to have no tracked state", requestIDs[1]), + expectedDiagnostics: map[graphsync.RequestID][]string{ + requestIDs[1]: {fmt.Sprintf("request with id %d in active task queue but appears to have no tracked state", requestIDs[1])}, }, }, "pending task with with incorrect state": { @@ -78,9 +78,9 @@ func TestDiagnostics(t *testing.T) { Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]}, Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3], requestIDs[4]}, }, - expectedDiagnostics: map[graphsync.RequestID]string{ - requestIDs[3]: fmt.Sprintf("expected request with id %d in pending task queue to be in queued state, but was running", requestIDs[3]), - requestIDs[4]: fmt.Sprintf("expected request with id %d in pending task queue to be in queued state, but was paused", requestIDs[4]), + expectedDiagnostics: map[graphsync.RequestID][]string{ + requestIDs[3]: {fmt.Sprintf("expected request with id %d in pending task queue to be in queued state, but was running", requestIDs[3]), fmt.Sprintf("request with id %d in running state is not in the active task queue", requestIDs[3])}, + requestIDs[4]: {fmt.Sprintf("expected request with id %d in pending task queue to be in queued state, but was paused", requestIDs[4])}, }, }, "pending task with no state": { @@ -94,8 +94,8 @@ func TestDiagnostics(t *testing.T) { Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]}, Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]}, }, - expectedDiagnostics: map[graphsync.RequestID]string{ - requestIDs[3]: fmt.Sprintf("request with id %d in pending task queue but appears to have no tracked state", requestIDs[3]), + expectedDiagnostics: map[graphsync.RequestID][]string{ + requestIDs[3]: {fmt.Sprintf("request with id %d in pending task queue but appears to have no tracked state", requestIDs[3])}, }, }, "request state running with no active task": { @@ -110,8 +110,8 @@ func TestDiagnostics(t *testing.T) { Active: []graphsync.RequestID{requestIDs[0]}, Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]}, }, - expectedDiagnostics: map[graphsync.RequestID]string{ - requestIDs[1]: fmt.Sprintf("request with id %d in running state is not in the active task queue", requestIDs[1]), + expectedDiagnostics: map[graphsync.RequestID][]string{ + requestIDs[1]: {fmt.Sprintf("request with id %d in running state is not in the active task queue", requestIDs[1])}, }, }, "request state queued with no pending task": { @@ -126,8 +126,8 @@ func TestDiagnostics(t *testing.T) { Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]}, Pending: []graphsync.RequestID{requestIDs[2]}, }, - expectedDiagnostics: map[graphsync.RequestID]string{ - requestIDs[3]: fmt.Sprintf("request with id %d in queued state is not in the pending task queue", requestIDs[3]), + expectedDiagnostics: map[graphsync.RequestID][]string{ + requestIDs[3]: {fmt.Sprintf("request with id %d in queued state is not in the pending task queue", requestIDs[3])}, }, }, }