Skip to content

Commit 4aaf24f

Browse files
Merge pull request #4658 from ipfs/fix/session-cleanup
shutdown notifications engine when closing a bitswap session
2 parents 3461622 + 0dd0f25 commit 4aaf24f

File tree

6 files changed

+175
-14
lines changed

6 files changed

+175
-14
lines changed

exchange/bitswap/message/message.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ type Exportable interface {
5050

5151
type impl struct {
5252
full bool
53-
wantlist map[string]Entry
53+
wantlist map[string]*Entry
5454
blocks map[string]blocks.Block
5555
}
5656

@@ -61,7 +61,7 @@ func New(full bool) BitSwapMessage {
6161
func newMsg(full bool) *impl {
6262
return &impl{
6363
blocks: make(map[string]blocks.Block),
64-
wantlist: make(map[string]Entry),
64+
wantlist: make(map[string]*Entry),
6565
full: full,
6666
}
6767
}
@@ -122,7 +122,7 @@ func (m *impl) Empty() bool {
122122
func (m *impl) Wantlist() []Entry {
123123
out := make([]Entry, 0, len(m.wantlist))
124124
for _, e := range m.wantlist {
125-
out = append(out, e)
125+
out = append(out, *e)
126126
}
127127
return out
128128
}
@@ -151,7 +151,7 @@ func (m *impl) addEntry(c *cid.Cid, priority int, cancel bool) {
151151
e.Priority = priority
152152
e.Cancel = cancel
153153
} else {
154-
m.wantlist[k] = Entry{
154+
m.wantlist[k] = &Entry{
155155
Entry: &wantlist.Entry{
156156
Cid: c,
157157
Priority: priority,

exchange/bitswap/notifications/notifications.go

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package notifications
22

33
import (
44
"context"
5+
"sync"
56

67
blocks "gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format"
78

@@ -18,18 +19,43 @@ type PubSub interface {
1819
}
1920

2021
func New() PubSub {
21-
return &impl{*pubsub.New(bufferSize)}
22+
return &impl{
23+
wrapped: *pubsub.New(bufferSize),
24+
cancel: make(chan struct{}),
25+
}
2226
}
2327

2428
type impl struct {
2529
wrapped pubsub.PubSub
30+
31+
// These two fields make up a shutdown "lock".
32+
// We need them as calling, e.g., `Unsubscribe` after calling `Shutdown`
33+
// blocks forever and fixing this in pubsub would be rather invasive.
34+
cancel chan struct{}
35+
wg sync.WaitGroup
2636
}
2737

2838
func (ps *impl) Publish(block blocks.Block) {
39+
ps.wg.Add(1)
40+
defer ps.wg.Done()
41+
42+
select {
43+
case <-ps.cancel:
44+
// Already shutdown, bail.
45+
return
46+
default:
47+
}
48+
2949
ps.wrapped.Pub(block, block.Cid().KeyString())
3050
}
3151

52+
// Not safe to call more than once.
3253
func (ps *impl) Shutdown() {
54+
// Interrupt in-progress subscriptions.
55+
close(ps.cancel)
56+
// Wait for them to finish.
57+
ps.wg.Wait()
58+
// shutdown the pubsub.
3359
ps.wrapped.Shutdown()
3460
}
3561

@@ -44,12 +70,34 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...*cid.Cid) <-chan blocks.B
4470
close(blocksCh)
4571
return blocksCh
4672
}
73+
74+
// prevent shutdown
75+
ps.wg.Add(1)
76+
77+
// check if shutdown *after* preventing shutdowns.
78+
select {
79+
case <-ps.cancel:
80+
// abort, allow shutdown to continue.
81+
ps.wg.Done()
82+
close(blocksCh)
83+
return blocksCh
84+
default:
85+
}
86+
4787
ps.wrapped.AddSubOnceEach(valuesCh, toStrings(keys)...)
4888
go func() {
49-
defer close(blocksCh)
50-
defer ps.wrapped.Unsub(valuesCh) // with a len(keys) buffer, this is an optimization
89+
defer func() {
90+
ps.wrapped.Unsub(valuesCh)
91+
close(blocksCh)
92+
93+
// Unblock shutdown.
94+
ps.wg.Done()
95+
}()
96+
5197
for {
5298
select {
99+
case <-ps.cancel:
100+
return
53101
case <-ctx.Done():
54102
return
55103
case val, ok := <-valuesCh:
@@ -61,6 +109,8 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...*cid.Cid) <-chan blocks.B
61109
return
62110
}
63111
select {
112+
case <-ps.cancel:
113+
return
64114
case <-ctx.Done():
65115
return
66116
case blocksCh <- block: // continue

exchange/bitswap/notifications/notifications_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,25 @@ func TestDuplicateSubscribe(t *testing.T) {
100100
assertBlocksEqual(t, e1, r2)
101101
}
102102

103+
func TestShutdownBeforeUnsubscribe(t *testing.T) {
104+
e1 := blocks.NewBlock([]byte("1"))
105+
106+
n := New()
107+
ctx, cancel := context.WithCancel(context.Background())
108+
ch := n.Subscribe(ctx, e1.Cid()) // no keys provided
109+
n.Shutdown()
110+
cancel()
111+
112+
select {
113+
case _, ok := <-ch:
114+
if ok {
115+
t.Fatal("channel should have been closed")
116+
}
117+
default:
118+
t.Fatal("channel should have been closed")
119+
}
120+
}
121+
103122
func TestSubscribeIsANoopWhenCalledWithNoKeys(t *testing.T) {
104123
n := New()
105124
defer n.Shutdown()

exchange/bitswap/session.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,15 @@ func (bs *Bitswap) NewSession(ctx context.Context) *Session {
8383
}
8484

8585
func (bs *Bitswap) removeSession(s *Session) {
86+
s.notif.Shutdown()
87+
88+
live := make([]*cid.Cid, 0, len(s.liveWants))
89+
for c := range s.liveWants {
90+
cs, _ := cid.Cast([]byte(c))
91+
live = append(live, cs)
92+
}
93+
bs.CancelWants(live, s.id)
94+
8695
bs.sessLk.Lock()
8796
defer bs.sessLk.Unlock()
8897
for i := 0; i < len(bs.sessions); i++ {
@@ -270,8 +279,9 @@ func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
270279
}
271280

272281
func (s *Session) wantBlocks(ctx context.Context, ks []*cid.Cid) {
282+
now := time.Now()
273283
for _, c := range ks {
274-
s.liveWants[c.KeyString()] = time.Now()
284+
s.liveWants[c.KeyString()] = now
275285
}
276286
s.bs.wm.WantBlocks(ctx, ks, s.activePeersArr, s.id)
277287
}

exchange/bitswap/session_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,3 +285,36 @@ func TestMultipleSessions(t *testing.T) {
285285
}
286286
_ = blkch
287287
}
288+
289+
func TestWantlistClearsOnCancel(t *testing.T) {
290+
ctx, cancel := context.WithCancel(context.Background())
291+
defer cancel()
292+
293+
vnet := getVirtualNetwork()
294+
sesgen := NewTestSessionGenerator(vnet)
295+
defer sesgen.Close()
296+
bgen := blocksutil.NewBlockGenerator()
297+
298+
blks := bgen.Blocks(10)
299+
var cids []*cid.Cid
300+
for _, blk := range blks {
301+
cids = append(cids, blk.Cid())
302+
}
303+
304+
inst := sesgen.Instances(1)
305+
306+
a := inst[0]
307+
308+
ctx1, cancel1 := context.WithCancel(ctx)
309+
ses := a.Exchange.NewSession(ctx1)
310+
311+
_, err := ses.GetBlocks(ctx, cids)
312+
if err != nil {
313+
t.Fatal(err)
314+
}
315+
cancel1()
316+
317+
if len(a.Exchange.GetWantlist()) > 0 {
318+
t.Fatal("expected empty wantlist")
319+
}
320+
}

exchange/bitswap/testnet/virtual.go

Lines changed: 55 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"sync"
7+
"time"
78

89
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
910
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
@@ -22,7 +23,7 @@ var log = logging.Logger("bstestnet")
2223

2324
func VirtualNetwork(rs mockrouting.Server, d delay.D) Network {
2425
return &network{
25-
clients: make(map[peer.ID]bsnet.Receiver),
26+
clients: make(map[peer.ID]*receiverQueue),
2627
delay: d,
2728
routingserver: rs,
2829
conns: make(map[string]struct{}),
@@ -31,12 +32,28 @@ func VirtualNetwork(rs mockrouting.Server, d delay.D) Network {
3132

3233
type network struct {
3334
mu sync.Mutex
34-
clients map[peer.ID]bsnet.Receiver
35+
clients map[peer.ID]*receiverQueue
3536
routingserver mockrouting.Server
3637
delay delay.D
3738
conns map[string]struct{}
3839
}
3940

41+
type message struct {
42+
from peer.ID
43+
msg bsmsg.BitSwapMessage
44+
shouldSend time.Time
45+
}
46+
47+
// receiverQueue queues up a set of messages to be sent, and sends them *in
48+
// order* with their delays respected as much as sending them in order allows
49+
// for
50+
type receiverQueue struct {
51+
receiver bsnet.Receiver
52+
queue []*message
53+
active bool
54+
lk sync.Mutex
55+
}
56+
4057
func (n *network) Adapter(p testutil.Identity) bsnet.BitSwapNetwork {
4158
n.mu.Lock()
4259
defer n.mu.Unlock()
@@ -46,7 +63,7 @@ func (n *network) Adapter(p testutil.Identity) bsnet.BitSwapNetwork {
4663
network: n,
4764
routing: n.routingserver.Client(p),
4865
}
49-
n.clients[p.ID()] = client
66+
n.clients[p.ID()] = &receiverQueue{receiver: client}
5067
return client
5168
}
5269

@@ -64,7 +81,7 @@ func (n *network) SendMessage(
6481
ctx context.Context,
6582
from peer.ID,
6683
to peer.ID,
67-
message bsmsg.BitSwapMessage) error {
84+
mes bsmsg.BitSwapMessage) error {
6885

6986
n.mu.Lock()
7087
defer n.mu.Unlock()
@@ -77,7 +94,12 @@ func (n *network) SendMessage(
7794
// nb: terminate the context since the context wouldn't actually be passed
7895
// over the network in a real scenario
7996

80-
go n.deliver(receiver, from, message)
97+
msg := &message{
98+
from: from,
99+
msg: mes,
100+
shouldSend: time.Now().Add(n.delay.Get()),
101+
}
102+
receiver.enqueue(msg)
81103

82104
return nil
83105
}
@@ -191,11 +213,38 @@ func (nc *networkClient) ConnectTo(_ context.Context, p peer.ID) error {
191213

192214
// TODO: add handling for disconnects
193215

194-
otherClient.PeerConnected(nc.local)
216+
otherClient.receiver.PeerConnected(nc.local)
195217
nc.Receiver.PeerConnected(p)
196218
return nil
197219
}
198220

221+
func (rq *receiverQueue) enqueue(m *message) {
222+
rq.lk.Lock()
223+
defer rq.lk.Unlock()
224+
rq.queue = append(rq.queue, m)
225+
if !rq.active {
226+
rq.active = true
227+
go rq.process()
228+
}
229+
}
230+
231+
func (rq *receiverQueue) process() {
232+
for {
233+
rq.lk.Lock()
234+
if len(rq.queue) == 0 {
235+
rq.active = false
236+
rq.lk.Unlock()
237+
return
238+
}
239+
m := rq.queue[0]
240+
rq.queue = rq.queue[1:]
241+
rq.lk.Unlock()
242+
243+
time.Sleep(time.Until(m.shouldSend))
244+
rq.receiver.ReceiveMessage(context.TODO(), m.from, m.msg)
245+
}
246+
}
247+
199248
func tagForPeers(a, b peer.ID) string {
200249
if a < b {
201250
return string(a + b)

0 commit comments

Comments
 (0)