@@ -7,6 +7,7 @@ use std::{
7
7
8
8
use anyhow:: Context ;
9
9
use common:: {
10
+ self ,
10
11
backoff:: Backoff ,
11
12
document:: {
12
13
ParseDocument ,
@@ -38,6 +39,12 @@ use value::{
38
39
39
40
use crate :: {
40
41
canonical_urls:: CANONICAL_URLS_TABLE ,
42
+ cron_jobs:: {
43
+ types:: CronNextRun ,
44
+ CronModel ,
45
+ CRON_JOBS_TABLE ,
46
+ CRON_NEXT_RUN_TABLE ,
47
+ } ,
41
48
database_globals:: {
42
49
types:: DatabaseVersion ,
43
50
DatabaseGlobalsModel ,
@@ -81,7 +88,7 @@ impl fmt::Display for MigrationCompletionCriterion {
81
88
// migrations unless explicitly dropping support.
82
89
// Add a user name next to the version when you make a change to highlight merge
83
90
// conflicts.
84
- pub const DATABASE_VERSION : DatabaseVersion = 118 ; // nipunn
91
+ pub const DATABASE_VERSION : DatabaseVersion = 119 ; // nipunn
85
92
86
93
pub struct MigrationWorker < RT : Runtime > {
87
94
rt : RT ,
@@ -394,6 +401,51 @@ impl<RT: Runtime> MigrationWorker<RT> {
394
401
} ,
395
402
// Empty migration for 118 - represents creation of CronNextRun table
396
403
118 => MigrationCompletionCriterion :: MigrationComplete ( to_version) ,
404
+ 119 => {
405
+ let mut tx = self . db . begin_system ( ) . await ?;
406
+ let namespaces: Vec < _ > = tx
407
+ . table_mapping ( )
408
+ . iter ( )
409
+ . filter_map ( |( _, namespace, _, table_name) | {
410
+ if table_name == & * CRON_JOBS_TABLE {
411
+ Some ( namespace)
412
+ } else {
413
+ None
414
+ }
415
+ } )
416
+ . collect ( ) ;
417
+
418
+ for namespace in namespaces {
419
+ let crons = CronModel :: new ( & mut tx, namespace. into ( ) ) . list ( ) . await ?;
420
+ for cron in crons. values ( ) {
421
+ let next_run = CronNextRun {
422
+ cron_job_id : cron. id ( ) . developer_id ,
423
+ state : cron. state ,
424
+ prev_ts : cron. prev_ts ,
425
+ next_ts : cron. next_ts ,
426
+ } ;
427
+ if let Some ( existing_next_run) = CronModel :: new ( & mut tx, namespace. into ( ) )
428
+ . next_run ( cron. id ( ) . developer_id )
429
+ . await ?
430
+ . map ( |next_run| next_run. into_value ( ) )
431
+ {
432
+ if existing_next_run != next_run {
433
+ SystemMetadataModel :: new ( & mut tx, namespace)
434
+ . replace ( cron. id ( ) , next_run. try_into ( ) ?)
435
+ . await ?;
436
+ }
437
+ } else {
438
+ SystemMetadataModel :: new ( & mut tx, namespace)
439
+ . insert ( & CRON_NEXT_RUN_TABLE , next_run. try_into ( ) ?)
440
+ . await ?;
441
+ }
442
+ }
443
+ }
444
+ self . db
445
+ . commit_with_write_source ( tx, "migration_119" )
446
+ . await ?;
447
+ MigrationCompletionCriterion :: MigrationComplete ( to_version)
448
+ } ,
397
449
// NOTE: Make sure to increase DATABASE_VERSION when adding new migrations.
398
450
_ => anyhow:: bail!( "Version did not define a migration! {}" , to_version) ,
399
451
} ;
0 commit comments