Skip to content

Commit 7d6cafd

Browse files
committed
fix(responsemanager): fix network error propogation
fix various issues causing network errors not to propogate in many cases
1 parent 42f195e commit 7d6cafd

File tree

5 files changed

+46
-9
lines changed

5 files changed

+46
-9
lines changed

messagequeue/messagequeue.go

+8
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,14 @@ func (mq *MessageQueue) runQueue() {
118118
case <-mq.outgoingWork:
119119
mq.sendMessage()
120120
case <-mq.done:
121+
select {
122+
case <-mq.outgoingWork:
123+
message, topic := mq.extractOutgoingMessage()
124+
if message != nil || !message.Empty() {
125+
mq.eventPublisher.Publish(topic, Event{Name: Error, Err: fmt.Errorf("message queue shutdown")})
126+
}
127+
default:
128+
}
121129
if mq.sender != nil {
122130
mq.sender.Close()
123131
}

notifications/data_subscriber.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package notifications
22

3-
import "sync"
3+
import (
4+
"sync"
5+
)
46

57
type TopicDataSubscriber struct {
68
idMapLk sync.RWMutex
@@ -48,4 +50,7 @@ func (m *TopicDataSubscriber) OnClose(topic Topic) {
4850
for _, data := range m.getData(topic) {
4951
m.Subscriber.OnClose(data)
5052
}
53+
m.idMapLk.Lock()
54+
delete(m.data, topic)
55+
m.idMapLk.Unlock()
5156
}

peermanager/peermanager.go

+11-1
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,18 @@ func (pm *PeerManager) Disconnected(p peer.ID) {
8383
// GetProcess returns the process for the given peer
8484
func (pm *PeerManager) GetProcess(
8585
p peer.ID) PeerProcess {
86+
// Usually this this is just a read
87+
pm.peerProcessesLk.RLock()
88+
pqi, ok := pm.peerProcesses[p]
89+
if ok {
90+
pm.peerProcessesLk.RUnlock()
91+
return pqi.process
92+
}
93+
pm.peerProcessesLk.RUnlock()
94+
// but sometimes it involves a create (we still need to do get or create cause it's possible
95+
// another writer grabbed the Lock first and made the process)
8696
pm.peerProcessesLk.Lock()
87-
pqi := pm.getOrCreate(p)
97+
pqi = pm.getOrCreate(p)
8898
pm.peerProcessesLk.Unlock()
8999
return pqi.process
90100
}

responsemanager/peerresponsemanager/peerresponsesender.go

+16
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ type peerResponseSender struct {
7878
subscriber *notifications.TopicDataSubscriber
7979
allocatorSubscriber *notifications.TopicDataSubscriber
8080
publisher notifications.Publisher
81+
messagesSending sync.WaitGroup
8182
}
8283

8384
// PeerResponseSender handles batching, deduping, and sending responses for
@@ -442,13 +443,25 @@ func (prs *peerResponseSender) signalWork() {
442443

443444
func (prs *peerResponseSender) run() {
444445
defer func() {
446+
prs.messagesSending.Wait()
445447
prs.publisher.Shutdown()
446448
prs.allocator.ReleasePeerMemory(prs.p)
447449
}()
448450
prs.publisher.Startup()
449451
for {
450452
select {
451453
case <-prs.ctx.Done():
454+
select {
455+
case <-prs.outgoingWork:
456+
prs.responseBuildersLk.Lock()
457+
builders := prs.responseBuilders
458+
prs.responseBuilders = nil
459+
prs.responseBuildersLk.Unlock()
460+
for _, builder := range builders {
461+
prs.publisher.Publish(builder.Topic(), Event{Name: Error, Err: fmt.Errorf("queue shutdown")})
462+
}
463+
default:
464+
}
452465
return
453466
case <-prs.outgoingWork:
454467
prs.sendResponseMessages()
@@ -475,6 +488,7 @@ func (prs *peerResponseSender) sendResponseMessages() {
475488
log.Errorf("Unable to assemble GraphSync response: %s", err.Error())
476489
}
477490

491+
prs.messagesSending.Add(1)
478492
prs.peerHandler.SendResponse(prs.p, responses, blks, notifications.Notifee{
479493
Data: builder.Topic(),
480494
Subscriber: prs.subscriber,
@@ -514,8 +528,10 @@ func (s *subscriber) OnNext(topic notifications.Topic, event notifications.Event
514528
switch msgEvent.Name {
515529
case messagequeue.Sent:
516530
s.prs.publisher.Publish(builderTopic, Event{Name: Sent})
531+
s.prs.messagesSending.Done()
517532
case messagequeue.Error:
518533
s.prs.publisher.Publish(builderTopic, Event{Name: Error, Err: fmt.Errorf("error sending message: %w", msgEvent.Err)})
534+
s.prs.messagesSending.Done()
519535
case messagequeue.Queued:
520536
select {
521537
case s.prs.queuedMessages <- builderTopic:

responsemanager/queryexecutor.go

+5-7
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,10 @@ func (qe *queryExecutor) prepareQuery(ctx context.Context,
114114
p peer.ID,
115115
request gsmsg.GraphSyncRequest, signals signals, sub *notifications.TopicDataSubscriber) (ipld.Loader, ipldutil.Traverser, bool, error) {
116116
result := qe.requestHooks.ProcessRequestHooks(p, request)
117-
peerResponseSender := qe.peerManager.SenderForPeer(p)
118117
var transactionError error
119118
var isPaused bool
120119
failNotifee := notifications.Notifee{Data: graphsync.RequestFailedUnknown, Subscriber: sub}
121-
err := peerResponseSender.Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error {
120+
err := qe.peerManager.SenderForPeer(p).Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error {
122121
for _, extension := range result.Extensions {
123122
transaction.SendExtensionData(extension)
124123
}
@@ -138,10 +137,10 @@ func (qe *queryExecutor) prepareQuery(ctx context.Context,
138137
if transactionError != nil {
139138
return nil, nil, false, transactionError
140139
}
141-
if err := qe.processDedupByKey(request, peerResponseSender, failNotifee); err != nil {
140+
if err := qe.processDedupByKey(request, qe.peerManager.SenderForPeer(p), failNotifee); err != nil {
142141
return nil, nil, false, err
143142
}
144-
if err := qe.processDoNoSendCids(request, peerResponseSender, failNotifee); err != nil {
143+
if err := qe.processDoNoSendCids(request, qe.peerManager.SenderForPeer(p), failNotifee); err != nil {
145144
return nil, nil, false, err
146145
}
147146
rootLink := cidlink.Link{Cid: request.Root()}
@@ -201,10 +200,9 @@ func (qe *queryExecutor) executeQuery(
201200
signals signals,
202201
sub *notifications.TopicDataSubscriber) (graphsync.ResponseStatusCode, error) {
203202
updateChan := make(chan []gsmsg.GraphSyncRequest)
204-
peerResponseSender := qe.peerManager.SenderForPeer(p)
205203
err := runtraversal.RunTraversal(loader, traverser, func(link ipld.Link, data []byte) error {
206204
var err error
207-
_ = peerResponseSender.Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error {
205+
_ = qe.peerManager.SenderForPeer(p).Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error {
208206
err = qe.checkForUpdates(p, request, signals, updateChan, transaction)
209207
if _, ok := err.(hooks.ErrPaused); !ok && err != nil {
210208
return nil
@@ -228,7 +226,7 @@ func (qe *queryExecutor) executeQuery(
228226
return err
229227
})
230228
var code graphsync.ResponseStatusCode
231-
_ = peerResponseSender.Transaction(request.ID(), func(peerResponseSender peerresponsemanager.PeerResponseTransactionSender) error {
229+
_ = qe.peerManager.SenderForPeer(p).Transaction(request.ID(), func(peerResponseSender peerresponsemanager.PeerResponseTransactionSender) error {
232230
if err != nil {
233231
_, isPaused := err.(hooks.ErrPaused)
234232
if isPaused {

0 commit comments

Comments
 (0)