@@ -20,6 +20,7 @@ import (
20
20
type PubSubAPI CoreAPI
21
21
22
22
type pubSubSubscription struct {
23
+ cancel context.CancelFunc
23
24
subscription * floodsub.Subscription
24
25
}
25
26
@@ -75,27 +76,29 @@ func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopt
75
76
return nil , err
76
77
}
77
78
79
+ pubctx , cancel := context .WithCancel (api .node .Context ())
80
+
78
81
if options .Discover {
79
82
go func () {
80
- blk , err := api .core ().Block ().Put (ctx , strings .NewReader ("floodsub:" + topic ))
83
+ blk , err := api .core ().Block ().Put (pubctx , strings .NewReader ("floodsub:" + topic ))
81
84
if err != nil {
82
85
log .Error ("pubsub discovery: " , err )
83
86
return
84
87
}
85
88
86
- connectToPubSubPeers (ctx , api .node , blk .Path ().Cid ())
89
+ connectToPubSubPeers (pubctx , api .node , blk .Path ().Cid ())
87
90
}()
88
91
}
89
92
90
- return & pubSubSubscription {sub }, nil
93
+ return & pubSubSubscription {cancel , sub }, nil
91
94
}
92
95
93
96
func connectToPubSubPeers (ctx context.Context , n * core.IpfsNode , cid cid.Cid ) {
94
97
ctx , cancel := context .WithCancel (ctx )
95
98
defer cancel ()
96
99
97
100
provs := n .Routing .FindProvidersAsync (ctx , cid , 10 )
98
- wg := & sync.WaitGroup {}
101
+ var wg sync.WaitGroup
99
102
for p := range provs {
100
103
wg .Add (1 )
101
104
go func (pi pstore.PeerInfo ) {
@@ -127,6 +130,7 @@ func (api *PubSubAPI) checkNode() error {
127
130
}
128
131
129
132
func (sub * pubSubSubscription ) Close () error {
133
+ sub .cancel ()
130
134
sub .subscription .Cancel ()
131
135
return nil
132
136
}
0 commit comments