Skip to content
This repository was archived by the owner on Feb 1, 2023. It is now read-only.

Commit 3e5e09c

Browse files
committed
fix: don't ignore received blocks for pending wants
1 parent e72b289 commit 3e5e09c

File tree

5 files changed

+116
-36
lines changed

5 files changed

+116
-36
lines changed

bitswap.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -265,14 +265,14 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks
265265
// HasBlock announces the existence of a block to this bitswap service. The
266266
// service will potentially notify its peers.
267267
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
268-
return bs.receiveBlocksFrom("", []blocks.Block{blk})
268+
return bs.receiveBlocksFrom(nil, "", []blocks.Block{blk})
269269
}
270270

271271
// TODO: Some of this stuff really only needs to be done when adding a block
272272
// from the user, not when receiving it from the network.
273273
// In case you run `git blame` on this comment, I'll save you some time: ask
274274
// @whyrusleeping, I don't know the answers you seek.
275-
func (bs *Bitswap) receiveBlocksFrom(from peer.ID, blks []blocks.Block) error {
275+
func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block) error {
276276
select {
277277
case <-bs.process.Closing():
278278
return errors.New("bitswap is closed")
@@ -286,7 +286,7 @@ func (bs *Bitswap) receiveBlocksFrom(from peer.ID, blks []blocks.Block) error {
286286
// Split blocks into wanted blocks vs duplicates
287287
wanted = make([]blocks.Block, 0, len(blks))
288288
for _, b := range blks {
289-
if bs.wm.IsWanted(b.Cid()) {
289+
if bs.sm.InterestedIn(b.Cid()) {
290290
wanted = append(wanted, b)
291291
} else {
292292
log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from)
@@ -326,6 +326,12 @@ func (bs *Bitswap) receiveBlocksFrom(from peer.ID, blks []blocks.Block) error {
326326
}
327327
}
328328

329+
if from != "" {
330+
for _, b := range wanted {
331+
log.Event(ctx, "Bitswap.GetBlockRequest.End", b.Cid())
332+
}
333+
}
334+
329335
return nil
330336
}
331337

@@ -354,17 +360,11 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
354360
}
355361

356362
// Process blocks
357-
err := bs.receiveBlocksFrom(p, iblocks)
363+
err := bs.receiveBlocksFrom(ctx, p, iblocks)
358364
if err != nil {
359365
log.Warningf("ReceiveMessage recvBlockFrom error: %s", err)
360366
return
361367
}
362-
363-
for _, b := range iblocks {
364-
if bs.wm.IsWanted(b.Cid()) {
365-
log.Event(ctx, "Bitswap.GetBlockRequest.End", b.Cid())
366-
}
367-
}
368368
}
369369

370370
func (bs *Bitswap) updateReceiveCounters(blocks []blocks.Block) {

bitswap_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
2222
delay "github.com/ipfs/go-ipfs-delay"
2323
mockrouting "github.com/ipfs/go-ipfs-routing/mock"
24+
peer "github.com/libp2p/go-libp2p-core/peer"
2425
p2ptestutil "github.com/libp2p/go-libp2p-netutil"
2526
travis "github.com/libp2p/go-libp2p-testing/ci/travis"
2627
tu "github.com/libp2p/go-libp2p-testing/etc"
@@ -132,6 +133,8 @@ func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
132133
}
133134
}
134135

136+
// Tests that a received block is not stored in the blockstore if the block was
137+
// not requested by the client
135138
func TestUnwantedBlockNotAdded(t *testing.T) {
136139

137140
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
@@ -164,6 +167,68 @@ func TestUnwantedBlockNotAdded(t *testing.T) {
164167
}
165168
}
166169

170+
// Tests that a received block is returned to the client and stored in the
171+
// blockstore in the following scenario:
172+
// - the want for the block has been requested by the client
173+
// - the want for the block has not yet been sent out to a peer
174+
// (because the live request queue is full)
175+
func TestPendingBlockAdded(t *testing.T) {
176+
ctx := context.Background()
177+
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
178+
bg := blocksutil.NewBlockGenerator()
179+
sessionBroadcastWantCapacity := 4
180+
181+
ig := testinstance.NewTestInstanceGenerator(net)
182+
defer ig.Close()
183+
184+
instance := ig.Instances(1)[0]
185+
defer instance.Exchange.Close()
186+
187+
oneSecCtx, cancel := context.WithTimeout(context.Background(), time.Second)
188+
defer cancel()
189+
190+
// Request enough blocks to exceed the session's broadcast want list
191+
// capacity (by one block). The session will put the remaining block
192+
// into the "tofetch" queue
193+
blks := bg.Blocks(sessionBroadcastWantCapacity + 1)
194+
ks := make([]cid.Cid, 0, len(blks))
195+
for _, b := range blks {
196+
ks = append(ks, b.Cid())
197+
}
198+
outch, err := instance.Exchange.GetBlocks(ctx, ks)
199+
if err != nil {
200+
t.Fatal(err)
201+
}
202+
203+
// Wait a little while to make sure the session has time to process the wants
204+
time.Sleep(time.Millisecond * 20)
205+
206+
// Simulate receiving a message which contains the block in the "tofetch" queue
207+
lastBlock := blks[len(blks)-1]
208+
bsMessage := message.New(true)
209+
bsMessage.AddBlock(lastBlock)
210+
unknownPeer := peer.ID("QmUHfvCQrzyR6vFXmeyCptfCWedfcmfa12V6UuziDtrw23")
211+
instance.Exchange.ReceiveMessage(oneSecCtx, unknownPeer, bsMessage)
212+
213+
// Make sure Bitswap adds the block to the output channel
214+
blkrecvd, ok := <-outch
215+
if !ok {
216+
t.Fatal("timed out waiting for block")
217+
}
218+
if !blkrecvd.Cid().Equals(lastBlock.Cid()) {
219+
t.Fatal("received wrong block")
220+
}
221+
222+
// Make sure Bitswap adds the block to the blockstore
223+
blockInStore, err := instance.Blockstore().Has(lastBlock.Cid())
224+
if err != nil {
225+
t.Fatal(err)
226+
}
227+
if !blockInStore {
228+
t.Fatal("Block was not added to block store")
229+
}
230+
}
231+
167232
func TestLargeSwarm(t *testing.T) {
168233
if testing.Short() {
169234
t.SkipNow()

sessionmanager/sessionmanager.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,3 +128,17 @@ func (sm *SessionManager) ReceiveBlocksFrom(from peer.ID, blks []blocks.Block) {
128128
s.session.ReceiveBlocksFrom(from, sessBlks)
129129
}
130130
}
131+
132+
// InterestedIn indicates whether any of the sessions are waiting to receive
133+
// the block with the given CID.
134+
func (sm *SessionManager) InterestedIn(cid cid.Cid) bool {
135+
sm.sessLk.Lock()
136+
defer sm.sessLk.Unlock()
137+
138+
for _, s := range sm.sessions {
139+
if s.session.InterestedIn(cid) {
140+
return true
141+
}
142+
}
143+
return false
144+
}

sessionmanager/sessionmanager_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,33 @@ func TestReceivingBlocksWhenNotInterested(t *testing.T) {
175175
}
176176
}
177177

178+
func TestInterestedIn(t *testing.T) {
179+
ctx := context.Background()
180+
ctx, cancel := context.WithCancel(ctx)
181+
defer cancel()
182+
sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory)
183+
184+
blks := testutil.GenerateBlocksOfSize(4, 1024)
185+
var cids []cid.Cid
186+
for _, b := range blks {
187+
cids = append(cids, b.Cid())
188+
}
189+
190+
nextInterestedIn = []cid.Cid{cids[0], cids[1]}
191+
_ = sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
192+
nextInterestedIn = []cid.Cid{cids[0], cids[2]}
193+
_ = sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
194+
195+
if !sm.InterestedIn(cids[0]) ||
196+
!sm.InterestedIn(cids[1]) ||
197+
!sm.InterestedIn(cids[2]) {
198+
t.Fatal("expected interest but session manager was not interested")
199+
}
200+
if sm.InterestedIn(cids[3]) {
201+
t.Fatal("expected no interest but session manager was interested")
202+
}
203+
}
204+
178205
func TestRemovingPeersWhenManagerContextCancelled(t *testing.T) {
179206
ctx := context.Background()
180207
ctx, cancel := context.WithCancel(ctx)

wantmanager/wantmanager.go

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -80,22 +80,6 @@ func (wm *WantManager) CancelWants(ctx context.Context, ks []cid.Cid, peers []pe
8080
wm.addEntries(context.Background(), ks, peers, true, ses)
8181
}
8282

83-
// IsWanted returns whether a CID is currently wanted.
84-
func (wm *WantManager) IsWanted(c cid.Cid) bool {
85-
resp := make(chan bool, 1)
86-
select {
87-
case wm.wantMessages <- &isWantedMessage{c, resp}:
88-
case <-wm.ctx.Done():
89-
return false
90-
}
91-
select {
92-
case wanted := <-resp:
93-
return wanted
94-
case <-wm.ctx.Done():
95-
return false
96-
}
97-
}
98-
9983
// CurrentWants returns the list of current wants.
10084
func (wm *WantManager) CurrentWants() []wantlist.Entry {
10185
resp := make(chan []wantlist.Entry, 1)
@@ -232,16 +216,6 @@ func (ws *wantSet) handle(wm *WantManager) {
232216
wm.peerHandler.SendMessage(ws.entries, ws.targets, ws.from)
233217
}
234218

235-
type isWantedMessage struct {
236-
c cid.Cid
237-
resp chan<- bool
238-
}
239-
240-
func (iwm *isWantedMessage) handle(wm *WantManager) {
241-
_, isWanted := wm.wl.Contains(iwm.c)
242-
iwm.resp <- isWanted
243-
}
244-
245219
type currentWantsMessage struct {
246220
resp chan<- []wantlist.Entry
247221
}

0 commit comments

Comments
 (0)