diff --git a/go.mod b/go.mod index a9588c2e..33447445 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/ipfs/go-ipld-format v0.2.0 github.com/ipfs/go-log/v2 v2.3.0 github.com/ipfs/go-merkledag v0.5.1 - github.com/ipfs/go-peertaskqueue v0.7.0 + github.com/ipfs/go-peertaskqueue v0.7.1 github.com/ipfs/go-unixfs v0.2.4 github.com/ipld/go-codec-dagpb v1.3.0 github.com/ipld/go-ipld-prime v0.12.3 diff --git a/go.sum b/go.sum index 85b9a42f..1bfc2705 100644 --- a/go.sum +++ b/go.sum @@ -444,8 +444,9 @@ github.com/ipfs/go-merkledag v0.5.1/go.mod h1:cLMZXx8J08idkp5+id62iVftUQV+HlYJ3P github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg= github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY= github.com/ipfs/go-peertaskqueue v0.1.0/go.mod h1:Jmk3IyCcfl1W3jTW3YpghSwSEC6IJ3Vzz/jUmWw8Z0U= -github.com/ipfs/go-peertaskqueue v0.7.0 h1:VyO6G4sbzX80K58N60cCaHsSsypbUNs1GjO5seGNsQ0= github.com/ipfs/go-peertaskqueue v0.7.0/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU= +github.com/ipfs/go-peertaskqueue v0.7.1 h1:7PLjon3RZwRQMgOTvYccZ+mjzkmds/7YzSWKFlBAypE= +github.com/ipfs/go-peertaskqueue v0.7.1/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU= github.com/ipfs/go-unixfs v0.2.4 h1:6NwppOXefWIyysZ4LR/qUBPvXd5//8J3jiMdvpbw6Lo= github.com/ipfs/go-unixfs v0.2.4/go.mod h1:SUdisfUjNoSDzzhGVxvCL9QO/nKdwXdr+gbMUdqcbYw= github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2E= diff --git a/graphsync.go b/graphsync.go index 20b69266..a23d9b17 100644 --- a/graphsync.go +++ b/graphsync.go @@ -347,6 +347,19 @@ const ( Paused ) +func (rs RequestState) String() string { + switch rs { + case Queued: + return "queued" + case Running: + return "running" + case Paused: + return "paused" + default: + return "unrecognized request state" + } +} + // GraphExchange is a protocol that can exchange IPLD graphs based on a selector type GraphExchange interface { // Request initiates a new GraphSync request to the given peer using the given selector spec. diff --git a/impl/graphsync.go b/impl/graphsync.go index 636e3711..49e5e3f8 100644 --- a/impl/graphsync.go +++ b/impl/graphsync.go @@ -19,6 +19,7 @@ import ( "github.com/ipfs/go-graphsync/messagequeue" gsnet "github.com/ipfs/go-graphsync/network" "github.com/ipfs/go-graphsync/peermanager" + "github.com/ipfs/go-graphsync/peerstate" "github.com/ipfs/go-graphsync/requestmanager" "github.com/ipfs/go-graphsync/requestmanager/asyncloader" "github.com/ipfs/go-graphsync/requestmanager/executor" @@ -458,19 +459,17 @@ func (gs *GraphSync) Stats() graphsync.Stats { } } -// PeerStats describes the state of graphsync for a given -type PeerStats struct { - // OutgoingRequests - OutgoingRequests graphsync.RequestStates - // IncomingRequests - IncomingRequests graphsync.RequestStates +// PeerState describes the state of graphsync for a given peer +type PeerState struct { + OutgoingState peerstate.PeerState + IncomingState peerstate.PeerState } -// PeerStats produces insight on the current state of a given peer -func (gs *GraphSync) PeerStats(p peer.ID) PeerStats { - return PeerStats{ - OutgoingRequests: gs.requestManager.PeerStats(p), - IncomingRequests: gs.responseManager.PeerStats(p), +// PeerState produces insight on the current state of a given peer +func (gs *GraphSync) PeerState(p peer.ID) PeerState { + return PeerState{ + OutgoingState: gs.requestManager.PeerState(p), + IncomingState: gs.responseManager.PeerState(p), } } diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go index 3f3470d6..af03aa51 100644 --- a/impl/graphsync_test.go +++ b/impl/graphsync_test.go @@ -588,13 +588,24 @@ func TestPauseResume(t *testing.T) { timer := time.NewTimer(100 * time.Millisecond) testutil.AssertDoesReceiveFirst(t, timer.C, "should pause request", progressChan) - requestorPeerStats := requestor.(*GraphSync).PeerStats(td.host2.ID()) - require.Len(t, requestorPeerStats.OutgoingRequests, 1) - require.Len(t, requestorPeerStats.IncomingRequests, 0) - - responderPeerStats := responder.(*GraphSync).PeerStats(td.host1.ID()) - require.Len(t, responderPeerStats.IncomingRequests, 1) - require.Len(t, responderPeerStats.OutgoingRequests, 0) + requestorPeerState := requestor.(*GraphSync).PeerState(td.host2.ID()) + require.Len(t, requestorPeerState.OutgoingState.RequestStates, 1) + require.Len(t, requestorPeerState.IncomingState.RequestStates, 0) + require.Len(t, requestorPeerState.OutgoingState.Active, 1) + require.Contains(t, requestorPeerState.OutgoingState.RequestStates, requestorPeerState.OutgoingState.Active[0]) + require.Len(t, requestorPeerState.OutgoingState.Pending, 0) + require.Len(t, requestorPeerState.IncomingState.Active, 0) + require.Len(t, requestorPeerState.IncomingState.Pending, 0) + require.Len(t, requestorPeerState.OutgoingState.Diagnostics(), 0) + responderPeerState := responder.(*GraphSync).PeerState(td.host1.ID()) + require.Len(t, responderPeerState.IncomingState.RequestStates, 1) + require.Len(t, responderPeerState.OutgoingState.RequestStates, 0) + // no tasks as response is paused by responder + require.Len(t, responderPeerState.IncomingState.Active, 0) + require.Len(t, responderPeerState.IncomingState.Pending, 0) + require.Len(t, responderPeerState.OutgoingState.Active, 0) + require.Len(t, responderPeerState.OutgoingState.Pending, 0) + require.Len(t, responderPeerState.IncomingState.Diagnostics(), 0) requestID := <-requestIDChan err := responder.UnpauseResponse(td.host1.ID(), requestID) diff --git a/peerstate/peerstate.go b/peerstate/peerstate.go new file mode 100644 index 00000000..17c4f9d7 --- /dev/null +++ b/peerstate/peerstate.go @@ -0,0 +1,63 @@ +package peerstate + +import ( + "fmt" + + "github.com/ipfs/go-graphsync" +) + +// TaskQueueState describes the the set of requests for a given peer in a task queue +type TaskQueueState struct { + Active []graphsync.RequestID + Pending []graphsync.RequestID +} + +// PeerState tracks the over all state of a given peer for either +// incoming or outgoing requests +type PeerState struct { + graphsync.RequestStates + TaskQueueState +} + +// Diagnostics compares request states with the current state of the task queue to identify unexpected +// states or inconsistences between the tracked task queue and the tracked requests +func (ps PeerState) Diagnostics() map[graphsync.RequestID][]string { + matchedActiveQueue := make(map[graphsync.RequestID]struct{}, len(ps.RequestStates)) + matchedPendingQueue := make(map[graphsync.RequestID]struct{}, len(ps.RequestStates)) + diagnostics := make(map[graphsync.RequestID][]string) + for _, id := range ps.TaskQueueState.Active { + status, ok := ps.RequestStates[id] + if ok { + matchedActiveQueue[id] = struct{}{} + if status != graphsync.Running { + diagnostics[id] = append(diagnostics[id], fmt.Sprintf("expected request with id %d in active task queue to be in running state, but was %s", id, status)) + } + } else { + diagnostics[id] = append(diagnostics[id], fmt.Sprintf("request with id %d in active task queue but appears to have no tracked state", id)) + } + } + for _, id := range ps.TaskQueueState.Pending { + status, ok := ps.RequestStates[id] + if ok { + matchedPendingQueue[id] = struct{}{} + if status != graphsync.Queued { + diagnostics[id] = append(diagnostics[id], fmt.Sprintf("expected request with id %d in pending task queue to be in queued state, but was %s", id, status)) + } + } else { + diagnostics[id] = append(diagnostics[id], fmt.Sprintf("request with id %d in pending task queue but appears to have no tracked state", id)) + } + } + for id, state := range ps.RequestStates { + if state == graphsync.Running { + if _, ok := matchedActiveQueue[id]; !ok { + diagnostics[id] = append(diagnostics[id], fmt.Sprintf("request with id %d in running state is not in the active task queue", id)) + } + } + if state == graphsync.Queued { + if _, ok := matchedPendingQueue[id]; !ok { + diagnostics[id] = append(diagnostics[id], fmt.Sprintf("request with id %d in queued state is not in the pending task queue", id)) + } + } + } + return diagnostics +} diff --git a/peerstate/peerstate_test.go b/peerstate/peerstate_test.go new file mode 100644 index 00000000..01e94ab0 --- /dev/null +++ b/peerstate/peerstate_test.go @@ -0,0 +1,139 @@ +package peerstate_test + +import ( + "fmt" + "math/rand" + "testing" + + "github.com/ipfs/go-graphsync" + "github.com/ipfs/go-graphsync/peerstate" + "github.com/stretchr/testify/require" +) + +func TestDiagnostics(t *testing.T) { + requestIDs := make([]graphsync.RequestID, 0, 5) + for i := 0; i < 5; i++ { + requestIDs = append(requestIDs, graphsync.RequestID(rand.Int31())) + } + testCases := map[string]struct { + requestStates graphsync.RequestStates + queueStats peerstate.TaskQueueState + expectedDiagnostics map[graphsync.RequestID][]string + }{ + "all requests and queue match": { + requestStates: graphsync.RequestStates{ + requestIDs[0]: graphsync.Running, + requestIDs[1]: graphsync.Running, + requestIDs[2]: graphsync.Queued, + requestIDs[3]: graphsync.Queued, + requestIDs[4]: graphsync.Paused, + }, + queueStats: peerstate.TaskQueueState{ + Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]}, + Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]}, + }, + expectedDiagnostics: map[graphsync.RequestID][]string{}, + }, + "active task with with incorrect state": { + requestStates: graphsync.RequestStates{ + requestIDs[0]: graphsync.Running, + requestIDs[1]: graphsync.Queued, + requestIDs[2]: graphsync.Queued, + requestIDs[3]: graphsync.Queued, + requestIDs[4]: graphsync.Paused, + }, + queueStats: peerstate.TaskQueueState{ + Active: []graphsync.RequestID{requestIDs[0], requestIDs[1], requestIDs[4]}, + Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]}, + }, + expectedDiagnostics: map[graphsync.RequestID][]string{ + requestIDs[1]: {fmt.Sprintf("expected request with id %d in active task queue to be in running state, but was queued", requestIDs[1]), fmt.Sprintf("request with id %d in queued state is not in the pending task queue", requestIDs[1])}, + requestIDs[4]: {fmt.Sprintf("expected request with id %d in active task queue to be in running state, but was paused", requestIDs[4])}, + }, + }, + "active task with no state": { + requestStates: graphsync.RequestStates{ + requestIDs[0]: graphsync.Running, + requestIDs[2]: graphsync.Queued, + requestIDs[3]: graphsync.Queued, + requestIDs[4]: graphsync.Paused, + }, + queueStats: peerstate.TaskQueueState{ + Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]}, + Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]}, + }, + expectedDiagnostics: map[graphsync.RequestID][]string{ + requestIDs[1]: {fmt.Sprintf("request with id %d in active task queue but appears to have no tracked state", requestIDs[1])}, + }, + }, + "pending task with with incorrect state": { + requestStates: graphsync.RequestStates{ + requestIDs[0]: graphsync.Running, + requestIDs[1]: graphsync.Running, + requestIDs[2]: graphsync.Queued, + requestIDs[3]: graphsync.Running, + requestIDs[4]: graphsync.Paused, + }, + queueStats: peerstate.TaskQueueState{ + Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]}, + Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3], requestIDs[4]}, + }, + expectedDiagnostics: map[graphsync.RequestID][]string{ + requestIDs[3]: {fmt.Sprintf("expected request with id %d in pending task queue to be in queued state, but was running", requestIDs[3]), fmt.Sprintf("request with id %d in running state is not in the active task queue", requestIDs[3])}, + requestIDs[4]: {fmt.Sprintf("expected request with id %d in pending task queue to be in queued state, but was paused", requestIDs[4])}, + }, + }, + "pending task with no state": { + requestStates: graphsync.RequestStates{ + requestIDs[0]: graphsync.Running, + requestIDs[1]: graphsync.Running, + requestIDs[2]: graphsync.Queued, + requestIDs[4]: graphsync.Paused, + }, + queueStats: peerstate.TaskQueueState{ + Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]}, + Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]}, + }, + expectedDiagnostics: map[graphsync.RequestID][]string{ + requestIDs[3]: {fmt.Sprintf("request with id %d in pending task queue but appears to have no tracked state", requestIDs[3])}, + }, + }, + "request state running with no active task": { + requestStates: graphsync.RequestStates{ + requestIDs[0]: graphsync.Running, + requestIDs[1]: graphsync.Running, + requestIDs[2]: graphsync.Queued, + requestIDs[3]: graphsync.Queued, + requestIDs[4]: graphsync.Paused, + }, + queueStats: peerstate.TaskQueueState{ + Active: []graphsync.RequestID{requestIDs[0]}, + Pending: []graphsync.RequestID{requestIDs[2], requestIDs[3]}, + }, + expectedDiagnostics: map[graphsync.RequestID][]string{ + requestIDs[1]: {fmt.Sprintf("request with id %d in running state is not in the active task queue", requestIDs[1])}, + }, + }, + "request state queued with no pending task": { + requestStates: graphsync.RequestStates{ + requestIDs[0]: graphsync.Running, + requestIDs[1]: graphsync.Running, + requestIDs[2]: graphsync.Queued, + requestIDs[3]: graphsync.Queued, + requestIDs[4]: graphsync.Paused, + }, + queueStats: peerstate.TaskQueueState{ + Active: []graphsync.RequestID{requestIDs[0], requestIDs[1]}, + Pending: []graphsync.RequestID{requestIDs[2]}, + }, + expectedDiagnostics: map[graphsync.RequestID][]string{ + requestIDs[3]: {fmt.Sprintf("request with id %d in queued state is not in the pending task queue", requestIDs[3])}, + }, + }, + } + for testCase, data := range testCases { + t.Run(testCase, func(t *testing.T) { + require.Equal(t, data.expectedDiagnostics, peerstate.PeerState{data.requestStates, data.queueStats}.Diagnostics()) + }) + } +} diff --git a/requestmanager/client.go b/requestmanager/client.go index 578a6961..95081b3d 100644 --- a/requestmanager/client.go +++ b/requestmanager/client.go @@ -27,6 +27,7 @@ import ( "github.com/ipfs/go-graphsync/metadata" "github.com/ipfs/go-graphsync/network" "github.com/ipfs/go-graphsync/notifications" + "github.com/ipfs/go-graphsync/peerstate" "github.com/ipfs/go-graphsync/requestmanager/executor" "github.com/ipfs/go-graphsync/requestmanager/hooks" "github.com/ipfs/go-graphsync/requestmanager/types" @@ -332,15 +333,15 @@ func (rm *RequestManager) ReleaseRequestTask(p peer.ID, task *peertask.Task, err } } -// PeerStats gets stats on all outgoing requests for a given peer -func (rm *RequestManager) PeerStats(p peer.ID) graphsync.RequestStates { - response := make(chan graphsync.RequestStates) - rm.send(&peerStatsMessage{p, response}, nil) +// PeerState gets stats on all outgoing requests for a given peer +func (rm *RequestManager) PeerState(p peer.ID) peerstate.PeerState { + response := make(chan peerstate.PeerState) + rm.send(&peerStateMessage{p, response}, nil) select { case <-rm.ctx.Done(): - return nil - case peerStats := <-response: - return peerStats + return peerstate.PeerState{} + case peerState := <-response: + return peerState } } diff --git a/requestmanager/messages.go b/requestmanager/messages.go index ff99833d..dd5acf3c 100644 --- a/requestmanager/messages.go +++ b/requestmanager/messages.go @@ -9,6 +9,7 @@ import ( "github.com/ipfs/go-graphsync" gsmsg "github.com/ipfs/go-graphsync/message" + "github.com/ipfs/go-graphsync/peerstate" "github.com/ipfs/go-graphsync/requestmanager/executor" ) @@ -109,12 +110,12 @@ func (nrm *newRequestMessage) handle(rm *RequestManager) { } } -type peerStatsMessage struct { +type peerStateMessage struct { p peer.ID - peerStatsChan chan<- graphsync.RequestStates + peerStatsChan chan<- peerstate.PeerState } -func (psm *peerStatsMessage) handle(rm *RequestManager) { +func (psm *peerStateMessage) handle(rm *RequestManager) { peerStats := rm.peerStats(psm.p) select { case psm.peerStatsChan <- peerStats: diff --git a/requestmanager/requestmanager_test.go b/requestmanager/requestmanager_test.go index f5f1bc15..c62d9702 100644 --- a/requestmanager/requestmanager_test.go +++ b/requestmanager/requestmanager_test.go @@ -994,10 +994,14 @@ func TestStats(t *testing.T) { requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 3) - states := td.requestManager.PeerStats(peers[0]) - require.Len(t, states, 2) - require.Equal(t, states[requestRecords[0].gsr.ID()], graphsync.Running) - require.Equal(t, states[requestRecords[1].gsr.ID()], graphsync.Running) + peerState := td.requestManager.PeerState(peers[0]) + require.Len(t, peerState.RequestStates, 2) + require.Equal(t, peerState.RequestStates[requestRecords[0].gsr.ID()], graphsync.Running) + require.Equal(t, peerState.RequestStates[requestRecords[1].gsr.ID()], graphsync.Running) + require.Len(t, peerState.Active, 2) + require.Contains(t, peerState.Active, requestRecords[0].gsr.ID()) + require.Contains(t, peerState.Active, requestRecords[1].gsr.ID()) + require.Len(t, peerState.Pending, 0) } type requestRecord struct { diff --git a/requestmanager/server.go b/requestmanager/server.go index 328d7751..da7fd0f0 100644 --- a/requestmanager/server.go +++ b/requestmanager/server.go @@ -10,6 +10,7 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" "github.com/ipfs/go-peertaskqueue/peertask" + "github.com/ipfs/go-peertaskqueue/peertracker" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipld/go-ipld-prime/traversal" @@ -25,6 +26,7 @@ import ( "github.com/ipfs/go-graphsync/dedupkey" "github.com/ipfs/go-graphsync/ipldutil" gsmsg "github.com/ipfs/go-graphsync/message" + "github.com/ipfs/go-graphsync/peerstate" "github.com/ipfs/go-graphsync/requestmanager/executor" "github.com/ipfs/go-graphsync/requestmanager/hooks" ) @@ -392,12 +394,31 @@ func (rm *RequestManager) pause(id graphsync.RequestID) error { return nil } -func (rm *RequestManager) peerStats(p peer.ID) graphsync.RequestStates { - peerStats := make(graphsync.RequestStates) +func (rm *RequestManager) peerStats(p peer.ID) peerstate.PeerState { + requestStates := make(graphsync.RequestStates) for id, ipr := range rm.inProgressRequestStatuses { if ipr.p == p { - peerStats[id] = graphsync.RequestState(ipr.state) + requestStates[id] = graphsync.RequestState(ipr.state) } } - return peerStats + peerTopics := rm.requestQueue.PeerTopics(p) + return peerstate.PeerState{RequestStates: requestStates, TaskQueueState: fromPeerTopics(peerTopics)} +} + +func fromPeerTopics(pt *peertracker.PeerTrackerTopics) peerstate.TaskQueueState { + if pt == nil { + return peerstate.TaskQueueState{} + } + active := make([]graphsync.RequestID, 0, len(pt.Active)) + for _, topic := range pt.Active { + active = append(active, topic.(graphsync.RequestID)) + } + pending := make([]graphsync.RequestID, 0, len(pt.Pending)) + for _, topic := range pt.Pending { + pending = append(pending, topic.(graphsync.RequestID)) + } + return peerstate.TaskQueueState{ + Active: active, + Pending: pending, + } } diff --git a/requestmanager/utils.go b/requestmanager/utils.go index a4f0c758..4f875deb 100644 --- a/requestmanager/utils.go +++ b/requestmanager/utils.go @@ -4,6 +4,7 @@ import ( "github.com/ipfs/go-graphsync" gsmsg "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/metadata" + "github.com/ipfs/go-peertaskqueue/peertask" ) func metadataForResponses(responses []gsmsg.GraphSyncResponse) map[graphsync.RequestID]metadata.Metadata { @@ -23,3 +24,8 @@ func metadataForResponses(responses []gsmsg.GraphSyncResponse) map[graphsync.Req } return responseMetadata } + +// RequestIDFromTaskTopic extracts a request ID from a given peer task topic +func RequestIDFromTaskTopic(topic peertask.Topic) graphsync.RequestID { + return topic.(graphsync.RequestID) +} diff --git a/responsemanager/client.go b/responsemanager/client.go index 4e847a9e..de284e3a 100644 --- a/responsemanager/client.go +++ b/responsemanager/client.go @@ -15,6 +15,7 @@ import ( gsmsg "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/network" "github.com/ipfs/go-graphsync/notifications" + "github.com/ipfs/go-graphsync/peerstate" "github.com/ipfs/go-graphsync/responsemanager/hooks" "github.com/ipfs/go-graphsync/responsemanager/queryexecutor" "github.com/ipfs/go-graphsync/responsemanager/responseassembler" @@ -231,15 +232,15 @@ func (rm *ResponseManager) CloseWithNetworkError(p peer.ID, requestID graphsync. rm.send(&errorRequestMessage{p, requestID, queryexecutor.ErrNetworkError, make(chan error, 1)}, nil) } -// PeerStats gets stats on all outgoing requests for a given peer -func (rm *ResponseManager) PeerStats(p peer.ID) graphsync.RequestStates { - response := make(chan graphsync.RequestStates) - rm.send(&peerStatsMessage{p, response}, nil) +// PeerState gets current state of the outgoing responses for a given peer +func (rm *ResponseManager) PeerState(p peer.ID) peerstate.PeerState { + response := make(chan peerstate.PeerState) + rm.send(&peerStateMessage{p, response}, nil) select { case <-rm.ctx.Done(): - return nil - case peerStats := <-response: - return peerStats + return peerstate.PeerState{} + case peerState := <-response: + return peerState } } diff --git a/responsemanager/messages.go b/responsemanager/messages.go index 26e319a6..0a05f08e 100644 --- a/responsemanager/messages.go +++ b/responsemanager/messages.go @@ -6,6 +6,7 @@ import ( "github.com/ipfs/go-graphsync" gsmsg "github.com/ipfs/go-graphsync/message" + "github.com/ipfs/go-graphsync/peerstate" "github.com/ipfs/go-graphsync/responsemanager/queryexecutor" ) @@ -114,15 +115,15 @@ func (prm *processRequestMessage) handle(rm *ResponseManager) { rm.processRequests(prm.p, prm.requests) } -type peerStatsMessage struct { +type peerStateMessage struct { p peer.ID - peerStatsChan chan<- graphsync.RequestStates + peerStatsChan chan<- peerstate.PeerState } -func (psm *peerStatsMessage) handle(rm *ResponseManager) { - peerStats := rm.peerStats(psm.p) +func (psm *peerStateMessage) handle(rm *ResponseManager) { + peerState := rm.peerState(psm.p) select { - case psm.peerStatsChan <- peerStats: + case psm.peerStatsChan <- peerState: case <-rm.ctx.Done(): } } diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index 204f786d..0745df4f 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -11,6 +11,7 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" "github.com/ipfs/go-peertaskqueue/peertask" + "github.com/ipfs/go-peertaskqueue/peertracker" ipld "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipld/go-ipld-prime/node/basicnode" @@ -176,12 +177,22 @@ func TestStats(t *testing.T) { responseManager.ProcessRequests(td.ctx, td.p, td.requests) p2 := testutil.GeneratePeers(1)[0] responseManager.ProcessRequests(td.ctx, p2, td.requests) - stats := responseManager.PeerStats(td.p) - require.Len(t, stats, 1) - require.Equal(t, stats[td.requestID], graphsync.Queued) - stats = responseManager.PeerStats(p2) - require.Len(t, stats, 1) - require.Equal(t, stats[td.requestID], graphsync.Queued) + peerState := responseManager.PeerState(td.p) + require.Len(t, peerState.RequestStates, 1) + require.Equal(t, peerState.RequestStates[td.requestID], graphsync.Queued) + require.Len(t, peerState.Pending, 1) + require.Equal(t, peerState.Pending[0], td.requestID) + require.Len(t, peerState.Active, 0) + // no inconsistencies + require.Len(t, peerState.Diagnostics(), 0) + peerState = responseManager.PeerState(p2) + require.Len(t, peerState.RequestStates, 1) + require.Equal(t, peerState.RequestStates[td.requestID], graphsync.Queued) + require.Len(t, peerState.Pending, 1) + require.Equal(t, peerState.Pending[0], td.requestID) + require.Len(t, peerState.Active, 0) + // no inconsistencies + require.Len(t, peerState.Diagnostics(), 0) } func TestMissingContent(t *testing.T) { @@ -1112,7 +1123,7 @@ func (td *testData) newResponseManager() *ResponseManager { } func (td *testData) nullTaskQueueResponseManager() *ResponseManager { - ntq := nullTaskQueue{} + ntq := nullTaskQueue{tasksQueued: make(map[peer.ID][]peertask.Topic)} rm := New(td.ctx, td.persistence, td.responseAssembler, td.requestQueuedHooks, td.requestHooks, td.updateHooks, td.completedListeners, td.cancelledListeners, td.blockSentListeners, td.networkErrorListeners, 6, td.connManager, 0, ntq) return rm } @@ -1284,11 +1295,19 @@ func (td *testData) assertHasNetworkErrors(err error) { require.EqualError(td.t, receivedErr, err.Error()) } -type nullTaskQueue struct{} +type nullTaskQueue struct { + tasksQueued map[peer.ID][]peertask.Topic +} + +func (ntq nullTaskQueue) PushTask(p peer.ID, task peertask.Task) { + ntq.tasksQueued[p] = append(ntq.tasksQueued[p], task.Topic) +} -func (ntq nullTaskQueue) PushTask(p peer.ID, task peertask.Task) {} func (ntq nullTaskQueue) TaskDone(p peer.ID, task *peertask.Task) {} func (ntq nullTaskQueue) Remove(t peertask.Topic, p peer.ID) {} func (ntq nullTaskQueue) Stats() graphsync.RequestStats { return graphsync.RequestStats{} } +func (ntq nullTaskQueue) PeerTopics(p peer.ID) *peertracker.PeerTrackerTopics { + return &peertracker.PeerTrackerTopics{Pending: ntq.tasksQueued[p]} +} var _ taskqueue.TaskQueue = nullTaskQueue{} diff --git a/responsemanager/server.go b/responsemanager/server.go index 3def7bea..d754b423 100644 --- a/responsemanager/server.go +++ b/responsemanager/server.go @@ -7,12 +7,14 @@ import ( "time" "github.com/ipfs/go-peertaskqueue/peertask" + "github.com/ipfs/go-peertaskqueue/peertracker" "github.com/libp2p/go-libp2p-core/peer" "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/ipldutil" gsmsg "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/notifications" + "github.com/ipfs/go-graphsync/peerstate" "github.com/ipfs/go-graphsync/responsemanager/hooks" "github.com/ipfs/go-graphsync/responsemanager/queryexecutor" "github.com/ipfs/go-graphsync/responsemanager/responseassembler" @@ -271,12 +273,31 @@ func (rm *ResponseManager) pauseRequest(p peer.ID, requestID graphsync.RequestID return nil } -func (rm *ResponseManager) peerStats(p peer.ID) graphsync.RequestStates { - peerStats := make(graphsync.RequestStates) +func (rm *ResponseManager) peerState(p peer.ID) peerstate.PeerState { + requestStates := make(graphsync.RequestStates) for key, ipr := range rm.inProgressResponses { if key.p == p { - peerStats[key.requestID] = ipr.state + requestStates[key.requestID] = ipr.state } } - return peerStats + peerTopics := rm.responseQueue.PeerTopics(p) + return peerstate.PeerState{RequestStates: requestStates, TaskQueueState: fromPeerTopics(peerTopics)} +} + +func fromPeerTopics(pt *peertracker.PeerTrackerTopics) peerstate.TaskQueueState { + if pt == nil { + return peerstate.TaskQueueState{} + } + active := make([]graphsync.RequestID, 0, len(pt.Active)) + for _, topic := range pt.Active { + active = append(active, topic.(responseKey).requestID) + } + pending := make([]graphsync.RequestID, 0, len(pt.Pending)) + for _, topic := range pt.Pending { + pending = append(pending, topic.(responseKey).requestID) + } + return peerstate.TaskQueueState{ + Active: active, + Pending: pending, + } } diff --git a/taskqueue/taskqueue.go b/taskqueue/taskqueue.go index a1414735..7a667a29 100644 --- a/taskqueue/taskqueue.go +++ b/taskqueue/taskqueue.go @@ -7,6 +7,7 @@ import ( "github.com/ipfs/go-peertaskqueue" "github.com/ipfs/go-peertaskqueue/peertask" + "github.com/ipfs/go-peertaskqueue/peertracker" peer "github.com/libp2p/go-libp2p-core/peer" "github.com/ipfs/go-graphsync" @@ -24,18 +25,19 @@ type TaskQueue interface { TaskDone(p peer.ID, task *peertask.Task) Remove(t peertask.Topic, p peer.ID) Stats() graphsync.RequestStats + PeerTopics(p peer.ID) *peertracker.PeerTrackerTopics } -// TaskQueue is a wrapper around peertaskqueue.PeerTaskQueue that manages running workers +// WorkerTaskQueue is a wrapper around peertaskqueue.PeerTaskQueue that manages running workers // that pop tasks and execute them type WorkerTaskQueue struct { - ctx context.Context - cancelFn func() - peerTaskQueue *peertaskqueue.PeerTaskQueue - workSignal chan struct{} - noTaskCond *sync.Cond - ticker *time.Ticker - activeTasks int32 + *peertaskqueue.PeerTaskQueue + ctx context.Context + cancelFn func() + workSignal chan struct{} + noTaskCond *sync.Cond + ticker *time.Ticker + activeTasks int32 } // NewTaskQueue initializes a new queue @@ -44,7 +46,7 @@ func NewTaskQueue(ctx context.Context) *WorkerTaskQueue { return &WorkerTaskQueue{ ctx: ctx, cancelFn: cancelFn, - peerTaskQueue: peertaskqueue.New(), + PeerTaskQueue: peertaskqueue.New(), workSignal: make(chan struct{}, 1), noTaskCond: sync.NewCond(&sync.Mutex{}), ticker: time.NewTicker(thawSpeed), @@ -53,7 +55,7 @@ func NewTaskQueue(ctx context.Context) *WorkerTaskQueue { // PushTask pushes a new task on to the queue func (tq *WorkerTaskQueue) PushTask(p peer.ID, task peertask.Task) { - tq.peerTaskQueue.PushTasks(p, task) + tq.PeerTaskQueue.PushTasks(p, task) select { case tq.workSignal <- struct{}{}: default: @@ -62,12 +64,12 @@ func (tq *WorkerTaskQueue) PushTask(p peer.ID, task peertask.Task) { // TaskDone marks a task as completed so further tasks can be executed func (tq *WorkerTaskQueue) TaskDone(p peer.ID, task *peertask.Task) { - tq.peerTaskQueue.TasksDone(p, task) + tq.PeerTaskQueue.TasksDone(p, task) } // Stats returns statistics about a task queue func (tq *WorkerTaskQueue) Stats() graphsync.RequestStats { - ptqstats := tq.peerTaskQueue.Stats() + ptqstats := tq.PeerTaskQueue.Stats() return graphsync.RequestStats{ TotalPeers: uint64(ptqstats.NumPeers), Active: uint64(ptqstats.NumActive), @@ -75,11 +77,6 @@ func (tq *WorkerTaskQueue) Stats() graphsync.RequestStats { } } -// Remove removes a task from the execution queue -func (tq *WorkerTaskQueue) Remove(topic peertask.Topic, p peer.ID) { - tq.peerTaskQueue.Remove(topic, p) -} - // Startup runs the given number of task workers with the given executor func (tq *WorkerTaskQueue) Startup(workerCount uint64, executor Executor) { for i := uint64(0); i < workerCount; i++ { @@ -103,16 +100,16 @@ func (tq *WorkerTaskQueue) WaitForNoActiveTasks() { func (tq *WorkerTaskQueue) worker(executor Executor) { targetWork := 1 for { - pid, tasks, _ := tq.peerTaskQueue.PopTasks(targetWork) + pid, tasks, _ := tq.PeerTaskQueue.PopTasks(targetWork) for len(tasks) == 0 { select { case <-tq.ctx.Done(): return case <-tq.workSignal: - pid, tasks, _ = tq.peerTaskQueue.PopTasks(targetWork) + pid, tasks, _ = tq.PeerTaskQueue.PopTasks(targetWork) case <-tq.ticker.C: - tq.peerTaskQueue.ThawRound() - pid, tasks, _ = tq.peerTaskQueue.PopTasks(targetWork) + tq.PeerTaskQueue.ThawRound() + pid, tasks, _ = tq.PeerTaskQueue.PopTasks(targetWork) } } for _, task := range tasks {