7
7
container ,
8
8
ErrorCode ,
9
9
errors ,
10
- logger ,
10
+ Logger ,
11
+ logger as defaultLogger ,
11
12
ReplicationAssertionError ,
12
13
ServiceError
13
14
} from '@powersync/lib-services-framework' ;
@@ -55,12 +56,16 @@ export interface MongoBucketBatchOptions {
55
56
skipExistingRows : boolean ;
56
57
57
58
markRecordUnavailable : BucketStorageMarkRecordUnavailable | undefined ;
59
+
60
+ logger ?: Logger ;
58
61
}
59
62
60
63
export class MongoBucketBatch
61
64
extends BaseObserver < storage . BucketBatchStorageListener >
62
65
implements storage . BucketStorageBatch
63
66
{
67
+ private logger : Logger ;
68
+
64
69
private readonly client : mongo . MongoClient ;
65
70
public readonly db : PowerSyncMongo ;
66
71
public readonly session : mongo . ClientSession ;
@@ -96,6 +101,7 @@ export class MongoBucketBatch
96
101
97
102
constructor ( options : MongoBucketBatchOptions ) {
98
103
super ( ) ;
104
+ this . logger = options . logger ?? defaultLogger ;
99
105
this . client = options . db . client ;
100
106
this . db = options . db ;
101
107
this . group_id = options . groupId ;
@@ -242,7 +248,9 @@ export class MongoBucketBatch
242
248
current_data_lookup . set ( cacheKey ( doc . _id . t , doc . _id . k ) , doc ) ;
243
249
}
244
250
245
- let persistedBatch : PersistedBatch | null = new PersistedBatch ( this . group_id , transactionSize ) ;
251
+ let persistedBatch : PersistedBatch | null = new PersistedBatch ( this . group_id , transactionSize , {
252
+ logger : this . logger
253
+ } ) ;
246
254
247
255
for ( let op of b ) {
248
256
if ( resumeBatch ) {
@@ -329,7 +337,7 @@ export class MongoBucketBatch
329
337
this . markRecordUnavailable ( record ) ;
330
338
} else {
331
339
// Log to help with debugging if there was a consistency issue
332
- logger . warn (
340
+ this . logger . warn (
333
341
`Cannot find previous record for update on ${ record . sourceTable . qualifiedName } : ${ beforeId } / ${ record . before ?. id } `
334
342
) ;
335
343
}
@@ -350,7 +358,7 @@ export class MongoBucketBatch
350
358
existing_lookups = [ ] ;
351
359
// Log to help with debugging if there was a consistency issue
352
360
if ( this . storeCurrentData ) {
353
- logger . warn (
361
+ this . logger . warn (
354
362
`Cannot find previous record for delete on ${ record . sourceTable . qualifiedName } : ${ beforeId } / ${ record . before ?. id } `
355
363
) ;
356
364
}
@@ -447,7 +455,7 @@ export class MongoBucketBatch
447
455
}
448
456
}
449
457
) ;
450
- logger . error (
458
+ this . logger . error (
451
459
`Failed to evaluate data query on ${ record . sourceTable . qualifiedName } .${ record . after ?. id } : ${ error . error } `
452
460
) ;
453
461
}
@@ -487,7 +495,7 @@ export class MongoBucketBatch
487
495
}
488
496
}
489
497
) ;
490
- logger . error (
498
+ this . logger . error (
491
499
`Failed to evaluate parameter query on ${ record . sourceTable . qualifiedName } .${ after . id } : ${ error . error } `
492
500
) ;
493
501
}
@@ -541,7 +549,7 @@ export class MongoBucketBatch
541
549
if ( e instanceof mongo . MongoError && e . hasErrorLabel ( 'TransientTransactionError' ) ) {
542
550
// Likely write conflict caused by concurrent write stream replicating
543
551
} else {
544
- logger . warn ( 'Transaction error' , e as Error ) ;
552
+ this . logger . warn ( 'Transaction error' , e as Error ) ;
545
553
}
546
554
await timers . setTimeout ( Math . random ( ) * 50 ) ;
547
555
throw e ;
@@ -566,7 +574,7 @@ export class MongoBucketBatch
566
574
await this . withTransaction ( async ( ) => {
567
575
flushTry += 1 ;
568
576
if ( flushTry % 10 == 0 ) {
569
- logger . info ( `${ this . slot_name } ${ description } - try ${ flushTry } ` ) ;
577
+ this . logger . info ( `${ description } - try ${ flushTry } ` ) ;
570
578
}
571
579
if ( flushTry > 20 && Date . now ( ) > lastTry ) {
572
580
throw new ServiceError ( ErrorCode . PSYNC_S1402 , 'Max transaction tries exceeded' ) ;
@@ -636,12 +644,12 @@ export class MongoBucketBatch
636
644
if ( this . last_checkpoint_lsn != null && lsn < this . last_checkpoint_lsn ) {
637
645
// When re-applying transactions, don't create a new checkpoint until
638
646
// we are past the last transaction.
639
- logger . info ( `Re-applied transaction ${ lsn } - skipping checkpoint` ) ;
647
+ this . logger . info ( `Re-applied transaction ${ lsn } - skipping checkpoint` ) ;
640
648
return false ;
641
649
}
642
650
if ( lsn < this . no_checkpoint_before_lsn ) {
643
651
if ( Date . now ( ) - this . lastWaitingLogThottled > 5_000 ) {
644
- logger . info (
652
+ this . logger . info (
645
653
`Waiting until ${ this . no_checkpoint_before_lsn } before creating checkpoint, currently at ${ lsn } . Persisted op: ${ this . persisted_op } `
646
654
) ;
647
655
this . lastWaitingLogThottled = Date . now ( ) ;
@@ -713,7 +721,7 @@ export class MongoBucketBatch
713
721
if ( this . persisted_op != null ) {
714
722
// The commit may have been skipped due to "no_checkpoint_before_lsn".
715
723
// Apply it now if relevant
716
- logger . info ( `Commit due to keepalive at ${ lsn } / ${ this . persisted_op } ` ) ;
724
+ this . logger . info ( `Commit due to keepalive at ${ lsn } / ${ this . persisted_op } ` ) ;
717
725
return await this . commit ( lsn ) ;
718
726
}
719
727
@@ -777,7 +785,7 @@ export class MongoBucketBatch
777
785
return null ;
778
786
}
779
787
780
- logger . debug ( `Saving ${ record . tag } :${ record . before ?. id } /${ record . after ?. id } ` ) ;
788
+ this . logger . debug ( `Saving ${ record . tag } :${ record . before ?. id } /${ record . after ?. id } ` ) ;
781
789
782
790
this . batch ??= new OperationBatch ( ) ;
783
791
this . batch . push ( new RecordOperation ( record ) ) ;
@@ -848,7 +856,7 @@ export class MongoBucketBatch
848
856
session : session
849
857
} ) ;
850
858
const batch = await cursor . toArray ( ) ;
851
- const persistedBatch = new PersistedBatch ( this . group_id , 0 ) ;
859
+ const persistedBatch = new PersistedBatch ( this . group_id , 0 , { logger : this . logger } ) ;
852
860
853
861
for ( let value of batch ) {
854
862
persistedBatch . saveBucketData ( {
0 commit comments