Skip to content

Commit f6a08b5

Browse files
hannahhoward and rvagg authored
Refactor async loading for simplicity and correctness (#356)
* feat(reconciledloader): first working version of reconciled loader * feat(traversalrecorder): add better recorder for traversals * feat(reconciledloader): pipe reconciled loader through code style(lint): fix static checks * Update requestmanager/reconciledloader/injest.go Co-authored-by: Rod Vagg <[email protected]> * feat(reconciledloader): respond to PR comments Co-authored-by: Rod Vagg <[email protected]>
1 parent db297c1 commit f6a08b5

36 files changed

+1945
-2010
lines changed

docs/async-loading.png

-117 KB
Binary file not shown.

docs/async-loading.puml

-75
This file was deleted.

docs/processes.png

-2.35 KB
Loading

docs/processes.puml

+11-10
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,17 @@ partition "Top Level Interface" {
1212
if (operation type) then (outgoing request or incoming response)
1313
partition "Graphsync Requestor Implementation" {
1414
:RequestManager;
15-
if (operation type) then (incoming response)
16-
partition "Verifying Queries" {
15+
partition "Executing Requests" {
16+
:TaskQueue;
1717
fork
18-
:ipld.Traverse;
18+
:Executor;
1919
fork again
20-
:ipld.Traverse;
20+
:Executor;
2121
fork again
22-
:ipld.Traverse;
22+
:Executor;
2323
end fork
2424
}
25+
if (operation type) then (verified responses)
2526
partition "Collecting Responses" {
2627
fork
2728
:Response Collector;
@@ -33,21 +34,21 @@ end fork
3334
}
3435
:Responses returned to client;
3536
stop
36-
else (outgoing request)
37+
else (request messages)
3738
:Send Request To Network;
3839
endif
3940
}
4041
else (incoming request)
4142
partition "Graphsync Responder Implementation" {
4243
:ResponseManager;
4344
partition "Performing Queries" {
44-
:PeerTaskQueue;
45+
:TaskQueue;
4546
fork
46-
:ipld.Traverse;
47+
:QueryExecutor;
4748
fork again
48-
:ipld.Traverse;
49+
:QueryExecutor;
4950
fork again
50-
:ipld.Traverse;
51+
:QueryExecutor;
5152
end fork
5253
}
5354
}

docs/request-execution.png

167 KB
Loading

docs/request-execution.puml

+102
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
@startuml Request Execution
2+
participant "GraphSync\nTop Level\nInterface" as TLI
3+
participant RequestManager
4+
participant TaskQueue
5+
participant RequestExecutor as RE
6+
participant ReconciledLoader
7+
participant TraversalRecord
8+
participant Verifier
9+
participant LocalStorage
10+
participant Traverser
11+
participant Network
12+
13+
== Initialization ==
14+
15+
TLI -> RequestManager ** : Setup
16+
TLI -> RE ** : Setup
17+
TLI -> TaskQueue ** : Setup
18+
19+
== Executing A Request ==
20+
21+
par
22+
note over TLI : Request Initiation
23+
TLI -> RequestManager : New Request
24+
RequestManager -> RequestManager : Create Request Context
25+
RequestManager -> TaskQueue : Push Request
26+
else
27+
note over RE: Request Execution
28+
TaskQueue -> RE : Next Request\nTo Process
29+
RE -> RequestManager : Initiate request execution
30+
RequestManager -> Traverser ** : Create to manage selector traversal
31+
RequestManager -> ReconciledLoader ** : Create to manage block loading
32+
RequestManager -> RE : Traverser + ReconciledLoader
33+
note over RE: Local loading phase
34+
loop until traversal complete, request context cancelled, or missing block locally
35+
Traverser -> RE : Request to load blocks\nto perform traversal
36+
RE -> ReconciledLoader : Load next block
37+
ReconciledLoader -> LocalStorage : Load Block
38+
LocalStorage --> ReconciledLoader : Block or missing
39+
ReconciledLoader -> TraversalRecord : Record link traversal
40+
TraversalRecord --> ReconciledLoader
41+
ReconciledLoader --> RE : Block or missing
42+
opt block is present
43+
RE --> Traverser : Next block to load
44+
end
45+
end
46+
RE -> Network : Send Graphsync Request
47+
RE -> ReconciledLoader : remote online
48+
ReconciledLoader -> Verifier ** : Create new from traversal record
49+
ReconciledLoader -> RE
50+
note over RE: Remote loading phase
51+
loop until traversal complete, request context cancelled, or missing block locally
52+
Traverser -> RE : Request to load blocks\nto perform traversal
53+
RE -> ReconciledLoader : Load next block
54+
alt on missing path for remote
55+
ReconciledLoader -> LocalStorage : Load Block
56+
LocalStorage --> ReconciledLoader : Block or missing
57+
else
58+
loop until block loaded, missing, or error
59+
opt new remote responses
60+
alt verification not done
61+
ReconciledLoader -> Verifier : verify next response
62+
alt success
63+
Verifier --> ReconciledLoader : verified
64+
ReconciledLoader -> ReconciledLoader : wait for more responses
65+
else failure
66+
Verifier --> ReconciledLoader : error
67+
end
68+
else verification done
69+
alt next response matches current block load
70+
71+
alt next response contains a block
72+
ReconciledLoader -> LocalStorage : store remote block
73+
LocalStorage --> ReconciledLoader
74+
ReconciledLoader -> ReconciledLoader : block loaded from remote
75+
else next response does not contain block
76+
opt next response is missing
77+
ReconciledLoader -> ReconciledLoader : record missing path
78+
end
79+
ReconciledLoader -> LocalStorage : load block
80+
LocalStorage --> ReconciledLoader : block or missing
81+
end
82+
else next response does not match
83+
ReconciledLoader -> ReconciledLoader : error
84+
end
85+
end
86+
end
87+
opt remote goes offline
88+
ReconciledLoader -> LocalStorage : load block
89+
LocalStorage --> ReconciledLoader : block or missing
90+
end
91+
end
92+
ReconciledLoader -> TraversalRecord : Record link traversal
93+
TraversalRecord --> ReconciledLoader
94+
ReconciledLoader --> RE : Block, missing or error
95+
RE -> Traverser : Next block to load
96+
end
97+
end
98+
else
99+
Network -> RequestManager : New Responses
100+
RequestManager -> ReconciledLoader : Ingest Responses
101+
end
102+
@enduml

docs/responder-sequence.puml

+13-23
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,20 @@
11
@startuml Responding To A Request
22
participant "GraphSync\nTop Level\nInterface" as TLI
33
participant ResponseManager
4-
participant "Query Executor" as QW
5-
participant PeerTaskQueue
4+
participant "QueryExecutor" as QW
5+
participant TaskQueue
66
participant PeerTracker
77
participant Traverser
88
participant ResponseAssembler
99
participant LinkTracker
10-
participant ResponseBuilder
11-
participant "Intercepted Loader" as ILoader
1210
participant Loader
1311
participant "Message Sending\nLayer" as Message
1412

1513
== Initialization ==
1614

1715
TLI -> ResponseManager ** : Setup
18-
ResponseManager -> QW ** : Create
19-
activate QW
20-
TLI -> PeerTaskQueue ** : Setup
21-
TLI -> PeerResponseManager ** : Setup
16+
TLI -> QW ** : Setup
17+
TLI -> TaskQueue ** : Setup
2218

2319
== Responding To Request ==
2420

@@ -27,38 +23,32 @@ loop until shutdown
2723
note over TLI : Request Queueing Loop
2824
TLI -> ResponseManager : Process requests
2925
alt new request
30-
ResponseManager -> PeerTaskQueue : Push Request
31-
PeerTaskQueue -> PeerTracker ** : Create for peer\n as neccesary
32-
PeerTaskQueue -> PeerTracker : Push Request
3326
ResponseManager -> ResponseManager : Create Request Context
27+
ResponseManager -> TaskQueue : Push Request
3428
else cancel request
3529
ResponseManager -> ResponseManager : Cancel Request Context
3630
end
3731
end
3832
else
3933
loop until shutdown
4034
note over QW: Request Processing Loop
41-
QW -> PeerTaskQueue : Pop Request
42-
PeerTaskQueue -> PeerTracker : Pop Request
43-
PeerTracker -> PeerTaskQueue : Next Request\nTo Process
44-
PeerTaskQueue -> QW : Next Request\nTo Process
35+
TaskQueue -> QW : Next Request\nTo Process
36+
activate QW
4537
QW -> QW : Process incoming request hooks
46-
QW -> ILoader ** : Create w/ Request, Peer, and Loader
4738
QW -> Traverser ** : Create to manage selector traversal
4839
loop until traversal complete or request context cancelled
4940
note over Traverser: Selector Traversal Loop
50-
Traverser -> ILoader : Request to load blocks\nto perform traversal
51-
ILoader -> Loader : Load blocks\nfrom local storage
52-
Loader -> ILoader : Blocks From\nlocal storage or error
53-
ILoader -> Traverser : Blocks to continue\n traversal or error
54-
ILoader -> QW : Block or error to Send Back
41+
Traverser -> QW : Request to load blocks\nto perform traversal
42+
QW -> Loader : Load blocks\nfrom local storage
43+
Loader -> QW : Blocks From\nlocal storage or error
44+
QW -> Traverser : Blocks to continue\n traversal or error
5545
QW -> QW: Processing outgoing block hooks
5646
QW -> ResponseAssembler: Add outgoing responses
5747
activate ResponseAssembler
5848
ResponseAssembler -> LinkTracker ** : Create for peer if not already present
5949
ResponseAssembler -> LinkTracker : Notify block or\n error, ask whether\n block is duplicate
6050
LinkTracker -> ResponseAssembler : Whether to\n send block
61-
ResponseAssembler -> ResponseBuilder : Aggregate Response Metadata & Block
51+
ResponseAssembler -> ResponseAssembler : Aggregate Response Metadata & Blocks
6252
ResponseAssembler -> Message : Send aggregate response
6353
deactivate ResponseAssembler
6454
end
@@ -67,7 +57,7 @@ QW -> ResponseAssembler : Request Finished
6757
activate ResponseAssembler
6858
ResponseAssembler -> LinkTracker : Query If Errors\n Were Present
6959
LinkTracker -> ResponseAssembler : True/False\n if errors present
70-
ResponseAssembler -> ResponseBuilder : Aggregate request finishing
60+
ResponseAssembler -> ResponseAssembler : Aggregate request finishing
7161
ResponseAssembler -> Message : Send aggregate response
7262
deactivate ResponseAssembler
7363
end

graphsync.go

+19-2
Original file line numberDiff line numberDiff line change
@@ -127,14 +127,29 @@ func (e RequestNotFoundErr) Error() string {
127127
}
128128

129129
// RemoteMissingBlockErr indicates that the remote peer was missing a block
130-
// in the selector requested. It is a non-terminal error in the error stream
130+
// in the selector requested, and we also don't have it locally.
131+
// It is a non-terminal error in the error stream
131132
// for a request and does NOT cause a request to fail completely
132133
type RemoteMissingBlockErr struct {
133134
Link ipld.Link
135+
Path ipld.Path
134136
}
135137

136138
func (e RemoteMissingBlockErr) Error() string {
137-
return fmt.Sprintf("remote peer is missing block: %s", e.Link.String())
139+
return fmt.Sprintf("remote peer is missing block (%s) at path %s", e.Link.String(), e.Path)
140+
}
141+
142+
// RemoteIncorrectResponseError indicates that the remote peer sent a response
143+
// to a traversal that did not correspond with the expected next link
144+
// in the selector traversal based on verification of data up to this point
145+
type RemoteIncorrectResponseError struct {
146+
LocalLink ipld.Link
147+
RemoteLink ipld.Link
148+
Path ipld.Path
149+
}
150+
151+
func (e RemoteIncorrectResponseError) Error() string {
152+
return fmt.Sprintf("expected link (%s) at path %s does not match link sent by remote (%s), possible malicious responder", e.LocalLink, e.Path, e.RemoteLink)
138153
}
139154

140155
var (
@@ -223,6 +238,8 @@ type LinkMetadataIterator func(cid.Cid, LinkAction)
223238

224239
// LinkMetadata is used to access link metadata through an Iterator
225240
type LinkMetadata interface {
241+
// Length returns the number of metadata entries
242+
Length() int64
226243
// Iterate steps over individual metadata one by one using the provided
227244
// callback
228245
Iterate(LinkMetadataIterator)

0 commit comments

Comments
 (0)