Skip to content

Commit 57df640

Browse files
Do not send first blocks extension (#230)
* feat(responseassembler): add response skipping * feat(responsemanager): process do not send first blocks define extension and add processing to response manager * test(impl): add integration test add test demonstrating do not send first blocks flow * Update donotsendfirstblocks/donotsendfirstblocks.go Co-authored-by: Will <[email protected]> * Update impl/graphsync_test.go Co-authored-by: Will <[email protected]> Co-authored-by: Will <[email protected]>
1 parent ce3951d commit 57df640

10 files changed

+246
-13
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package donotsendfirstblocks
2+
3+
import (
4+
basicnode "github.com/ipld/go-ipld-prime/node/basic"
5+
6+
"github.com/ipfs/go-graphsync/ipldutil"
7+
)
8+
9+
// EncodeDoNotSendFirstBlocks returns encoded cbor data for the given number
10+
// of blocks to skip
11+
func EncodeDoNotSendFirstBlocks(skipBlockCount int64) ([]byte, error) {
12+
nd := basicnode.NewInt(skipBlockCount)
13+
return ipldutil.EncodeNode(nd)
14+
}
15+
16+
// DecodeDoNotSendFirstBlocks returns the number of blocks to skip
17+
func DecodeDoNotSendFirstBlocks(data []byte) (int64, error) {
18+
nd, err := ipldutil.DecodeNode(data)
19+
if err != nil {
20+
return 0, err
21+
}
22+
return nd.AsInt()
23+
}

graphsync.go

+4
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ const (
4545
// https://github.com/ipld/specs/blob/master/block-layer/graphsync/known_extensions.md
4646
ExtensionDoNotSendCIDs = ExtensionName("graphsync/do-not-send-cids")
4747

48+
// ExtensionsDoNotSendFirstBlocks tells the responding peer not to wait till the given
49+
// number of blocks have been traversed before it begins to send blocks over the wire
50+
ExtensionsDoNotSendFirstBlocks = ExtensionName("graphsync/do-not-send-first-blocks")
51+
4852
// ExtensionDeDupByKey tells the responding peer to only deduplicate block sending
4953
// for requests that have the same key. The data for the extension is a string key
5054
ExtensionDeDupByKey = ExtensionName("graphsync/dedup-by-key")

impl/graphsync_test.go

+49
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040

4141
"github.com/ipfs/go-graphsync"
4242
"github.com/ipfs/go-graphsync/cidset"
43+
"github.com/ipfs/go-graphsync/donotsendfirstblocks"
4344
gsmsg "github.com/ipfs/go-graphsync/message"
4445
gsnet "github.com/ipfs/go-graphsync/network"
4546
"github.com/ipfs/go-graphsync/storeutil"
@@ -331,6 +332,54 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) {
331332
require.Equal(t, blockChainLength-set.Len(), totalSentOnWire)
332333
}
333334

335+
func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) {
336+
// create network
337+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
338+
defer cancel()
339+
td := newGsTestData(ctx, t)
340+
341+
// initialize graphsync on first node to make requests
342+
requestor := td.GraphSyncHost1()
343+
344+
// setup receiving peer to just record message coming in
345+
blockChainLength := 100
346+
blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength)
347+
348+
// store blocks locally
349+
firstHalf := blockChain.Blocks(0, 50)
350+
for _, blk := range firstHalf {
351+
td.blockStore1[cidlink.Link{Cid: blk.Cid()}] = blk.RawData()
352+
}
353+
354+
doNotSendFirstBlocksData, err := donotsendfirstblocks.EncodeDoNotSendFirstBlocks(50)
355+
require.NoError(t, err)
356+
extension := graphsync.ExtensionData{
357+
Name: graphsync.ExtensionsDoNotSendFirstBlocks,
358+
Data: doNotSendFirstBlocksData,
359+
}
360+
361+
// initialize graphsync on second node to response to requests
362+
responder := td.GraphSyncHost2()
363+
364+
totalSent := 0
365+
totalSentOnWire := 0
366+
responder.RegisterOutgoingBlockHook(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
367+
totalSent++
368+
if blockData.BlockSizeOnWire() > 0 {
369+
totalSentOnWire++
370+
}
371+
})
372+
373+
progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), extension)
374+
375+
blockChain.VerifyWholeChain(ctx, progressChan)
376+
testutil.VerifyEmptyErrors(ctx, t, errChan)
377+
require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks")
378+
379+
require.Equal(t, blockChainLength, totalSent)
380+
require.Equal(t, blockChainLength-50, totalSentOnWire)
381+
}
382+
334383
func TestPauseResume(t *testing.T) {
335384
// create network
336385
ctx := context.Background()

responsemanager/client.go

+1
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ type NetworkErrorListeners interface {
126126
type ResponseAssembler interface {
127127
DedupKey(p peer.ID, requestID graphsync.RequestID, key string)
128128
IgnoreBlocks(p peer.ID, requestID graphsync.RequestID, links []ipld.Link)
129+
SkipFirstBlocks(p peer.ID, requestID graphsync.RequestID, skipCount int64)
129130
Transaction(p peer.ID, requestID graphsync.RequestID, transaction responseassembler.Transaction) error
130131
}
131132

responsemanager/querypreparer.go

+22
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/ipfs/go-graphsync"
1313
"github.com/ipfs/go-graphsync/cidset"
1414
"github.com/ipfs/go-graphsync/dedupkey"
15+
"github.com/ipfs/go-graphsync/donotsendfirstblocks"
1516
"github.com/ipfs/go-graphsync/ipldutil"
1617
gsmsg "github.com/ipfs/go-graphsync/message"
1718
"github.com/ipfs/go-graphsync/notifications"
@@ -62,6 +63,9 @@ func (qe *queryPreparer) prepareQuery(ctx context.Context,
6263
if err := qe.processDoNoSendCids(request, p, failNotifee); err != nil {
6364
return nil, nil, false, err
6465
}
66+
if err := qe.processDoNotSendFirstBlocks(request, p, failNotifee); err != nil {
67+
return nil, nil, false, err
68+
}
6569
rootLink := cidlink.Link{Cid: request.Root()}
6670
linkSystem := result.CustomLinkSystem
6771
if linkSystem.StorageReadOpener == nil {
@@ -120,3 +124,21 @@ func (qe *queryPreparer) processDoNoSendCids(request gsmsg.GraphSyncRequest, p p
120124
qe.responseAssembler.IgnoreBlocks(p, request.ID(), links)
121125
return nil
122126
}
127+
128+
func (qe *queryPreparer) processDoNotSendFirstBlocks(request gsmsg.GraphSyncRequest, p peer.ID, failNotifee notifications.Notifee) error {
129+
doNotSendFirstBlocksData, has := request.Extension(graphsync.ExtensionsDoNotSendFirstBlocks)
130+
if !has {
131+
return nil
132+
}
133+
skipCount, err := donotsendfirstblocks.DecodeDoNotSendFirstBlocks(doNotSendFirstBlocksData)
134+
if err != nil {
135+
_ = qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error {
136+
rb.FinishWithError(graphsync.RequestFailedUnknown)
137+
rb.AddNotifee(failNotifee)
138+
return nil
139+
})
140+
return err
141+
}
142+
qe.responseAssembler.SkipFirstBlocks(p, request.ID(), skipCount)
143+
return nil
144+
}

responsemanager/responseassembler/peerlinktracker.go

+25-11
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,21 @@ import (
1010
)
1111

1212
type peerLinkTracker struct {
13-
linkTrackerLk sync.RWMutex
14-
linkTracker *linktracker.LinkTracker
15-
altTrackers map[string]*linktracker.LinkTracker
16-
dedupKeys map[graphsync.RequestID]string
13+
linkTrackerLk sync.RWMutex
14+
linkTracker *linktracker.LinkTracker
15+
altTrackers map[string]*linktracker.LinkTracker
16+
dedupKeys map[graphsync.RequestID]string
17+
blockSentCount map[graphsync.RequestID]int64
18+
skipFirstBlocks map[graphsync.RequestID]int64
1719
}
1820

1921
func newTracker() *peerLinkTracker {
2022
return &peerLinkTracker{
21-
linkTracker: linktracker.New(),
22-
dedupKeys: make(map[graphsync.RequestID]string),
23-
altTrackers: make(map[string]*linktracker.LinkTracker),
23+
linkTracker: linktracker.New(),
24+
dedupKeys: make(map[graphsync.RequestID]string),
25+
altTrackers: make(map[string]*linktracker.LinkTracker),
26+
blockSentCount: make(map[graphsync.RequestID]int64),
27+
skipFirstBlocks: make(map[graphsync.RequestID]int64),
2428
}
2529
}
2630

@@ -54,6 +58,12 @@ func (prs *peerLinkTracker) IgnoreBlocks(requestID graphsync.RequestID, links []
5458
prs.linkTrackerLk.Unlock()
5559
}
5660

61+
func (prs *peerLinkTracker) SkipFirstBlocks(requestID graphsync.RequestID, blocksToSkip int64) {
62+
prs.linkTrackerLk.Lock()
63+
prs.skipFirstBlocks[requestID] = blocksToSkip
64+
prs.linkTrackerLk.Unlock()
65+
}
66+
5767
// FinishTracking clears link tracking data for the request.
5868
func (prs *peerLinkTracker) FinishTracking(requestID graphsync.RequestID) bool {
5969
prs.linkTrackerLk.Lock()
@@ -74,16 +84,20 @@ func (prs *peerLinkTracker) FinishTracking(requestID graphsync.RequestID) bool {
7484
delete(prs.altTrackers, key)
7585
}
7686
}
87+
delete(prs.blockSentCount, requestID)
88+
delete(prs.skipFirstBlocks, requestID)
7789
return allBlocks
7890
}
7991

8092
// RecordLinkTraversal records whether a link is found for a request.
8193
func (prs *peerLinkTracker) RecordLinkTraversal(requestID graphsync.RequestID,
82-
link ipld.Link, hasBlock bool) (isUnique bool) {
94+
link ipld.Link, hasBlock bool) bool {
8395
prs.linkTrackerLk.Lock()
96+
defer prs.linkTrackerLk.Unlock()
97+
prs.blockSentCount[requestID]++
98+
notSkipped := prs.skipFirstBlocks[requestID] < prs.blockSentCount[requestID]
8499
linkTracker := prs.getLinkTracker(requestID)
85-
isUnique = linkTracker.BlockRefCount(link) == 0
100+
isUnique := linkTracker.BlockRefCount(link) == 0
86101
linkTracker.RecordLinkTraversal(requestID, link, hasBlock)
87-
prs.linkTrackerLk.Unlock()
88-
return
102+
return hasBlock && notSkipped && isUnique
89103
}

responsemanager/responseassembler/responseBuilder.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@ func (rb *responseBuilder) AddNotifee(notifee notifications.Notifee) {
6060
func (rb *responseBuilder) setupBlockOperation(
6161
link ipld.Link, data []byte) blockOperation {
6262
hasBlock := data != nil
63-
isUnique := rb.linkTracker.RecordLinkTraversal(rb.requestID, link, hasBlock)
63+
send := rb.linkTracker.RecordLinkTraversal(rb.requestID, link, hasBlock)
6464
return blockOperation{
65-
data, hasBlock && isUnique, link, rb.requestID,
65+
data, send, link, rb.requestID,
6666
}
6767
}
6868

responsemanager/responseassembler/responseassembler.go

+5
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@ func (ra *ResponseAssembler) IgnoreBlocks(p peer.ID, requestID graphsync.Request
8686
ra.GetProcess(p).(*peerLinkTracker).IgnoreBlocks(requestID, links)
8787
}
8888

89+
// SkipFirstBlocks tells the assembler for the given request to not send the first N blocks
90+
func (ra *ResponseAssembler) SkipFirstBlocks(p peer.ID, requestID graphsync.RequestID, skipFirstBlocks int64) {
91+
ra.GetProcess(p).(*peerLinkTracker).SkipFirstBlocks(requestID, skipFirstBlocks)
92+
}
93+
8994
// Transaction builds a response, and queues it for sending in the next outgoing message
9095
func (ra *ResponseAssembler) Transaction(p peer.ID, requestID graphsync.RequestID, transaction Transaction) error {
9196
rb := &responseBuilder{

responsemanager/responseassembler/responseassembler_test.go

+78
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,84 @@ func TestResponseAssemblerIgnoreBlocks(t *testing.T) {
259259

260260
}
261261

262+
func TestResponseAssemblerSkipFirstBlocks(t *testing.T) {
263+
ctx := context.Background()
264+
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
265+
defer cancel()
266+
p := testutil.GeneratePeers(1)[0]
267+
requestID1 := graphsync.RequestID(rand.Int31())
268+
requestID2 := graphsync.RequestID(rand.Int31())
269+
blks := testutil.GenerateBlocksOfSize(5, 100)
270+
links := make([]ipld.Link, 0, len(blks))
271+
for _, block := range blks {
272+
links = append(links, cidlink.Link{Cid: block.Cid()})
273+
}
274+
fph := newFakePeerHandler(ctx, t)
275+
responseAssembler := New(ctx, fph)
276+
277+
responseAssembler.SkipFirstBlocks(p, requestID1, 3)
278+
279+
var bd1, bd2, bd3, bd4, bd5 graphsync.BlockData
280+
err := responseAssembler.Transaction(p, requestID1, func(b ResponseBuilder) error {
281+
bd1 = b.SendResponse(links[0], blks[0].RawData())
282+
return nil
283+
})
284+
require.NoError(t, err)
285+
286+
assertSentNotOnWire(t, bd1, blks[0])
287+
fph.RefuteBlocks()
288+
fph.AssertResponses(expectedResponses{requestID1: graphsync.PartialResponse})
289+
290+
err = responseAssembler.Transaction(p, requestID2, func(b ResponseBuilder) error {
291+
bd1 = b.SendResponse(links[0], blks[0].RawData())
292+
return nil
293+
})
294+
require.NoError(t, err)
295+
fph.AssertResponses(expectedResponses{
296+
requestID2: graphsync.PartialResponse,
297+
})
298+
299+
err = responseAssembler.Transaction(p, requestID1, func(b ResponseBuilder) error {
300+
bd2 = b.SendResponse(links[1], blks[1].RawData())
301+
bd3 = b.SendResponse(links[2], blks[2].RawData())
302+
return nil
303+
})
304+
require.NoError(t, err)
305+
306+
assertSentNotOnWire(t, bd1, blks[0])
307+
assertSentNotOnWire(t, bd2, blks[1])
308+
assertSentNotOnWire(t, bd3, blks[2])
309+
310+
fph.RefuteBlocks()
311+
fph.AssertResponses(expectedResponses{
312+
requestID1: graphsync.PartialResponse,
313+
})
314+
err = responseAssembler.Transaction(p, requestID1, func(b ResponseBuilder) error {
315+
bd4 = b.SendResponse(links[3], blks[3].RawData())
316+
bd5 = b.SendResponse(links[4], blks[4].RawData())
317+
b.FinishRequest()
318+
return nil
319+
})
320+
require.NoError(t, err)
321+
322+
assertSentOnWire(t, bd4, blks[3])
323+
assertSentOnWire(t, bd5, blks[4])
324+
325+
fph.AssertBlocks(blks[3], blks[4])
326+
fph.AssertResponses(expectedResponses{requestID1: graphsync.RequestCompletedFull})
327+
328+
err = responseAssembler.Transaction(p, requestID2, func(b ResponseBuilder) error {
329+
b.SendResponse(links[3], blks[3].RawData())
330+
b.FinishRequest()
331+
return nil
332+
})
333+
require.NoError(t, err)
334+
335+
fph.AssertBlocks(blks[3])
336+
fph.AssertResponses(expectedResponses{requestID2: graphsync.RequestCompletedFull})
337+
338+
}
339+
262340
func TestResponseAssemblerDupKeys(t *testing.T) {
263341
ctx := context.Background()
264342
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)

0 commit comments

Comments
 (0)