@@ -159,6 +159,8 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool)
159
159
go func () {
160
160
txs := make (chan []* types.Transaction , 128 )
161
161
pendingTxSub := api .events .SubscribePendingTxs (txs )
162
+ defer pendingTxSub .Unsubscribe ()
163
+
162
164
chainConfig := api .sys .backend .ChainConfig ()
163
165
164
166
for {
@@ -176,10 +178,8 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool)
176
178
}
177
179
}
178
180
case <- rpcSub .Err ():
179
- pendingTxSub .Unsubscribe ()
180
181
return
181
182
case <- notifier .Closed ():
182
- pendingTxSub .Unsubscribe ()
183
183
return
184
184
}
185
185
}
@@ -233,16 +233,15 @@ func (api *FilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
233
233
go func () {
234
234
headers := make (chan * types.Header )
235
235
headersSub := api .events .SubscribeNewHeads (headers )
236
+ defer headersSub .Unsubscribe ()
236
237
237
238
for {
238
239
select {
239
240
case h := <- headers :
240
241
notifier .Notify (rpcSub .ID , h )
241
242
case <- rpcSub .Err ():
242
- headersSub .Unsubscribe ()
243
243
return
244
244
case <- notifier .Closed ():
245
- headersSub .Unsubscribe ()
246
245
return
247
246
}
248
247
}
@@ -267,6 +266,7 @@ func (api *FilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subsc
267
266
if err != nil {
268
267
return nil , err
269
268
}
269
+ defer logsSub .Unsubscribe ()
270
270
271
271
go func () {
272
272
for {
@@ -277,10 +277,8 @@ func (api *FilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subsc
277
277
notifier .Notify (rpcSub .ID , & log )
278
278
}
279
279
case <- rpcSub .Err (): // client send an unsubscribe request
280
- logsSub .Unsubscribe ()
281
280
return
282
281
case <- notifier .Closed (): // connection dropped
283
- logsSub .Unsubscribe ()
284
282
return
285
283
}
286
284
}
0 commit comments