@@ -24,11 +24,9 @@ import (
24
24
"path/filepath"
25
25
"sync"
26
26
"sync/atomic"
27
- "time"
28
27
29
28
"github.com/gofrs/flock"
30
29
31
- "github.com/CortexFoundation/CortexTheseus/common"
32
30
"github.com/CortexFoundation/CortexTheseus/ctxcdb"
33
31
"github.com/CortexFoundation/CortexTheseus/log"
34
32
"github.com/CortexFoundation/CortexTheseus/metrics"
@@ -389,115 +387,3 @@ func (f *Freezer) repair() error {
389
387
f .tail .Store (tail )
390
388
return nil
391
389
}
392
-
393
- // convertLegacyFn takes a raw freezer entry in an older format and
394
- // returns it in the new format.
395
- type convertLegacyFn = func ([]byte ) ([]byte , error )
396
-
397
- // MigrateTable processes the entries in a given table in sequence
398
- // converting them to a new format if they're of an old format.
399
- func (f * Freezer ) MigrateTable (kind string , convert convertLegacyFn ) error {
400
- if f .readonly {
401
- return errReadOnly
402
- }
403
- f .writeLock .Lock ()
404
- defer f .writeLock .Unlock ()
405
-
406
- table , ok := f .tables [kind ]
407
- if ! ok {
408
- return errUnknownTable
409
- }
410
- // forEach iterates every entry in the table serially and in order, calling `fn`
411
- // with the item as argument. If `fn` returns an error the iteration stops
412
- // and that error will be returned.
413
- forEach := func (t * freezerTable , offset uint64 , fn func (uint64 , []byte ) error ) error {
414
- var (
415
- items = t .items .Load ()
416
- batchSize = uint64 (1024 )
417
- maxBytes = uint64 (1024 * 1024 )
418
- )
419
- for i := offset ; i < items ; {
420
- if i + batchSize > items {
421
- batchSize = items - i
422
- }
423
- data , err := t .RetrieveItems (i , batchSize , maxBytes )
424
- if err != nil {
425
- return err
426
- }
427
- for j , item := range data {
428
- if err := fn (i + uint64 (j ), item ); err != nil {
429
- return err
430
- }
431
- }
432
- i += uint64 (len (data ))
433
- }
434
- return nil
435
- }
436
- // TODO(s1na): This is a sanity-check since as of now no process does tail-deletion. But the migration
437
- // process assumes no deletion at tail and needs to be modified to account for that.
438
- if table .itemOffset .Load () > 0 || table .itemHidden .Load () > 0 {
439
- return errors .New ("migration not supported for tail-deleted freezers" )
440
- }
441
- ancientsPath := filepath .Dir (table .index .Name ())
442
- // Set up new dir for the migrated table, the content of which
443
- // we'll at the end move over to the ancients dir.
444
- migrationPath := filepath .Join (ancientsPath , "migration" )
445
- newTable , err := newFreezerTable (migrationPath , kind , table .noCompression , false )
446
- if err != nil {
447
- return err
448
- }
449
- var (
450
- batch = newTable .newBatch ()
451
- out []byte
452
- start = time .Now ()
453
- logged = time .Now ()
454
- offset = newTable .items .Load ()
455
- )
456
- if offset > 0 {
457
- log .Info ("found previous migration attempt" , "migrated" , offset )
458
- }
459
- // Iterate through entries and transform them
460
- if err := forEach (table , offset , func (i uint64 , blob []byte ) error {
461
- if i % 10000 == 0 && time .Since (logged ) > 16 * time .Second {
462
- log .Info ("Processing legacy elements" , "count" , i , "elapsed" , common .PrettyDuration (time .Since (start )))
463
- logged = time .Now ()
464
- }
465
- out , err = convert (blob )
466
- if err != nil {
467
- return err
468
- }
469
- if err := batch .AppendRaw (i , out ); err != nil {
470
- return err
471
- }
472
- return nil
473
- }); err != nil {
474
- return err
475
- }
476
- if err := batch .commit (); err != nil {
477
- return err
478
- }
479
- log .Info ("Replacing old table files with migrated ones" , "elapsed" , common .PrettyDuration (time .Since (start )))
480
- // Release and delete old table files. Note this won't
481
- // delete the index file.
482
- table .releaseFilesAfter (0 , true )
483
-
484
- if err := newTable .Close (); err != nil {
485
- return err
486
- }
487
- files , err := os .ReadDir (migrationPath )
488
- if err != nil {
489
- return err
490
- }
491
- // Move migrated files to ancients dir.
492
- for _ , f := range files {
493
- // This will replace the old index file as a side-effect.
494
- if err := os .Rename (filepath .Join (migrationPath , f .Name ()), filepath .Join (ancientsPath , f .Name ())); err != nil {
495
- return err
496
- }
497
- }
498
- // Delete by now empty dir.
499
- if err := os .Remove (migrationPath ); err != nil {
500
- return err
501
- }
502
- return nil
503
- }
0 commit comments