Skip to content

Commit 1c3d63a

Browse files
rvagghannahhoward
authored andcommitted
fix: improve test synchronization/drainage
1 parent 80fcd3a commit 1c3d63a

File tree

3 files changed

+45
-32
lines changed

3 files changed

+45
-32
lines changed

impl/graphsync_test.go

+39-26
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ func TestGraphsyncRoundTripRequestBudgetRequestor(t *testing.T) {
242242
drain(requestor)
243243
drain(responder)
244244
wasCancelled := assertCancelOrComplete(ctx, t)
245+
245246
tracing := collectTracing(t)
246247
traceStrings := tracing.TracesToStrings()
247248
require.Contains(t, traceStrings, "response(0)->executeTask(0)")
@@ -277,6 +278,7 @@ func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) {
277278

278279
// initialize graphsync on second node to response to requests
279280
responder := td.GraphSyncHost2(MaxLinksPerIncomingRequests(linksToTraverse))
281+
assertComplete := assertCompletionFunction(responder, 1)
280282

281283
progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)
282284

@@ -287,6 +289,7 @@ func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) {
287289

288290
drain(requestor)
289291
drain(responder)
292+
assertComplete(ctx, t)
290293

291294
tracing := collectTracing(t)
292295
require.ElementsMatch(t, []string{
@@ -318,6 +321,7 @@ func TestGraphsyncRoundTrip(t *testing.T) {
318321

319322
// initialize graphsync on second node to response to requests
320323
responder := td.GraphSyncHost2()
324+
assertComplete := assertCompletionFunction(responder, 1)
321325

322326
var receivedResponseData []byte
323327
var receivedRequestData []byte
@@ -364,6 +368,7 @@ func TestGraphsyncRoundTrip(t *testing.T) {
364368

365369
drain(requestor)
366370
drain(responder)
371+
assertComplete(ctx, t)
367372

368373
tracing := collectTracing(t)
369374
require.ElementsMatch(t, []string{
@@ -400,6 +405,7 @@ func TestGraphsyncRoundTripPartial(t *testing.T) {
400405

401406
// initialize graphsync on second node to response to requests
402407
responder := td.GraphSyncHost2()
408+
assertComplete := assertCompletionFunction(responder, 1)
403409

404410
finalResponseStatusChan := make(chan graphsync.ResponseStatusCode, 1)
405411
responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
@@ -431,6 +437,7 @@ func TestGraphsyncRoundTripPartial(t *testing.T) {
431437

432438
drain(requestor)
433439
drain(responder)
440+
assertComplete(ctx, t)
434441

435442
tracing := collectTracing(t)
436443
require.ElementsMatch(t, []string{
@@ -472,6 +479,7 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) {
472479

473480
// initialize graphsync on second node to response to requests
474481
responder := td.GraphSyncHost2()
482+
assertComplete := assertCompletionFunction(responder, 1)
475483

476484
totalSent := 0
477485
totalSentOnWire := 0
@@ -493,6 +501,7 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) {
493501

494502
drain(requestor)
495503
drain(responder)
504+
assertComplete(ctx, t)
496505

497506
tracing := collectTracing(t)
498507
require.ElementsMatch(t, []string{
@@ -558,6 +567,7 @@ func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) {
558567
drain(requestor)
559568
drain(responder)
560569
assertComplete(ctx, t)
570+
561571
tracing := collectTracing(t)
562572
require.ElementsMatch(t, []string{
563573
"response(0)->executeTask(0)",
@@ -641,6 +651,7 @@ func TestPauseResume(t *testing.T) {
641651
drain(requestor)
642652
drain(responder)
643653
assertOneRequestCompletes(ctx, t)
654+
644655
tracing := collectTracing(t)
645656
traceStrings := tracing.TracesToStrings()
646657
require.Contains(t, traceStrings, "response(0)->executeTask(0)")
@@ -713,6 +724,7 @@ func TestPauseResumeRequest(t *testing.T) {
713724
// should get max 1 cancel
714725
require.False(t, assertCancelOrComplete(ctx, t))
715726
}
727+
716728
tracing := collectTracing(t)
717729
traceStrings := tracing.TracesToStrings()
718730
require.Contains(t, traceStrings, "response(0)->executeTask(0)")
@@ -1049,7 +1061,7 @@ func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) {
10491061

10501062
// initialize graphsync on second node to response to requests
10511063
responder := td.GraphSyncHost2()
1052-
assertComplete := assertCompletionFunction(responder, 2)
1064+
assertCancelOrComplete := assertCancelOrCompleteFunction(responder, 1)
10531065

10541066
// alternate storing location for responder
10551067
altStore1 := make(map[ipld.Link][]byte)
@@ -1104,21 +1116,21 @@ func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) {
11041116

11051117
drain(requestor)
11061118
drain(responder)
1107-
assertComplete(ctx, t)
1108-
assertComplete(ctx, t)
1119+
wasCancelled := assertCancelOrComplete(ctx, t)
11091120

11101121
tracing := collectTracing(t)
1111-
// two complete request traces expected
1112-
require.ElementsMatch(t, []string{
1113-
"response(0)->executeTask(0)",
1114-
"response(1)->executeTask(0)",
1115-
"request(0)->newRequest(0)",
1116-
"request(0)->executeTask(0)",
1117-
"request(0)->terminateRequest(0)",
1118-
"request(1)->newRequest(0)",
1119-
"request(1)->executeTask(0)",
1120-
"request(1)->terminateRequest(0)",
1121-
}, tracing.TracesToStrings())
1122+
traceStrings := tracing.TracesToStrings()
1123+
require.Contains(t, traceStrings, "response(0)->executeTask(0)")
1124+
// may or may not contain a second response trace: "response(1)->executeTask(0)""
1125+
if wasCancelled {
1126+
require.Contains(t, traceStrings, "response(0)->abortRequest(0)")
1127+
}
1128+
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
1129+
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
1130+
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
1131+
require.Contains(t, traceStrings, "request(1)->newRequest(0)")
1132+
require.Contains(t, traceStrings, "request(1)->executeTask(0)")
1133+
require.Contains(t, traceStrings, "request(1)->terminateRequest(0)")
11221134
// TODO(rvagg): this is randomly either a SkipMe or a ipldutil.ContextCancelError; confirm this is sane
11231135
// tracing.SingleExceptionEvent(t, "request(0)->newRequest(0)","request(0)->executeTask(0)", "SkipMe", traversal.SkipMe{}.Error(), true)
11241136
}
@@ -1192,20 +1204,18 @@ func TestGraphsyncRoundTripMultipleAlternatePersistence(t *testing.T) {
11921204
drain(requestor)
11931205
drain(responder)
11941206
assertComplete(ctx, t)
1195-
assertComplete(ctx, t)
11961207

11971208
tracing := collectTracing(t)
11981209
// two complete request traces expected
1199-
require.ElementsMatch(t, []string{
1200-
"response(0)->executeTask(0)",
1201-
"response(1)->executeTask(0)",
1202-
"request(0)->newRequest(0)",
1203-
"request(0)->executeTask(0)",
1204-
"request(0)->terminateRequest(0)",
1205-
"request(1)->newRequest(0)",
1206-
"request(1)->executeTask(0)",
1207-
"request(1)->terminateRequest(0)",
1208-
}, tracing.TracesToStrings())
1210+
traceStrings := tracing.TracesToStrings()
1211+
require.Contains(t, traceStrings, "response(0)->executeTask(0)")
1212+
// may or may not contain a second response "response(1)->executeTask(0)"
1213+
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
1214+
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
1215+
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
1216+
require.Contains(t, traceStrings, "request(1)->newRequest(0)")
1217+
require.Contains(t, traceStrings, "request(1)->executeTask(0)")
1218+
require.Contains(t, traceStrings, "request(1)->terminateRequest(0)")
12091219
}
12101220

12111221
// TestRoundTripLargeBlocksSlowNetwork test verifies graphsync continues to work
@@ -1330,6 +1340,7 @@ func TestUnixFSFetch(t *testing.T) {
13301340
td := newGsTestData(ctx, t)
13311341
requestor := New(ctx, td.gsnet1, persistence1)
13321342
responder := New(ctx, td.gsnet2, persistence2)
1343+
assertComplete := assertCompletionFunction(responder, 1)
13331344
extensionName := graphsync.ExtensionName("Free for all")
13341345
responder.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
13351346
hookActions.ValidateRequest()
@@ -1381,6 +1392,7 @@ func TestUnixFSFetch(t *testing.T) {
13811392

13821393
drain(requestor)
13831394
drain(responder)
1395+
assertComplete(ctx, t)
13841396

13851397
tracing := collectTracing(t)
13861398
require.ElementsMatch(t, []string{
@@ -1409,6 +1421,7 @@ func TestGraphsyncBlockListeners(t *testing.T) {
14091421

14101422
// initialize graphsync on second node to response to requests
14111423
responder := td.GraphSyncHost2()
1424+
assertComplete := assertCompletionFunction(responder, 1)
14121425

14131426
// register hooks to count blocks in various stages
14141427
blocksSent := 0
@@ -1474,6 +1487,7 @@ func TestGraphsyncBlockListeners(t *testing.T) {
14741487

14751488
drain(requestor)
14761489
drain(responder)
1490+
assertComplete(ctx, t)
14771491

14781492
tracing := collectTracing(t)
14791493
require.ElementsMatch(t, []string{
@@ -1505,7 +1519,6 @@ type gsTestData struct {
15051519
func drain(gs graphsync.GraphExchange) {
15061520
gs.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks()
15071521
gs.(*GraphSync).responseQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks()
1508-
gs.(*GraphSync).responseManager.Synchronize()
15091522
}
15101523

15111524
func assertCompletionFunction(gs graphsync.GraphExchange, completedRequestCount int) func(context.Context, *testing.T) {

responsemanager/client.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ func (rm *ResponseManager) CancelResponse(p peer.ID, requestID graphsync.Request
200200
}
201201

202202
// Synchronize is a utility method that blocks until all current messages are processed
203-
func (rm *ResponseManager) Synchronize() {
203+
func (rm *ResponseManager) synchronize() {
204204
sync := make(chan error)
205205
rm.send(&synchronizeMessage{sync}, nil)
206206
select {

responsemanager/responsemanager_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func TestCancellationQueryInProgress(t *testing.T) {
100100
gsmsg.CancelRequest(td.requestID),
101101
}
102102
responseManager.ProcessRequests(td.ctx, td.p, cancelRequests)
103-
responseManager.Synchronize()
103+
responseManager.synchronize()
104104
close(waitForCancel)
105105

106106
testutil.AssertDoesReceive(td.ctx, t, cancelledListenerCalled, "should call cancelled listener")
@@ -149,7 +149,7 @@ func TestEarlyCancellation(t *testing.T) {
149149
td.requestHooks.Register(selectorvalidator.SelectorValidator(100))
150150
responseManager.Startup()
151151
responseManager.ProcessRequests(td.ctx, td.p, td.requests)
152-
responseManager.Synchronize()
152+
responseManager.synchronize()
153153
td.connManager.AssertProtectedWithTags(t, td.p, td.requests[0].ID().Tag())
154154

155155
// send a cancellation
@@ -158,7 +158,7 @@ func TestEarlyCancellation(t *testing.T) {
158158
}
159159
responseManager.ProcessRequests(td.ctx, td.p, cancelRequests)
160160

161-
responseManager.Synchronize()
161+
responseManager.synchronize()
162162

163163
td.assertNoResponses()
164164
td.connManager.RefuteProtected(t, td.p)
@@ -634,7 +634,7 @@ func TestValidationAndExtensions(t *testing.T) {
634634
responseManager.ProcessRequests(td.ctx, td.p, td.requests)
635635
testutil.AssertDoesReceive(td.ctx, t, sent, "sends blocks")
636636
responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests)
637-
responseManager.Synchronize()
637+
responseManager.synchronize()
638638
close(wait)
639639
td.assertCompleteRequestWith(graphsync.RequestCompletedFull)
640640
td.assertReceiveExtensionResponse()
@@ -704,7 +704,7 @@ func TestValidationAndExtensions(t *testing.T) {
704704
responseManager.ProcessRequests(td.ctx, td.p, td.requests)
705705
testutil.AssertDoesReceive(td.ctx, t, sent, "sends blocks")
706706
responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests)
707-
responseManager.Synchronize()
707+
responseManager.synchronize()
708708
close(wait)
709709
td.assertCompleteRequestWith(graphsync.RequestFailedUnknown)
710710
})

0 commit comments

Comments
 (0)