diff --git a/graphsync.go b/graphsync.go index 17f2dc07..e0fb6fc7 100644 --- a/graphsync.go +++ b/graphsync.go @@ -175,6 +175,9 @@ type BlockData interface { // BlockSize specifies the amount of data actually transmitted over the network BlockSizeOnWire() uint64 + + // The index of this block in the selector traversal + Index() int64 } // IncomingRequestHookActions are actions that a request hook can take to change diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go index 1ec45dce..5f9a6bdb 100644 --- a/impl/graphsync_test.go +++ b/impl/graphsync_test.go @@ -316,7 +316,7 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) { totalSent := 0 totalSentOnWire := 0 responder.RegisterOutgoingBlockHook(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) { - totalSent++ + totalSent = int(blockData.Index()) if blockData.BlockSizeOnWire() > 0 { totalSentOnWire++ } @@ -364,8 +364,11 @@ func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) { totalSent := 0 totalSentOnWire := 0 responder.RegisterOutgoingBlockHook(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) { - totalSent++ - if blockData.BlockSizeOnWire() > 0 { + totalSent = int(blockData.Index()) + if blockData.Index() <= 50 { + require.True(t, blockData.BlockSizeOnWire() == 0) + } else { + require.True(t, blockData.BlockSizeOnWire() > 0) totalSentOnWire++ } }) diff --git a/requestmanager/executor/executor.go b/requestmanager/executor/executor.go index 3158004a..253fd06f 100644 --- a/requestmanager/executor/executor.go +++ b/requestmanager/executor/executor.go @@ -190,7 +190,7 @@ func (e *Executor) advanceTraversal(rt RequestTask, result types.AsyncLoadResult } func (e *Executor) processResult(rt RequestTask, link ipld.Link, result types.AsyncLoadResult) error { - err := e.onNewBlock(rt, &blockData{link, result.Local, uint64(len(result.Data))}) + err := e.onNewBlock(rt, &blockData{link, result.Local, uint64(len(result.Data)), int64(rt.Traverser.NBlocksTraversed())}) select { case <-rt.PauseMessages: if err == nil { @@ -243,6 +243,7 @@ type blockData struct { link ipld.Link local bool size uint64 + index int64 } // Link is the link/cid for the block @@ -262,3 +263,7 @@ func (bd *blockData) BlockSizeOnWire() uint64 { } return bd.size } + +func (bd *blockData) Index() int64 { + return bd.index +} diff --git a/responsemanager/responseassembler/peerlinktracker.go b/responsemanager/responseassembler/peerlinktracker.go index 3bea04c3..b41e5784 100644 --- a/responsemanager/responseassembler/peerlinktracker.go +++ b/responsemanager/responseassembler/peerlinktracker.go @@ -91,7 +91,7 @@ func (prs *peerLinkTracker) FinishTracking(requestID graphsync.RequestID) bool { // RecordLinkTraversal records whether a link is found for a request. func (prs *peerLinkTracker) RecordLinkTraversal(requestID graphsync.RequestID, - link ipld.Link, hasBlock bool) bool { + link ipld.Link, hasBlock bool) (bool, int64) { prs.linkTrackerLk.Lock() defer prs.linkTrackerLk.Unlock() prs.blockSentCount[requestID]++ @@ -99,5 +99,5 @@ func (prs *peerLinkTracker) RecordLinkTraversal(requestID graphsync.RequestID, linkTracker := prs.getLinkTracker(requestID) isUnique := linkTracker.BlockRefCount(link) == 0 linkTracker.RecordLinkTraversal(requestID, link, hasBlock) - return hasBlock && notSkipped && isUnique + return hasBlock && notSkipped && isUnique, prs.blockSentCount[requestID] } diff --git a/responsemanager/responseassembler/responseBuilder.go b/responsemanager/responseassembler/responseBuilder.go index 7752813c..1b78d8b8 100644 --- a/responsemanager/responseassembler/responseBuilder.go +++ b/responsemanager/responseassembler/responseBuilder.go @@ -60,9 +60,9 @@ func (rb *responseBuilder) AddNotifee(notifee notifications.Notifee) { func (rb *responseBuilder) setupBlockOperation( link ipld.Link, data []byte) blockOperation { hasBlock := data != nil - send := rb.linkTracker.RecordLinkTraversal(rb.requestID, link, hasBlock) + send, index := rb.linkTracker.RecordLinkTraversal(rb.requestID, link, hasBlock) return blockOperation{ - data, send, link, rb.requestID, + data, send, link, rb.requestID, index, } } @@ -113,6 +113,7 @@ type blockOperation struct { sendBlock bool link ipld.Link requestID graphsync.RequestID + index int64 } func (bo blockOperation) build(builder *gsmsg.Builder) { @@ -142,6 +143,10 @@ func (bo blockOperation) BlockSizeOnWire() uint64 { return bo.BlockSize() } +func (bo blockOperation) Index() int64 { + return bo.index +} + func (bo blockOperation) size() uint64 { return bo.BlockSizeOnWire() } diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index ef5267f1..dd5f71e7 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -884,6 +884,10 @@ func (fbd fakeBlkData) BlockSizeOnWire() uint64 { return fbd.size } +func (fbd fakeBlkData) Index() int64 { + return 0 +} + func (fra *fakeResponseAssembler) sendResponse( requestID graphsync.RequestID, link ipld.Link, diff --git a/testutil/testutil.go b/testutil/testutil.go index d9b0e978..fd8b3149 100644 --- a/testutil/testutil.go +++ b/testutil/testutil.go @@ -229,8 +229,9 @@ func NewTestLink() ipld.Link { } type fakeBlkData struct { - link ipld.Link - size uint64 + link ipld.Link + size uint64 + index int64 } func (fbd fakeBlkData) Link() ipld.Link { @@ -245,10 +246,15 @@ func (fbd fakeBlkData) BlockSizeOnWire() uint64 { return fbd.size } +func (fbd fakeBlkData) Index() int64 { + return fbd.index +} + // NewFakeBlockData returns a fake block that matches the block data interface func NewFakeBlockData() graphsync.BlockData { return &fakeBlkData{ - link: cidlink.Link{Cid: GenerateCids(1)[0]}, - size: rand.Uint64(), + link: cidlink.Link{Cid: GenerateCids(1)[0]}, + size: rand.Uint64(), + index: rand.Int63(), } }