@@ -184,8 +184,8 @@ func TestRejectRequestsByDefault(t *testing.T) {
184
184
185
185
requestor := td .GraphSyncHost1 ()
186
186
// setup responder to disable default validation, meaning all requests are rejected
187
- _ = td .GraphSyncHost2 (RejectAllRequestsByDefault ())
188
-
187
+ responder : = td .GraphSyncHost2 (RejectAllRequestsByDefault ())
188
+ assertComplete := assertCompletionFunction ( responder , 1 )
189
189
blockChainLength := 5
190
190
blockChain := testutil .SetupBlockChain (ctx , t , td .persistence2 , 5 , blockChainLength )
191
191
@@ -196,6 +196,8 @@ func TestRejectRequestsByDefault(t *testing.T) {
196
196
testutil .VerifySingleTerminalError (ctx , t , errChan )
197
197
198
198
drain (requestor )
199
+ drain (responder )
200
+ assertComplete (ctx , t )
199
201
200
202
tracing := collectTracing (t )
201
203
require .ElementsMatch (t , []string {
@@ -229,7 +231,7 @@ func TestGraphsyncRoundTripRequestBudgetRequestor(t *testing.T) {
229
231
230
232
// initialize graphsync on second node to response to requests
231
233
responder := td .GraphSyncHost2 ()
232
-
234
+ assertCancelOrComplete := assertCancelOrCompleteFunction ( responder , 1 )
233
235
progressChan , errChan := requestor .Request (ctx , td .host2 .ID (), blockChain .TipLink , blockChain .Selector (), td .extension )
234
236
235
237
// response budgets don't include the root block, so total links traverse with be one more than expected
@@ -239,19 +241,21 @@ func TestGraphsyncRoundTripRequestBudgetRequestor(t *testing.T) {
239
241
240
242
drain (requestor )
241
243
drain (responder )
242
-
244
+ wasCancelled := assertCancelOrComplete ( ctx , t )
243
245
tracing := collectTracing (t )
244
246
traceStrings := tracing .TracesToStrings ()
245
- // TODO: this may or may not appear
246
247
require .Contains (t , traceStrings , "response(0)->executeTask(0)" )
247
- // may or may not contain one of these: "response(0)->abortRequest(0)"
248
- // TODO: figure out why and confirm this is OK
248
+ if wasCancelled {
249
+ require .Contains (t , traceStrings , "response(0)->abortRequest(0)" )
250
+ }
249
251
require .Contains (t , traceStrings , "request(0)->newRequest(0)" )
250
252
require .Contains (t , traceStrings , "request(0)->executeTask(0)" )
251
253
require .Contains (t , traceStrings , "request(0)->terminateRequest(0)" )
252
254
// has ErrBudgetExceeded exception recorded in the right place
253
255
tracing .SingleExceptionEvent (t , "request(0)->executeTask(0)" , "ErrBudgetExceeded" , "traversal budget exceeded" , true )
254
- tracing .SingleExceptionEvent (t , "response(0)->executeTask(0)" , "ContextCancelError" , ipldutil.ContextCancelError {}.Error (), true )
256
+ if wasCancelled {
257
+ tracing .SingleExceptionEvent (t , "response(0)->executeTask(0)" , "ContextCancelError" , ipldutil.ContextCancelError {}.Error (), true )
258
+ }
255
259
}
256
260
257
261
func TestGraphsyncRoundTripRequestBudgetResponder (t * testing.T ) {
@@ -529,7 +533,7 @@ func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) {
529
533
530
534
// initialize graphsync on second node to response to requests
531
535
responder := td .GraphSyncHost2 ()
532
-
536
+ assertComplete := assertCompletionFunction ( responder , 1 )
533
537
totalSent := 0
534
538
totalSentOnWire := 0
535
539
responder .RegisterOutgoingBlockHook (func (p peer.ID , requestData graphsync.RequestData , blockData graphsync.BlockData , hookActions graphsync.OutgoingBlockHookActions ) {
@@ -553,7 +557,7 @@ func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) {
553
557
554
558
drain (requestor )
555
559
drain (responder )
556
-
560
+ assertComplete ( ctx , t )
557
561
tracing := collectTracing (t )
558
562
require .ElementsMatch (t , []string {
559
563
"response(0)->executeTask(0)" ,
@@ -600,7 +604,7 @@ func TestPauseResume(t *testing.T) {
600
604
hookActions .TerminateWithError (errors .New ("should have sent extension" ))
601
605
}
602
606
})
603
-
607
+ assertOneRequestCompletes := assertCompletionFunction ( responder , 1 )
604
608
progressChan , errChan := requestor .Request (ctx , td .host2 .ID (), blockChain .TipLink , blockChain .Selector (), td .extension )
605
609
606
610
blockChain .VerifyResponseRange (ctx , progressChan , 0 , stopPoint )
@@ -636,12 +640,11 @@ func TestPauseResume(t *testing.T) {
636
640
637
641
drain (requestor )
638
642
drain (responder )
639
-
643
+ assertOneRequestCompletes ( ctx , t )
640
644
tracing := collectTracing (t )
641
645
traceStrings := tracing .TracesToStrings ()
642
646
require .Contains (t , traceStrings , "response(0)->executeTask(0)" )
643
- // may or may not contain an extra execute: "response(0)->executeTask(1)"
644
- // TODO: figure out why and confirm this is OK
647
+ require .Contains (t , traceStrings , "response(0)->executeTask(1)" )
645
648
require .Contains (t , traceStrings , "request(0)->newRequest(0)" )
646
649
require .Contains (t , traceStrings , "request(0)->executeTask(0)" )
647
650
require .Contains (t , traceStrings , "request(0)->terminateRequest(0)" )
@@ -668,6 +671,7 @@ func TestPauseResumeRequest(t *testing.T) {
668
671
669
672
// initialize graphsync on second node to response to requests
670
673
responder := td .GraphSyncHost2 ()
674
+ assertCancelOrComplete := assertCancelOrCompleteFunction (responder , 2 )
671
675
672
676
stopPoint := 50
673
677
blocksReceived := 0
@@ -699,14 +703,23 @@ func TestPauseResumeRequest(t *testing.T) {
699
703
700
704
drain (requestor )
701
705
drain (responder )
702
-
706
+ // the request may actually only get sent onces -- it depends
707
+ // on whether the responder completes its send before getting the cancel
708
+ // signal. even if the request pauses before the request is over,
709
+ // it may not make another graphsync request if it
710
+ // ingested the blocks into the temporary cache
711
+ wasCancelled := assertCancelOrComplete (ctx , t )
712
+ if wasCancelled {
713
+ // should get max 1 cancel
714
+ require .False (t , assertCancelOrComplete (ctx , t ))
715
+ }
703
716
tracing := collectTracing (t )
704
717
traceStrings := tracing .TracesToStrings ()
705
718
require .Contains (t , traceStrings , "response(0)->executeTask(0)" )
706
- // may or may not contain this: "response(0)->abortRequest(0)"
707
- // TODO: figure out why and confirm this is OK
708
- // may or may not contain an extra response+execute: "response(1)->executeTask(0)"
709
- // TODO: figure out why and confirm this is OK
719
+ if wasCancelled {
720
+ require . Contains ( t , traceStrings , "response(0)->abortRequest(0)" )
721
+ require . Contains ( t , traceStrings , "response(1)->executeTask(0)" )
722
+ }
710
723
require .Contains (t , traceStrings , "request(0)->newRequest(0)" )
711
724
require .Contains (t , traceStrings , "request(0)->executeTask(0)" )
712
725
require .Contains (t , traceStrings , "request(0)->executeTask(1)" )
@@ -766,6 +779,7 @@ func TestPauseResumeViaUpdate(t *testing.T) {
766
779
hookActions .UnpauseResponse ()
767
780
}
768
781
})
782
+ assertComplete := assertCompletionFunction (responder , 1 )
769
783
progressChan , errChan := requestor .Request (ctx , td .host2 .ID (), blockChain .TipLink , blockChain .Selector (), td .extension )
770
784
771
785
blockChain .VerifyWholeChain (ctx , progressChan )
@@ -777,6 +791,7 @@ func TestPauseResumeViaUpdate(t *testing.T) {
777
791
778
792
drain (requestor )
779
793
drain (responder )
794
+ assertComplete (ctx , t )
780
795
781
796
tracing := collectTracing (t )
782
797
// j, _ := json.MarshalIndent(tracing.FindSpans("executeTask"), "", " ")
@@ -852,6 +867,7 @@ func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) {
852
867
hookActions .UnpauseResponse ()
853
868
}
854
869
})
870
+ assertComplete := assertCompletionFunction (responder , 1 )
855
871
progressChan , errChan := requestor .Request (ctx , td .host2 .ID (), blockChain .TipLink , blockChain .Selector (), td .extension )
856
872
857
873
blockChain .VerifyWholeChain (ctx , progressChan )
@@ -863,6 +879,7 @@ func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) {
863
879
864
880
drain (requestor )
865
881
drain (responder )
882
+ assertComplete (ctx , t )
866
883
867
884
tracing := collectTracing (t )
868
885
require .ElementsMatch (t , []string {
@@ -962,7 +979,7 @@ func TestNetworkDisconnect(t *testing.T) {
962
979
traceStrings := tracing .TracesToStrings ()
963
980
require .Contains (t , traceStrings , "response(0)->executeTask(0)" )
964
981
// may contain multiple abortRequest traces as the network error can bubble up >1 times
965
- require . Contains ( t , traceStrings , "response(0)->abortRequest(0)" )
982
+ // but these will only record if the request is still executing
966
983
require .Contains (t , traceStrings , "request(0)->newRequest(0)" )
967
984
require .Contains (t , traceStrings , "request(0)->executeTask(0)" )
968
985
require .Contains (t , traceStrings , "request(0)->terminateRequest(0)" )
@@ -1032,6 +1049,7 @@ func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) {
1032
1049
1033
1050
// initialize graphsync on second node to response to requests
1034
1051
responder := td .GraphSyncHost2 ()
1052
+ assertComplete := assertCompletionFunction (responder , 2 )
1035
1053
1036
1054
// alternate storing location for responder
1037
1055
altStore1 := make (map [ipld.Link ][]byte )
@@ -1086,6 +1104,8 @@ func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) {
1086
1104
1087
1105
drain (requestor )
1088
1106
drain (responder )
1107
+ assertComplete (ctx , t )
1108
+ assertComplete (ctx , t )
1089
1109
1090
1110
tracing := collectTracing (t )
1091
1111
// two complete request traces expected
@@ -1117,6 +1137,7 @@ func TestGraphsyncRoundTripMultipleAlternatePersistence(t *testing.T) {
1117
1137
1118
1138
// initialize graphsync on second node to response to requests
1119
1139
responder := td .GraphSyncHost2 ()
1140
+ assertComplete := assertCompletionFunction (responder , 2 )
1120
1141
1121
1142
// alternate storing location for responder
1122
1143
altStore1 := make (map [ipld.Link ][]byte )
@@ -1170,6 +1191,8 @@ func TestGraphsyncRoundTripMultipleAlternatePersistence(t *testing.T) {
1170
1191
1171
1192
drain (requestor )
1172
1193
drain (responder )
1194
+ assertComplete (ctx , t )
1195
+ assertComplete (ctx , t )
1173
1196
1174
1197
tracing := collectTracing (t )
1175
1198
// two complete request traces expected
@@ -1217,14 +1240,15 @@ func TestRoundTripLargeBlocksSlowNetwork(t *testing.T) {
1217
1240
1218
1241
// initialize graphsync on second node to response to requests
1219
1242
responder := td .GraphSyncHost2 ()
1220
-
1243
+ assertComplete := assertCompletionFunction ( responder , 1 )
1221
1244
progressChan , errChan := requestor .Request (ctx , td .host2 .ID (), blockChain .TipLink , blockChain .Selector ())
1222
1245
1223
1246
blockChain .VerifyWholeChain (ctx , progressChan )
1224
1247
testutil .VerifyEmptyErrors (ctx , t , errChan )
1225
1248
1226
1249
drain (requestor )
1227
1250
drain (responder )
1251
+ assertComplete (ctx , t )
1228
1252
1229
1253
tracing := collectTracing (t )
1230
1254
require .ElementsMatch (t , []string {
@@ -1484,6 +1508,38 @@ func drain(gs graphsync.GraphExchange) {
1484
1508
gs .(* GraphSync ).responseManager .Synchronize ()
1485
1509
}
1486
1510
1511
+ func assertCompletionFunction (gs graphsync.GraphExchange , completedRequestCount int ) func (context.Context , * testing.T ) {
1512
+ completedResponse := make (chan struct {}, completedRequestCount )
1513
+ gs .RegisterCompletedResponseListener (func (p peer.ID , request graphsync.RequestData , status graphsync.ResponseStatusCode ) {
1514
+ completedResponse <- struct {}{}
1515
+ })
1516
+ return func (ctx context.Context , t * testing.T ) {
1517
+ testutil .AssertDoesReceive (ctx , t , completedResponse , "request never completed" )
1518
+ }
1519
+ }
1520
+
1521
+ func assertCancelOrCompleteFunction (gs graphsync.GraphExchange , requestCount int ) func (context.Context , * testing.T ) bool {
1522
+ completedResponse := make (chan struct {}, requestCount )
1523
+ gs .RegisterCompletedResponseListener (func (p peer.ID , request graphsync.RequestData , status graphsync.ResponseStatusCode ) {
1524
+ completedResponse <- struct {}{}
1525
+ })
1526
+ cancelledResponse := make (chan struct {}, requestCount )
1527
+ gs .RegisterRequestorCancelledListener (func (p peer.ID , request graphsync.RequestData ) {
1528
+ cancelledResponse <- struct {}{}
1529
+ })
1530
+ return func (ctx context.Context , t * testing.T ) bool {
1531
+ select {
1532
+ case <- ctx .Done ():
1533
+ require .FailNow (t , "request did not cancel or complete" )
1534
+ return false
1535
+ case <- completedResponse :
1536
+ return false
1537
+ case <- cancelledResponse :
1538
+ return true
1539
+ }
1540
+ }
1541
+ }
1542
+
1487
1543
func newGsTestData (ctx context.Context , t * testing.T ) * gsTestData {
1488
1544
t .Helper ()
1489
1545
td := & gsTestData {ctx : ctx }
0 commit comments