Skip to content

Commit 74616bb

Browse files
nipunn1313Convex, Inc.
authored and
Convex, Inc.
committed
Add CronNextRun struct and table (#36026)
Add CronNextRun struct and table to split out some data from CronJob struct. Double write to them. Goal is to decouple the cron job executor from the push process to avoid OCCs. To do this migration correctly requires a ton of pushes 1. Add table + Double write 2. Run a migration to copy into second table. 3. Read from new table (but keep writing to old table) 4. Stop writing to old table GitOrigin-RevId: 10d4a7769e10a7df5ba9d11b1e6d2da8731e58e4
1 parent f60b37f commit 74616bb

File tree

4 files changed

+168
-6
lines changed

4 files changed

+168
-6
lines changed

crates/model/src/cron_jobs/mod.rs

+105-4
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use sync_types::CanonicalizedModulePath;
3030
use value::{
3131
heap_size::WithHeapSize,
3232
ConvexValue,
33+
DeveloperDocumentId,
3334
FieldPath,
3435
ResolvedDocumentId,
3536
TableName,
@@ -46,6 +47,7 @@ use crate::{
4647
CronJobLogLines,
4748
CronJobState,
4849
CronJobStatus,
50+
CronNextRun,
4951
CronSpec,
5052
},
5153
},
@@ -85,6 +87,21 @@ pub static CRON_JOB_LOGS_NAME_FIELD: LazyLock<FieldPath> =
8587
static CRON_JOB_LOGS_TS_FIELD: LazyLock<FieldPath> =
8688
LazyLock::new(|| "ts".parse().expect("invalid ts field"));
8789

90+
pub static CRON_NEXT_RUN_TABLE: LazyLock<TableName> = LazyLock::new(|| {
91+
"_cron_next_run"
92+
.parse()
93+
.expect("_cron_next_run is not a valid system table name")
94+
});
95+
96+
pub static CRON_NEXT_RUN_INDEX_BY_NEXT_TS: LazyLock<IndexName> =
97+
LazyLock::new(|| system_index(&CRON_NEXT_RUN_TABLE, "by_next_ts"));
98+
pub static CRON_NEXT_RUN_INDEX_BY_CRON_JOB_ID: LazyLock<IndexName> =
99+
LazyLock::new(|| system_index(&CRON_NEXT_RUN_TABLE, "by_cron_job_id"));
100+
static CRON_NEXT_RUN_NEXT_TS_FIELD: LazyLock<FieldPath> =
101+
LazyLock::new(|| "nextTs".parse().expect("invalid nextTs field"));
102+
static CRON_NEXT_RUN_CRON_JOB_ID_FIELD: LazyLock<FieldPath> =
103+
LazyLock::new(|| "cronJobId".parse().expect("invalid cronJobId field"));
104+
88105
pub struct CronJobsTable;
89106
impl SystemTable for CronJobsTable {
90107
fn table_name(&self) -> &'static TableName {
@@ -134,6 +151,34 @@ impl SystemTable for CronJobLogsTable {
134151
}
135152
}
136153

154+
pub struct CronNextRunTable;
155+
impl SystemTable for CronNextRunTable {
156+
fn table_name(&self) -> &'static TableName {
157+
&CRON_NEXT_RUN_TABLE
158+
}
159+
160+
fn indexes(&self) -> Vec<SystemIndex> {
161+
vec![
162+
SystemIndex {
163+
name: CRON_NEXT_RUN_INDEX_BY_NEXT_TS.clone(),
164+
fields: vec![CRON_NEXT_RUN_NEXT_TS_FIELD.clone()]
165+
.try_into()
166+
.unwrap(),
167+
},
168+
SystemIndex {
169+
name: CRON_NEXT_RUN_INDEX_BY_CRON_JOB_ID.clone(),
170+
fields: vec![CRON_NEXT_RUN_CRON_JOB_ID_FIELD.clone()]
171+
.try_into()
172+
.unwrap(),
173+
},
174+
]
175+
}
176+
177+
fn validate_document(&self, document: ResolvedDocument) -> anyhow::Result<()> {
178+
ParseDocument::<CronNextRun>::parse(document).map(|_| ())
179+
}
180+
}
181+
137182
const MAX_LOGS_PER_CRON: usize = 5;
138183

139184
pub struct CronModel<'a, RT: Runtime> {
@@ -199,19 +244,51 @@ impl<'a, RT: Runtime> CronModel<'a, RT> {
199244
cron_spec: CronSpec,
200245
) -> anyhow::Result<()> {
201246
let now = self.runtime().generate_timestamp()?;
247+
let next_ts = compute_next_ts(&cron_spec, None, now)?;
202248
let cron = CronJob {
203249
name,
204-
next_ts: compute_next_ts(&cron_spec, None, now)?,
250+
next_ts,
205251
cron_spec,
206252
state: CronJobState::Pending,
207253
prev_ts: None,
208254
};
209-
SystemMetadataModel::new(self.tx, self.component.into())
255+
256+
let cron_job_id = SystemMetadataModel::new(self.tx, self.component.into())
210257
.insert(&CRON_JOBS_TABLE, cron.try_into()?)
258+
.await?
259+
.developer_id;
260+
261+
let next_run = CronNextRun {
262+
cron_job_id,
263+
state: CronJobState::Pending,
264+
prev_ts: None,
265+
next_ts,
266+
};
267+
268+
SystemMetadataModel::new(self.tx, self.component.into())
269+
.insert(&CRON_NEXT_RUN_TABLE, next_run.try_into()?)
211270
.await?;
271+
212272
Ok(())
213273
}
214274

275+
async fn next_run(
276+
&mut self,
277+
cron_job_id: DeveloperDocumentId,
278+
) -> anyhow::Result<Option<ParsedDocument<CronNextRun>>> {
279+
let query = Query::index_range(IndexRange {
280+
index_name: CRON_NEXT_RUN_INDEX_BY_CRON_JOB_ID.clone(),
281+
range: vec![IndexRangeExpression::Eq(
282+
CRON_NEXT_RUN_CRON_JOB_ID_FIELD.clone(),
283+
ConvexValue::from(cron_job_id).into(),
284+
)],
285+
order: Order::Asc,
286+
});
287+
let mut query_stream = ResolvedQuery::new(self.tx, self.component.into(), query)?;
288+
let next_run = query_stream.expect_at_most_one(self.tx).await?;
289+
next_run.map(|v| v.parse()).transpose()
290+
}
291+
215292
pub async fn update(
216293
&mut self,
217294
cron_job: ParsedDocument<CronJob>,
@@ -228,9 +305,16 @@ impl<'a, RT: Runtime> CronModel<'a, RT> {
228305
}
229306

230307
pub async fn delete(&mut self, cron_job: ParsedDocument<CronJob>) -> anyhow::Result<()> {
308+
let id = cron_job.id();
231309
SystemMetadataModel::new(self.tx, self.component.into())
232-
.delete(cron_job.clone().id())
310+
.delete(id)
233311
.await?;
312+
let next_run = self.next_run(id.developer_id).await?;
313+
if let Some(next_run) = next_run {
314+
SystemMetadataModel::new(self.tx, self.component.into())
315+
.delete(next_run.id())
316+
.await?;
317+
}
234318
self.apply_job_log_retention(cron_job.name.clone(), 0)
235319
.await?;
236320
Ok(())
@@ -247,8 +331,25 @@ impl<'a, RT: Runtime> CronModel<'a, RT> {
247331
.namespace(self.component.into())
248332
.tablet_matches_name(id.tablet_id, &CRON_JOBS_TABLE));
249333
SystemMetadataModel::new(self.tx, self.component.into())
250-
.replace(id, job.try_into()?)
334+
.replace(id, job.clone().try_into()?)
251335
.await?;
336+
337+
let next_run = CronNextRun {
338+
cron_job_id: id.developer_id,
339+
state: job.state,
340+
prev_ts: None,
341+
next_ts: job.next_ts,
342+
};
343+
let existing_next_run = self.next_run(id.developer_id).await?;
344+
if let Some(existing_next_run) = existing_next_run {
345+
SystemMetadataModel::new(self.tx, self.component.into())
346+
.replace(existing_next_run.id(), next_run.try_into()?)
347+
.await?;
348+
} else {
349+
SystemMetadataModel::new(self.tx, self.component.into())
350+
.insert(&CRON_NEXT_RUN_TABLE, next_run.try_into()?)
351+
.await?;
352+
}
252353
Ok(())
253354
}
254355

crates/model/src/cron_jobs/types.rs

+46
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use sync_types::{
2727
use value::{
2828
codegen_convex_serialization,
2929
heap_size::HeapSize,
30+
id_v6::DeveloperDocumentId,
3031
json_deserialize,
3132
obj,
3233
ConvexArray,
@@ -1084,3 +1085,48 @@ mod tests {
10841085
assert_roundtrips::<_, CronJob>(cron_job_obj);
10851086
}
10861087
}
1088+
1089+
#[derive(Clone, Debug, PartialEq)]
1090+
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
1091+
pub struct CronNextRun {
1092+
// Internally tracked metadata to execute the current run of the cron
1093+
pub cron_job_id: DeveloperDocumentId,
1094+
pub state: CronJobState,
1095+
pub prev_ts: Option<Timestamp>,
1096+
pub next_ts: Timestamp,
1097+
}
1098+
1099+
#[derive(Serialize, Deserialize)]
1100+
#[serde(rename_all = "camelCase")]
1101+
pub struct SerializedCronNextRun {
1102+
cron_job_id: String,
1103+
state: CronJobState,
1104+
prev_ts: Option<i64>,
1105+
next_ts: i64,
1106+
}
1107+
1108+
impl From<CronNextRun> for SerializedCronNextRun {
1109+
fn from(run: CronNextRun) -> Self {
1110+
Self {
1111+
state: run.state,
1112+
prev_ts: run.prev_ts.map(|ts| ts.into()),
1113+
next_ts: run.next_ts.into(),
1114+
cron_job_id: run.cron_job_id.encode(),
1115+
}
1116+
}
1117+
}
1118+
1119+
impl TryFrom<SerializedCronNextRun> for CronNextRun {
1120+
type Error = anyhow::Error;
1121+
1122+
fn try_from(value: SerializedCronNextRun) -> anyhow::Result<Self, Self::Error> {
1123+
Ok(Self {
1124+
cron_job_id: DeveloperDocumentId::decode(&value.cron_job_id)?,
1125+
state: value.state,
1126+
prev_ts: value.prev_ts.map(|ts| ts.try_into()).transpose()?,
1127+
next_ts: value.next_ts.try_into()?,
1128+
})
1129+
}
1130+
}
1131+
1132+
codegen_convex_serialization!(CronNextRun, SerializedCronNextRun);

crates/model/src/lib.rs

+14-1
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ use cron_jobs::{
8080
CRON_JOBS_TABLE,
8181
CRON_JOB_LOGS_INDEX_BY_NAME_TS,
8282
CRON_JOB_LOGS_TABLE,
83+
CRON_NEXT_RUN_INDEX_BY_CRON_JOB_ID,
84+
CRON_NEXT_RUN_INDEX_BY_NEXT_TS,
85+
CRON_NEXT_RUN_TABLE,
8386
};
8487
pub use database::defaults::{
8588
SystemIndex,
@@ -179,6 +182,7 @@ use crate::{
179182
cron_jobs::{
180183
CronJobLogsTable,
181184
CronJobsTable,
185+
CronNextRunTable,
182186
},
183187
deployment_audit_log::{
184188
DeploymentAuditLogsTable,
@@ -252,9 +256,10 @@ enum DefaultTableNumber {
252256
ComponentsTable = 32,
253257
FunctionHandlesTable = 33,
254258
CanonicalUrls = 34,
259+
CronNextRun = 35,
255260
// Keep this number and your user name up to date. The number makes it easy to know
256261
// what to use next. The username on the same line detects merge conflicts
257-
// Next Number - 35 - lee
262+
// Next Number - 36 - nipunn
258263
}
259264

260265
impl From<DefaultTableNumber> for TableNumber {
@@ -295,6 +300,7 @@ impl From<DefaultTableNumber> for &'static dyn SystemTable {
295300
DefaultTableNumber::ComponentsTable => &ComponentsTable,
296301
DefaultTableNumber::FunctionHandlesTable => &FunctionHandlesTable,
297302
DefaultTableNumber::CanonicalUrls => &CanonicalUrlsTable,
303+
DefaultTableNumber::CronNextRun => &CronNextRunTable,
298304
}
299305
}
300306
}
@@ -326,6 +332,8 @@ static SYSTEM_INDEXES_WITHOUT_CREATION_TIME: LazyLock<BTreeSet<IndexName>> = Laz
326332
CRON_JOBS_INDEX_BY_NAME.clone(),
327333
CRON_JOBS_INDEX_BY_NEXT_TS.clone(),
328334
CRON_JOB_LOGS_INDEX_BY_NAME_TS.clone(),
335+
CRON_NEXT_RUN_INDEX_BY_NEXT_TS.clone(),
336+
CRON_NEXT_RUN_INDEX_BY_CRON_JOB_ID.clone(),
329337
ENVIRONMENT_VARIABLES_INDEX_BY_NAME.clone(),
330338
EXPORTS_BY_STATE_AND_TS_INDEX.clone(),
331339
FILE_STORAGE_ID_INDEX.clone(),
@@ -517,6 +525,7 @@ pub fn component_system_tables() -> Vec<&'static dyn SystemTable> {
517525
&ScheduledJobsTable,
518526
&CronJobsTable,
519527
&CronJobLogsTable,
528+
&CronNextRunTable,
520529
&ModulesTable,
521530
&UdfConfigTable,
522531
&SourcePackagesTable,
@@ -531,6 +540,7 @@ static APP_TABLES_TO_LOAD_IN_MEMORY: LazyLock<BTreeSet<TableName>> = LazyLock::n
531540
MODULES_TABLE.clone(),
532541
ENVIRONMENT_VARIABLES_TABLE.clone(),
533542
CRON_JOBS_TABLE.clone(),
543+
CRON_NEXT_RUN_TABLE.clone(),
534544
BACKEND_STATE_TABLE.clone(),
535545
CANONICAL_URLS_TABLE.clone(),
536546
BACKEND_INFO_TABLE.clone(),
@@ -570,6 +580,7 @@ pub static FIRST_SEEN_TABLE: LazyLock<BTreeMap<TableName, DatabaseVersion>> = La
570580
BACKEND_STATE_TABLE.clone() => 75,
571581
CRON_JOBS_TABLE.clone() => 47,
572582
CRON_JOB_LOGS_TABLE.clone() => 51,
583+
CRON_NEXT_RUN_TABLE.clone() => 118,
573584
SOURCE_PACKAGES_TABLE.clone() => 44,
574585
ENVIRONMENT_VARIABLES_TABLE.clone() => 44,
575586
AWS_LAMBDA_VERSIONS_TABLE.clone() => 44,
@@ -599,6 +610,8 @@ pub static FIRST_SEEN_INDEX: LazyLock<BTreeMap<IndexName, DatabaseVersion>> = La
599610
CRON_JOBS_INDEX_BY_NEXT_TS.clone() => 47,
600611
CRON_JOBS_INDEX_BY_NAME.clone() => 49,
601612
CRON_JOB_LOGS_INDEX_BY_NAME_TS.clone() => 51,
613+
CRON_NEXT_RUN_INDEX_BY_NEXT_TS.clone() => 118,
614+
CRON_NEXT_RUN_INDEX_BY_CRON_JOB_ID.clone() => 118,
602615
EXPORTS_BY_STATE_AND_TS_INDEX.clone() => 88,
603616
TABLES_INDEX.clone() => 44,
604617
SCHEMAS_STATE_INDEX.clone() => 44,

crates/model/src/migrations.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ impl fmt::Display for MigrationCompletionCriterion {
8181
// migrations unless explicitly dropping support.
8282
// Add a user name next to the version when you make a change to highlight merge
8383
// conflicts.
84-
pub const DATABASE_VERSION: DatabaseVersion = 117; // nipunn
84+
pub const DATABASE_VERSION: DatabaseVersion = 118; // nipunn
8585

8686
pub struct MigrationWorker<RT: Runtime> {
8787
rt: RT,
@@ -392,6 +392,8 @@ impl<RT: Runtime> MigrationWorker<RT> {
392392
.await?;
393393
MigrationCompletionCriterion::MigrationComplete(to_version)
394394
},
395+
// Empty migration for 118 - represents creation of CronNextRun table
396+
118 => MigrationCompletionCriterion::MigrationComplete(to_version),
395397
// NOTE: Make sure to increase DATABASE_VERSION when adding new migrations.
396398
_ => anyhow::bail!("Version did not define a migration! {}", to_version),
397399
};

0 commit comments

Comments
 (0)