@@ -116,10 +116,9 @@ type LeafCallback func(keys [][]byte, path []byte, leaf []byte, parent common.Ha
116
116
117
117
// nodeRequest represents a scheduled or already in-flight trie node retrieval request.
118
118
type nodeRequest struct {
119
- hash common.Hash // Hash of the trie node to retrieve
120
- path []byte // Merkle path leading to this node for prioritization
121
- data []byte // Data content of the node, cached until all subtrees complete
122
- deletes [][]byte // List of internal path segments for trie nodes to delete
119
+ hash common.Hash // Hash of the trie node to retrieve
120
+ path []byte // Merkle path leading to this node for prioritization
121
+ data []byte // Data content of the node, cached until all subtrees complete
123
122
124
123
parent * nodeRequest // Parent state node referencing this entry
125
124
deps int // Number of dependencies before allowed to commit this node
@@ -146,38 +145,85 @@ type CodeSyncResult struct {
146
145
Data []byte // Data content of the retrieved bytecode
147
146
}
148
147
148
+ // nodeOp represents an operation upon the trie node. It can either represent a
149
+ // deletion to the specific node or a node write for persisting retrieved node.
150
+ type nodeOp struct {
151
+ owner common.Hash // identifier of the trie (empty for account trie)
152
+ path []byte // path from the root to the specified node.
153
+ blob []byte // the content of the node (nil for deletion)
154
+ hash common.Hash // hash of the node content (empty for node deletion)
155
+ }
156
+
157
+ // isDelete indicates if the operation is a database deletion.
158
+ func (op * nodeOp ) isDelete () bool {
159
+ return len (op .blob ) == 0
160
+ }
161
+
149
162
// syncMemBatch is an in-memory buffer of successfully downloaded but not yet
150
163
// persisted data items.
151
164
type syncMemBatch struct {
152
- nodes map [string ][]byte // In-memory membatch of recently completed nodes
153
- hashes map [string ]common.Hash // Hashes of recently completed nodes
154
- deletes map [string ]struct {} // List of paths for trie node to delete
155
- codes map [common.Hash ][]byte // In-memory membatch of recently completed codes
156
- size uint64 // Estimated batch-size of in-memory data.
165
+ scheme string // State scheme identifier
166
+ codes map [common.Hash ][]byte // In-memory batch of recently completed codes
167
+ nodes []nodeOp // In-memory batch of recently completed/deleted nodes
168
+ size uint64 // Estimated batch-size of in-memory data.
157
169
}
158
170
159
171
// newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes.
160
- func newSyncMemBatch () * syncMemBatch {
172
+ func newSyncMemBatch (scheme string ) * syncMemBatch {
161
173
return & syncMemBatch {
162
- nodes : make (map [string ][]byte ),
163
- hashes : make (map [string ]common.Hash ),
164
- deletes : make (map [string ]struct {}),
165
- codes : make (map [common.Hash ][]byte ),
174
+ scheme : scheme ,
175
+ codes : make (map [common.Hash ][]byte ),
166
176
}
167
177
}
168
178
169
- // hasNode reports the trie node with specific path is already cached.
170
- func (batch * syncMemBatch ) hasNode (path []byte ) bool {
171
- _ , ok := batch .nodes [string (path )]
172
- return ok
173
- }
174
-
175
179
// hasCode reports the contract code with specific hash is already cached.
176
180
func (batch * syncMemBatch ) hasCode (hash common.Hash ) bool {
177
181
_ , ok := batch .codes [hash ]
178
182
return ok
179
183
}
180
184
185
+ // addCode caches a contract code database write operation.
186
+ func (batch * syncMemBatch ) addCode (hash common.Hash , code []byte ) {
187
+ batch .codes [hash ] = code
188
+ batch .size += common .HashLength + uint64 (len (code ))
189
+ }
190
+
191
+ // addNode caches a node database write operation.
192
+ func (batch * syncMemBatch ) addNode (owner common.Hash , path []byte , blob []byte , hash common.Hash ) {
193
+ if batch .scheme == rawdb .PathScheme {
194
+ if owner == (common.Hash {}) {
195
+ batch .size += uint64 (len (path ) + len (blob ))
196
+ } else {
197
+ batch .size += common .HashLength + uint64 (len (path )+ len (blob ))
198
+ }
199
+ } else {
200
+ batch .size += common .HashLength + uint64 (len (blob ))
201
+ }
202
+ batch .nodes = append (batch .nodes , nodeOp {
203
+ owner : owner ,
204
+ path : path ,
205
+ blob : blob ,
206
+ hash : hash ,
207
+ })
208
+ }
209
+
210
+ // delNode caches a node database delete operation.
211
+ func (batch * syncMemBatch ) delNode (owner common.Hash , path []byte ) {
212
+ if batch .scheme != rawdb .PathScheme {
213
+ log .Error ("Unexpected node deletion" , "owner" , owner , "path" , path , "scheme" , batch .scheme )
214
+ return // deletion is not supported in hash mode.
215
+ }
216
+ if owner == (common.Hash {}) {
217
+ batch .size += uint64 (len (path ))
218
+ } else {
219
+ batch .size += common .HashLength + uint64 (len (path ))
220
+ }
221
+ batch .nodes = append (batch .nodes , nodeOp {
222
+ owner : owner ,
223
+ path : path ,
224
+ })
225
+ }
226
+
181
227
// Sync is the main state trie synchronisation scheduler, which provides yet
182
228
// unknown trie hashes to retrieve, accepts node data associated with said hashes
183
229
// and reconstructs the trie step by step until all is done.
@@ -196,7 +242,7 @@ func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallb
196
242
ts := & Sync {
197
243
scheme : scheme ,
198
244
database : database ,
199
- membatch : newSyncMemBatch (),
245
+ membatch : newSyncMemBatch (scheme ),
200
246
nodeReqs : make (map [string ]* nodeRequest ),
201
247
codeReqs : make (map [common.Hash ]* codeRequest ),
202
248
queue : prque.New [int64 , any ](nil ), // Ugh, can contain both string and hash, whyyy
@@ -210,16 +256,17 @@ func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallb
210
256
// parent for completion tracking. The given path is a unique node path in
211
257
// hex format and contain all the parent path if it's layered trie node.
212
258
func (s * Sync ) AddSubTrie (root common.Hash , path []byte , parent common.Hash , parentPath []byte , callback LeafCallback ) {
213
- // Short circuit if the trie is empty or already known
214
259
if root == types .EmptyRootHash {
215
260
return
216
261
}
217
- if s .membatch .hasNode (path ) {
218
- return
219
- }
220
262
owner , inner := ResolvePath (path )
221
- if rawdb .HasTrieNode (s .database , owner , inner , root , s .scheme ) {
263
+ exist , inconsistent := s .hasNode (owner , inner , root )
264
+ if exist {
265
+ // The entire subtrie is already present in the database.
222
266
return
267
+ } else if inconsistent {
268
+ // There is a pre-existing node with the wrong hash in DB, remove it.
269
+ s .membatch .delNode (owner , inner )
223
270
}
224
271
// Assemble the new sub-trie sync request
225
272
req := & nodeRequest {
@@ -371,39 +418,42 @@ func (s *Sync) ProcessNode(result NodeSyncResult) error {
371
418
}
372
419
373
420
// Commit flushes the data stored in the internal membatch out to persistent
374
- // storage, returning any occurred error.
421
+ // storage, returning any occurred error. The whole data set will be flushed
422
+ // in an atomic database batch.
375
423
func (s * Sync ) Commit (dbw ethdb.Batch ) error {
376
424
// Flush the pending node writes into database batch.
377
425
var (
378
426
account int
379
427
storage int
380
428
)
381
- for path , value := range s .membatch .nodes {
382
- owner , inner := ResolvePath ([]byte (path ))
383
- if owner == (common.Hash {}) {
384
- account += 1
429
+ for _ , op := range s .membatch .nodes {
430
+ if op .isDelete () {
431
+ // node deletion is only supported in path mode.
432
+ if op .owner == (common.Hash {}) {
433
+ rawdb .DeleteAccountTrieNode (dbw , op .path )
434
+ } else {
435
+ rawdb .DeleteStorageTrieNode (dbw , op .owner , op .path )
436
+ }
437
+ deletionGauge .Inc (1 )
385
438
} else {
386
- storage += 1
439
+ if op .owner == (common.Hash {}) {
440
+ account += 1
441
+ } else {
442
+ storage += 1
443
+ }
444
+ rawdb .WriteTrieNode (dbw , op .owner , op .path , op .hash , op .blob , s .scheme )
387
445
}
388
- rawdb .WriteTrieNode (dbw , owner , inner , s .membatch .hashes [path ], value , s .scheme )
389
446
}
390
447
accountNodeSyncedGauge .Inc (int64 (account ))
391
448
storageNodeSyncedGauge .Inc (int64 (storage ))
392
449
393
- // Flush the pending node deletes into the database batch.
394
- // Please note that each written and deleted node has a
395
- // unique path, ensuring no duplication occurs.
396
- for path := range s .membatch .deletes {
397
- owner , inner := ResolvePath ([]byte (path ))
398
- rawdb .DeleteTrieNode (dbw , owner , inner , common.Hash {} /* unused */ , s .scheme )
399
- }
400
450
// Flush the pending code writes into database batch.
401
451
for hash , value := range s .membatch .codes {
402
452
rawdb .WriteCode (dbw , hash , value )
403
453
}
404
454
codeSyncedGauge .Inc (int64 (len (s .membatch .codes )))
405
455
406
- s .membatch = newSyncMemBatch () // reset the batch
456
+ s .membatch = newSyncMemBatch (s . scheme ) // reset the batch
407
457
return nil
408
458
}
409
459
@@ -476,12 +526,15 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
476
526
// child as invalid. This is essential in the case of path mode
477
527
// scheme; otherwise, state healing might overwrite existing child
478
528
// nodes silently while leaving a dangling parent node within the
479
- // range of this internal path on disk. This would break the
480
- // guarantee for state healing.
529
+ // range of this internal path on disk and the persistent state
530
+ // ends up with a very weird situation that nodes on the same path
531
+ // are not inconsistent while they all present in disk. This property
532
+ // would break the guarantee for state healing.
481
533
//
482
534
// While it's possible for this shortNode to overwrite a previously
483
535
// existing full node, the other branches of the fullNode can be
484
- // retained as they remain untouched and complete.
536
+ // retained as they are not accessible with the new shortNode, and
537
+ // also the whole sub-trie is still untouched and complete.
485
538
//
486
539
// This step is only necessary for path mode, as there is no deletion
487
540
// in hash mode at all.
@@ -498,8 +551,7 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
498
551
exists = rawdb .ExistsStorageTrieNode (s .database , owner , append (inner , key [:i ]... ))
499
552
}
500
553
if exists {
501
- req .deletes = append (req .deletes , key [:i ])
502
- deletionGauge .Inc (1 )
554
+ s .membatch .delNode (owner , append (inner , key [:i ]... ))
503
555
log .Debug ("Detected dangling node" , "owner" , owner , "path" , append (inner , key [:i ]... ))
504
556
}
505
557
}
@@ -521,6 +573,7 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
521
573
var (
522
574
missing = make (chan * nodeRequest , len (children ))
523
575
pending sync.WaitGroup
576
+ batchMu sync.Mutex
524
577
)
525
578
for _ , child := range children {
526
579
// Notify any external watcher of a new key/value node
@@ -538,34 +591,32 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
538
591
}
539
592
}
540
593
}
541
- // If the child references another node, resolve or schedule
594
+ // If the child references another node, resolve or schedule.
595
+ // We check all children concurrently.
542
596
if node , ok := (child .node ).(hashNode ); ok {
543
- // Try to resolve the node from the local database
544
- if s .membatch .hasNode (child .path ) {
545
- continue
546
- }
547
- // Check the presence of children concurrently
597
+ path := child .path
598
+ hash := common .BytesToHash (node )
548
599
pending .Add (1 )
549
- go func (child childNode ) {
600
+ go func () {
550
601
defer pending .Done ()
551
-
552
- // If database says duplicate, then at least the trie node is present
553
- // and we hold the assumption that it's NOT legacy contract code.
554
- var (
555
- chash = common .BytesToHash (node )
556
- owner , inner = ResolvePath (child .path )
557
- )
558
- if rawdb .HasTrieNode (s .database , owner , inner , chash , s .scheme ) {
602
+ owner , inner := ResolvePath (path )
603
+ exist , inconsistent := s .hasNode (owner , inner , hash )
604
+ if exist {
559
605
return
606
+ } else if inconsistent {
607
+ // There is a pre-existing node with the wrong hash in DB, remove it.
608
+ batchMu .Lock ()
609
+ s .membatch .delNode (owner , inner )
610
+ batchMu .Unlock ()
560
611
}
561
612
// Locally unknown node, schedule for retrieval
562
613
missing <- & nodeRequest {
563
- path : child . path ,
564
- hash : chash ,
614
+ path : path ,
615
+ hash : hash ,
565
616
parent : req ,
566
617
callback : req .callback ,
567
618
}
568
- }(child )
619
+ }()
569
620
}
570
621
}
571
622
pending .Wait ()
@@ -587,21 +638,10 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
587
638
// committed themselves.
588
639
func (s * Sync ) commitNodeRequest (req * nodeRequest ) error {
589
640
// Write the node content to the membatch
590
- s . membatch . nodes [ string ( req . path )] = req .data
591
- s .membatch .hashes [ string ( req .path )] = req .hash
641
+ owner , path := ResolvePath ( req .path )
642
+ s .membatch .addNode ( owner , path , req .data , req .hash )
592
643
593
- // The size tracking refers to the db-batch, not the in-memory data.
594
- if s .scheme == rawdb .PathScheme {
595
- s .membatch .size += uint64 (len (req .path ) + len (req .data ))
596
- } else {
597
- s .membatch .size += common .HashLength + uint64 (len (req .data ))
598
- }
599
- // Delete the internal nodes which are marked as invalid
600
- for _ , segment := range req .deletes {
601
- path := append (req .path , segment ... )
602
- s .membatch .deletes [string (path )] = struct {}{}
603
- s .membatch .size += uint64 (len (path ))
604
- }
644
+ // Removed the completed node request
605
645
delete (s .nodeReqs , string (req .path ))
606
646
s .fetches [len (req .path )]--
607
647
@@ -622,8 +662,9 @@ func (s *Sync) commitNodeRequest(req *nodeRequest) error {
622
662
// committed themselves.
623
663
func (s * Sync ) commitCodeRequest (req * codeRequest ) error {
624
664
// Write the node content to the membatch
625
- s .membatch .codes [req .hash ] = req .data
626
- s .membatch .size += common .HashLength + uint64 (len (req .data ))
665
+ s .membatch .addCode (req .hash , req .data )
666
+
667
+ // Removed the completed code request
627
668
delete (s .codeReqs , req .hash )
628
669
s .fetches [len (req .path )]--
629
670
@@ -639,6 +680,28 @@ func (s *Sync) commitCodeRequest(req *codeRequest) error {
639
680
return nil
640
681
}
641
682
683
+ // hasNode reports whether the specified trie node is present in the database.
684
+ // 'exists' is true when the node exists in the database and matches the given root
685
+ // hash. The 'inconsistent' return value is true when the node exists but does not
686
+ // match the expected hash.
687
+ func (s * Sync ) hasNode (owner common.Hash , path []byte , hash common.Hash ) (exists bool , inconsistent bool ) {
688
+ // If node is running with hash scheme, check the presence with node hash.
689
+ if s .scheme == rawdb .HashScheme {
690
+ return rawdb .HasLegacyTrieNode (s .database , hash ), false
691
+ }
692
+ // If node is running with path scheme, check the presence with node path.
693
+ var blob []byte
694
+ var dbHash common.Hash
695
+ if owner == (common.Hash {}) {
696
+ blob , dbHash = rawdb .ReadAccountTrieNode (s .database , path )
697
+ } else {
698
+ blob , dbHash = rawdb .ReadStorageTrieNode (s .database , owner , path )
699
+ }
700
+ exists = hash == dbHash
701
+ inconsistent = ! exists && len (blob ) != 0
702
+ return exists , inconsistent
703
+ }
704
+
642
705
// ResolvePath resolves the provided composite node path by separating the
643
706
// path in account trie if it's existent.
644
707
func ResolvePath (path []byte ) (common.Hash , []byte ) {
0 commit comments