Skip to content

Add an index for blocks in the on new block hooks #234

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ type BlockData interface {

// BlockSize specifies the amount of data actually transmitted over the network
BlockSizeOnWire() uint64

// The index of this block in the selector traversal
Index() int64
}

// IncomingRequestHookActions are actions that a request hook can take to change
Expand Down
9 changes: 6 additions & 3 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) {
totalSent := 0
totalSentOnWire := 0
responder.RegisterOutgoingBlockHook(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
totalSent++
totalSent = int(blockData.Index())
if blockData.BlockSizeOnWire() > 0 {
totalSentOnWire++
}
Expand Down Expand Up @@ -364,8 +364,11 @@ func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) {
totalSent := 0
totalSentOnWire := 0
responder.RegisterOutgoingBlockHook(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
totalSent++
if blockData.BlockSizeOnWire() > 0 {
totalSent = int(blockData.Index())
if blockData.Index() <= 50 {
require.True(t, blockData.BlockSizeOnWire() == 0)
} else {
require.True(t, blockData.BlockSizeOnWire() > 0)
totalSentOnWire++
}
})
Expand Down
7 changes: 6 additions & 1 deletion requestmanager/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (e *Executor) advanceTraversal(rt RequestTask, result types.AsyncLoadResult
}

func (e *Executor) processResult(rt RequestTask, link ipld.Link, result types.AsyncLoadResult) error {
err := e.onNewBlock(rt, &blockData{link, result.Local, uint64(len(result.Data))})
err := e.onNewBlock(rt, &blockData{link, result.Local, uint64(len(result.Data)), int64(rt.Traverser.NBlocksTraversed())})
select {
case <-rt.PauseMessages:
if err == nil {
Expand Down Expand Up @@ -243,6 +243,7 @@ type blockData struct {
link ipld.Link
local bool
size uint64
index int64
}

// Link is the link/cid for the block
Expand All @@ -262,3 +263,7 @@ func (bd *blockData) BlockSizeOnWire() uint64 {
}
return bd.size
}

func (bd *blockData) Index() int64 {
return bd.index
}
4 changes: 2 additions & 2 deletions responsemanager/responseassembler/peerlinktracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,13 @@ func (prs *peerLinkTracker) FinishTracking(requestID graphsync.RequestID) bool {

// RecordLinkTraversal records whether a link is found for a request.
func (prs *peerLinkTracker) RecordLinkTraversal(requestID graphsync.RequestID,
link ipld.Link, hasBlock bool) bool {
link ipld.Link, hasBlock bool) (bool, int64) {
prs.linkTrackerLk.Lock()
defer prs.linkTrackerLk.Unlock()
prs.blockSentCount[requestID]++
notSkipped := prs.skipFirstBlocks[requestID] < prs.blockSentCount[requestID]
linkTracker := prs.getLinkTracker(requestID)
isUnique := linkTracker.BlockRefCount(link) == 0
linkTracker.RecordLinkTraversal(requestID, link, hasBlock)
return hasBlock && notSkipped && isUnique
return hasBlock && notSkipped && isUnique, prs.blockSentCount[requestID]
}
9 changes: 7 additions & 2 deletions responsemanager/responseassembler/responseBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ func (rb *responseBuilder) AddNotifee(notifee notifications.Notifee) {
func (rb *responseBuilder) setupBlockOperation(
link ipld.Link, data []byte) blockOperation {
hasBlock := data != nil
send := rb.linkTracker.RecordLinkTraversal(rb.requestID, link, hasBlock)
send, index := rb.linkTracker.RecordLinkTraversal(rb.requestID, link, hasBlock)
return blockOperation{
data, send, link, rb.requestID,
data, send, link, rb.requestID, index,
}
}

Expand Down Expand Up @@ -113,6 +113,7 @@ type blockOperation struct {
sendBlock bool
link ipld.Link
requestID graphsync.RequestID
index int64
}

func (bo blockOperation) build(builder *gsmsg.Builder) {
Expand Down Expand Up @@ -142,6 +143,10 @@ func (bo blockOperation) BlockSizeOnWire() uint64 {
return bo.BlockSize()
}

func (bo blockOperation) Index() int64 {
return bo.index
}

func (bo blockOperation) size() uint64 {
return bo.BlockSizeOnWire()
}
4 changes: 4 additions & 0 deletions responsemanager/responsemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,10 @@ func (fbd fakeBlkData) BlockSizeOnWire() uint64 {
return fbd.size
}

func (fbd fakeBlkData) Index() int64 {
return 0
}

func (fra *fakeResponseAssembler) sendResponse(
requestID graphsync.RequestID,
link ipld.Link,
Expand Down
14 changes: 10 additions & 4 deletions testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,9 @@ func NewTestLink() ipld.Link {
}

type fakeBlkData struct {
link ipld.Link
size uint64
link ipld.Link
size uint64
index int64
}

func (fbd fakeBlkData) Link() ipld.Link {
Expand All @@ -245,10 +246,15 @@ func (fbd fakeBlkData) BlockSizeOnWire() uint64 {
return fbd.size
}

func (fbd fakeBlkData) Index() int64 {
return fbd.index
}

// NewFakeBlockData returns a fake block that matches the block data interface
func NewFakeBlockData() graphsync.BlockData {
return &fakeBlkData{
link: cidlink.Link{Cid: GenerateCids(1)[0]},
size: rand.Uint64(),
link: cidlink.Link{Cid: GenerateCids(1)[0]},
size: rand.Uint64(),
index: rand.Int63(),
}
}