diff --git a/allocator/allocator.go b/allocator/allocator.go index 8faa9ebd..d945ab7e 100644 --- a/allocator/allocator.go +++ b/allocator/allocator.go @@ -4,6 +4,7 @@ import ( "errors" "sync" + "github.com/ipfs/go-graphsync" pq "github.com/ipfs/go-ipfs-pq" logging "github.com/ipfs/go-log/v2" peer "github.com/libp2p/go-libp2p-core/peer" @@ -12,8 +13,8 @@ import ( var log = logging.Logger("graphsync_allocator") type Allocator struct { - totalMemoryMax uint64 - perPeerMax uint64 + maxAllowedAllocatedTotal uint64 + maxAllowedAllocatedPerPeer uint64 allocLk sync.RWMutex totalAllocatedAllPeers uint64 @@ -22,13 +23,13 @@ type Allocator struct { peerStatusQueue pq.PQ } -func NewAllocator(totalMemoryMax uint64, perPeerMax uint64) *Allocator { +func NewAllocator(maxAllowedAllocatedTotal uint64, maxAllowedAllocatedPerPeer uint64) *Allocator { return &Allocator{ - totalMemoryMax: totalMemoryMax, - perPeerMax: perPeerMax, - totalAllocatedAllPeers: 0, - peerStatuses: make(map[peer.ID]*peerStatus), - peerStatusQueue: pq.New(makePeerStatusCompare(perPeerMax)), + maxAllowedAllocatedTotal: maxAllowedAllocatedTotal, + maxAllowedAllocatedPerPeer: maxAllowedAllocatedPerPeer, + totalAllocatedAllPeers: 0, + peerStatuses: make(map[peer.ID]*peerStatus), + peerStatusQueue: pq.New(makePeerStatusCompare(maxAllowedAllocatedPerPeer)), } } @@ -58,13 +59,13 @@ func (a *Allocator) AllocateBlockMemory(p peer.ID, amount uint64) <-chan error { a.peerStatuses[p] = status } - if (a.totalAllocatedAllPeers+amount <= a.totalMemoryMax) && (status.totalAllocated+amount <= a.perPeerMax) && len(status.pendingAllocations) == 0 { + if (a.totalAllocatedAllPeers+amount <= a.maxAllowedAllocatedTotal) && (status.totalAllocated+amount <= a.maxAllowedAllocatedPerPeer) && len(status.pendingAllocations) == 0 { a.totalAllocatedAllPeers += amount status.totalAllocated += amount log.Debugw("bytes allocated", "amount", amount, "peer", p, "peer total", status.totalAllocated, "global total", a.totalAllocatedAllPeers) responseChan <- nil } else { - log.Debugw("byte allocation deferred pending memory release", "amount", amount, "peer", p, "peer total", status.totalAllocated, "global total", a.totalAllocatedAllPeers, "max per peer", a.perPeerMax, "global max", a.totalMemoryMax) + log.Debugw("byte allocation deferred pending memory release", "amount", amount, "peer", p, "peer total", status.totalAllocated, "global total", a.totalAllocatedAllPeers, "max per peer", a.maxAllowedAllocatedPerPeer, "global max", a.maxAllowedAllocatedTotal) pendingAllocation := pendingAllocation{p, amount, responseChan, a.nextAllocIndex} a.nextAllocIndex++ status.pendingAllocations = append(status.pendingAllocations, pendingAllocation) @@ -91,7 +92,7 @@ func (a *Allocator) ReleaseBlockMemory(p peer.ID, amount uint64) error { } else { a.totalAllocatedAllPeers = 0 } - log.Debugw("memory released", "amount", amount, "peer", p, "peer total", status.totalAllocated, "global total", a.totalAllocatedAllPeers, "max per peer", a.perPeerMax, "global max", a.totalMemoryMax) + log.Debugw("memory released", "amount", amount, "peer", p, "peer total", status.totalAllocated, "global total", a.totalAllocatedAllPeers, "max per peer", a.maxAllowedAllocatedPerPeer, "global max", a.maxAllowedAllocatedTotal) a.peerStatusQueue.Update(status.Index()) a.processPendingAllocations() return nil @@ -110,7 +111,7 @@ func (a *Allocator) ReleasePeerMemory(p peer.ID) error { pendingAllocation.response <- errors.New("peer has been deallocated") } a.totalAllocatedAllPeers -= status.totalAllocated - log.Debugw("memory released", "amount", status.totalAllocated, "peer", p, "peer total", 0, "global total", a.totalAllocatedAllPeers, "max per peer", a.perPeerMax, "global max", a.totalMemoryMax) + log.Debugw("memory released", "amount", status.totalAllocated, "peer", p, "peer total", 0, "global total", a.totalAllocatedAllPeers, "max per peer", a.maxAllowedAllocatedPerPeer, "global max", a.maxAllowedAllocatedTotal) a.processPendingAllocations() return nil } @@ -137,10 +138,10 @@ func (a *Allocator) processPendingAllocations() { func (a *Allocator) processNextPendingAllocationForPeer(nextPeer *peerStatus) bool { pendingAllocation := nextPeer.pendingAllocations[0] - if a.totalAllocatedAllPeers+pendingAllocation.amount > a.totalMemoryMax { + if a.totalAllocatedAllPeers+pendingAllocation.amount > a.maxAllowedAllocatedTotal { return false } - if nextPeer.totalAllocated+pendingAllocation.amount > a.perPeerMax { + if nextPeer.totalAllocated+pendingAllocation.amount > a.maxAllowedAllocatedPerPeer { return false } a.totalAllocatedAllPeers += pendingAllocation.amount @@ -151,6 +152,31 @@ func (a *Allocator) processNextPendingAllocationForPeer(nextPeer *peerStatus) bo return true } +func (a *Allocator) Stats() graphsync.ResponseStats { + a.allocLk.RLock() + defer a.allocLk.RUnlock() + + numPeersWithPendingAllocations := uint64(0) + totalPendingAllocations := uint64(0) + for _, status := range a.peerStatuses { + peerPendingAllocations := uint64(0) + for _, pa := range status.pendingAllocations { + peerPendingAllocations += pa.amount + } + if peerPendingAllocations > 0 { + numPeersWithPendingAllocations++ + totalPendingAllocations += peerPendingAllocations + } + } + return graphsync.ResponseStats{ + MaxAllowedAllocatedTotal: a.maxAllowedAllocatedTotal, + MaxAllowedAllocatedPerPeer: a.maxAllowedAllocatedPerPeer, + TotalAllocatedAllPeers: a.totalAllocatedAllPeers, + TotalPendingAllocations: totalPendingAllocations, + NumPeersWithPendingAllocations: numPeersWithPendingAllocations, + } +} + type peerStatus struct { p peer.ID totalAllocated uint64 diff --git a/allocator/allocator_test.go b/allocator/allocator_test.go index f4ce2562..b6dd86fd 100644 --- a/allocator/allocator_test.go +++ b/allocator/allocator_test.go @@ -341,6 +341,23 @@ func TestAllocator(t *testing.T) { pendingResults = append(pendingResults, next.pendingResult) } require.Equal(t, step.expectedPending, pendingResults) + expectedTotalPending := uint64(0) + expectedPeersPending := map[peer.ID]struct{}{} + for _, pendingResult := range step.expectedPending { + expectedTotalPending += pendingResult.amount + expectedPeersPending[pendingResult.p] = struct{}{} + } + expectedNumPeersPending := uint64(len(expectedPeersPending)) + expendingTotalAllocated := uint64(0) + for _, peerTotal := range step.totals { + expendingTotalAllocated += peerTotal + } + stats := allocator.Stats() + require.Equal(t, data.total, stats.MaxAllowedAllocatedTotal) + require.Equal(t, data.maxPerPeer, stats.MaxAllowedAllocatedPerPeer) + require.Equal(t, expectedNumPeersPending, stats.NumPeersWithPendingAllocations) + require.Equal(t, expendingTotalAllocated, stats.TotalAllocatedAllPeers) + require.Equal(t, expectedTotalPending, stats.TotalPendingAllocations) } }) } diff --git a/graphsync.go b/graphsync.go index e0fb6fc7..1a950eb1 100644 --- a/graphsync.go +++ b/graphsync.go @@ -287,6 +287,47 @@ type OnRequestorCancelledListener func(p peer.ID, request RequestData) // UnregisterHookFunc is a function call to unregister a hook that was previously registered type UnregisterHookFunc func() +// RequestStats offer statistics about request processing +type RequestStats struct { + // TotalPeers is the number of peers that have active or pending requests + TotalPeers uint64 + // Active is the total number of active requests being processing + Active uint64 + // Pending is the total number of requests that are waiting to be processed + Pending uint64 +} + +// ResponseStats offer statistics about memory allocations for responses +type ResponseStats struct { + // MaxAllowedAllocatedTotal is the preconfigured limit on allocations + // for all peers + MaxAllowedAllocatedTotal uint64 + // MaxAllowedAllocatedPerPeer is the preconfigured limit on allocations + // for an individual peer + MaxAllowedAllocatedPerPeer uint64 + // TotalAllocatedAllPeers indicates the amount of memory allocated for blocks + // across all peers + TotalAllocatedAllPeers uint64 + // TotalPendingAllocations indicates the amount awaiting freeing up of memory + TotalPendingAllocations uint64 + // NumPeersWithPendingAllocations indicates the number of peers that + // have either maxed out their individual memory allocations or have + // pending allocations cause the total limit has been reached. + NumPeersWithPendingAllocations uint64 +} + +// Stats describes statistics about the Graphsync implementations +// current state +type Stats struct { + // Stats for the graphsync requestor + OutgoingRequests RequestStats + IncomingResponses ResponseStats + + // Stats for the graphsync responder + IncomingRequests RequestStats + OutgoingResponses ResponseStats +} + // GraphExchange is a protocol that can exchange IPLD graphs based on a selector type GraphExchange interface { // Request initiates a new GraphSync request to the given peer using the given selector spec. @@ -354,4 +395,7 @@ type GraphExchange interface { // CancelRequest cancels an in progress request CancelRequest(context.Context, RequestID) error + + // Stats produces insight on the current state of a graphsync exchange + Stats() Stats } diff --git a/impl/graphsync.go b/impl/graphsync.go index ef3098ea..8acb7dc6 100644 --- a/impl/graphsync.go +++ b/impl/graphsync.go @@ -380,6 +380,28 @@ func (gs *GraphSync) CancelRequest(ctx context.Context, requestID graphsync.Requ return gs.requestManager.CancelRequest(ctx, requestID) } +// Stats produces insight on the current state of a graphsync exchange +func (gs *GraphSync) Stats() graphsync.Stats { + outgoingRequestStats := gs.requestQueue.Stats() + incomingResponseStats := gs.requestAllocator.Stats() + + ptqstats := gs.peerTaskQueue.Stats() + incomingRequestStats := graphsync.RequestStats{ + TotalPeers: uint64(ptqstats.NumPeers), + Active: uint64(ptqstats.NumActive), + Pending: uint64(ptqstats.NumPending), + } + outgoingResponseStats := gs.responseAllocator.Stats() + + return graphsync.Stats{ + OutgoingRequests: outgoingRequestStats, + IncomingResponses: incomingResponseStats, + + IncomingRequests: incomingRequestStats, + OutgoingResponses: outgoingResponseStats, + } +} + type graphSyncReceiver GraphSync func (gsr *graphSyncReceiver) graphSync() *GraphSync { diff --git a/taskqueue/taskqueue.go b/taskqueue/taskqueue.go index 29dbe45c..41291410 100644 --- a/taskqueue/taskqueue.go +++ b/taskqueue/taskqueue.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/ipfs/go-graphsync" "github.com/ipfs/go-peertaskqueue" "github.com/ipfs/go-peertaskqueue/peertask" peer "github.com/libp2p/go-libp2p-core/peer" @@ -19,6 +20,7 @@ type Executor interface { type TaskQueue interface { PushTask(p peer.ID, task peertask.Task) TaskDone(p peer.ID, task *peertask.Task) + Stats() graphsync.RequestStats } // TaskQueue is a wrapper around peertaskqueue.PeerTaskQueue that manages running workers @@ -57,6 +59,16 @@ func (tq *WorkerTaskQueue) TaskDone(p peer.ID, task *peertask.Task) { tq.peerTaskQueue.TasksDone(p, task) } +// Stats returns statistics about a task queue +func (tq *WorkerTaskQueue) Stats() graphsync.RequestStats { + ptqstats := tq.peerTaskQueue.Stats() + return graphsync.RequestStats{ + TotalPeers: uint64(ptqstats.NumPeers), + Active: uint64(ptqstats.NumActive), + Pending: uint64(ptqstats.NumPending), + } +} + // Startup runs the given number of task workers with the given executor func (tq *WorkerTaskQueue) Startup(workerCount uint64, executor Executor) { for i := uint64(0); i < workerCount; i++ {