4
4
"context"
5
5
"errors"
6
6
"math"
7
+ "time"
7
8
8
9
"github.com/ipfs/go-peertaskqueue/peertask"
9
10
"github.com/libp2p/go-libp2p-core/peer"
@@ -155,6 +156,14 @@ func (rm *ResponseManager) processRequests(p peer.ID, requests []gsmsg.GraphSync
155
156
networkErrorListeners : rm .networkErrorListeners ,
156
157
connManager : rm .connManager ,
157
158
})
159
+ log .Infow ("graphsync request initiated" , "request id" , request .ID (), "peer" , p , "root" , request .Root ())
160
+ ipr , ok := rm .inProgressResponses [key ]
161
+ if ok && ipr .state == graphsync .Running {
162
+ // if we replace a running request, the signals and context will no longer work, meaning the request could get
163
+ // stuck indefinitely
164
+ log .Infof ("there is an identical request already in progress" , "request id" , request .ID (), "peer" , p )
165
+ continue
166
+ }
158
167
159
168
rm .inProgressResponses [key ] =
160
169
& inProgressResponseStatus {
@@ -167,7 +176,8 @@ func (rm *ResponseManager) processRequests(p peer.ID, requests []gsmsg.GraphSync
167
176
UpdateSignal : make (chan struct {}, 1 ),
168
177
ErrSignal : make (chan error , 1 ),
169
178
},
170
- state : graphsync .Queued ,
179
+ state : graphsync .Queued ,
180
+ startTime : time .Now (),
171
181
}
172
182
// TODO: Use a better work estimation metric.
173
183
@@ -180,6 +190,8 @@ func (rm *ResponseManager) taskDataForKey(key responseKey) queryexecutor.Respons
180
190
if ! hasResponse {
181
191
return queryexecutor.ResponseTask {Empty : true }
182
192
}
193
+ log .Infow ("graphsync response processing begins" , "request id" , key .requestID , "peer" , key .p , "total time" , time .Since (response .startTime ))
194
+
183
195
if response .loader == nil || response .traverser == nil {
184
196
loader , traverser , isPaused , err := (& queryPreparer {rm .requestHooks , rm .responseAssembler , rm .linkSystem , rm .maxLinksPerRequest }).prepareQuery (response .ctx , key .p , response .request , response .signals , response .subscriber )
185
197
if err != nil {
@@ -212,6 +224,7 @@ func (rm *ResponseManager) startTask(task *peertask.Task) queryexecutor.Response
212
224
if taskData .Empty {
213
225
rm .responseQueue .TaskDone (key .p , task )
214
226
}
227
+
215
228
return taskData
216
229
}
217
230
@@ -226,6 +239,8 @@ func (rm *ResponseManager) finishTask(task *peertask.Task, err error) {
226
239
response .state = graphsync .Paused
227
240
return
228
241
}
242
+ log .Infow ("graphsync response processing complete (messages stil sending)" , "request id" , key .requestID , "peer" , key .p , "total time" , time .Since (response .startTime ))
243
+
229
244
if err != nil {
230
245
log .Infof ("response failed: %w" , err )
231
246
}
0 commit comments