Skip to content

Commit ffa61f9

Browse files
goffrieConvex, Inc.
authored and
Convex, Inc.
committed
Move ParsedDocument parsing to its own trait (#35690)
GitOrigin-RevId: 3a6196e1f1942cf3f05b9ce80ff2569e9a38b833
1 parent 70a44d3 commit ffa61f9

File tree

48 files changed

+195
-141
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+195
-141
lines changed

crates/application/src/cron_jobs/mod.rs

+7-4
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@ use common::{
1515
ComponentPath,
1616
PublicFunctionPath,
1717
},
18-
document::ParsedDocument,
18+
document::{
19+
ParseDocument,
20+
ParsedDocument,
21+
},
1922
errors::{
2023
report_error,
2124
JsError,
@@ -279,15 +282,15 @@ impl<RT: Runtime> CronJobExecutor<RT> {
279282
for namespace in namespaces {
280283
let mut query = ResolvedQuery::new(tx, namespace, index_query.clone())?;
281284
if let Some(doc) = query.next(tx, None).await? {
282-
let job: ParsedDocument<CronJob> = doc.try_into()?;
285+
let job: ParsedDocument<CronJob> = doc.parse()?;
283286
let next_ts = job.next_ts;
284287
queries.insert((next_ts, namespace), (job, query));
285288
}
286289
}
287290
while let Some(((_min_next_ts, namespace), (min_job, mut query))) = queries.pop_first() {
288291
yield min_job;
289292
if let Some(doc) = query.next(tx, None).await? {
290-
let job: ParsedDocument<CronJob> = doc.try_into()?;
293+
let job: ParsedDocument<CronJob> = doc.parse()?;
291294
let next_ts = job.next_ts;
292295
queries.insert((next_ts, namespace), (job, query));
293296
}
@@ -702,7 +705,7 @@ impl<RT: Runtime> CronJobExecutor<RT> {
702705
let new_job = tx
703706
.get(job_id)
704707
.await?
705-
.map(ParsedDocument::<CronJob>::try_from)
708+
.map(ParseDocument::<CronJob>::parse)
706709
.transpose()?
707710
.map(|j| j.into_value());
708711
Ok((new_job.as_ref() == Some(expected_state)).then_some(tx))

crates/application/src/exports/export_storage.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use anyhow::Context;
44
use bytes::Bytes;
55
use common::{
66
components::ComponentPath,
7-
document::ParsedDocument,
7+
document::ParseDocument,
88
knobs::{
99
EXPORT_MAX_INFLIGHT_PREFETCH_BYTES,
1010
EXPORT_STORAGE_GET_CONCURRENCY,
@@ -86,7 +86,7 @@ pub async fn write_storage_table<'a, 'b: 'a, RT: Runtime>(
8686
let stream = table_iterator.stream_documents_in_table(*tablet_id, *by_id, None);
8787
pin_mut!(stream);
8888
while let Some(LatestDocument { value: doc, .. }) = stream.try_next().await? {
89-
let file_storage_entry = ParsedDocument::<FileStorageEntry>::try_from(doc)?;
89+
let file_storage_entry = ParseDocument::<FileStorageEntry>::parse(doc)?;
9090
let virtual_storage_id = file_storage_entry.id().developer_id;
9191
let creation_time = f64::from(file_storage_entry.creation_time());
9292
table_upload
@@ -108,7 +108,7 @@ pub async fn write_storage_table<'a, 'b: 'a, RT: Runtime>(
108108
let files_stream = table_iterator
109109
.stream_documents_in_table(*tablet_id, *by_id, None)
110110
.map_ok(|LatestDocument { value: doc, .. }| async {
111-
let file_storage_entry = ParsedDocument::<FileStorageEntry>::try_from(doc)?;
111+
let file_storage_entry = ParseDocument::<FileStorageEntry>::parse(doc)?;
112112
let virtual_storage_id = file_storage_entry.id().developer_id;
113113
// Add an extension, which isn't necessary for anything and might be incorrect,
114114
// but allows the file to be viewed at a glance in most cases.

crates/application/src/exports/tests.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ use common::{
1616
ComponentId,
1717
ComponentPath,
1818
},
19-
document::ParsedDocument,
19+
document::{
20+
ParseDocument,
21+
ParsedDocument,
22+
},
2023
types::{
2124
ConvexOrigin,
2225
TableName,
@@ -419,7 +422,7 @@ async fn test_export_storage(rt: TestRuntime) -> anyhow::Result<()> {
419422
))
420423
.await?
421424
.unwrap()
422-
.try_into()?;
425+
.parse()?;
423426

424427
expected_export_entries.insert(format!("_storage/{file1_id}.jpeg"), format!("abc"));
425428
expected_export_entries.insert(

crates/application/src/exports/worker.rs

+8-5
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ use common::{
88
self,
99
backoff::Backoff,
1010
components::ComponentPath,
11-
document::ParsedDocument,
11+
document::{
12+
ParseDocument,
13+
ParsedDocument,
14+
},
1215
errors::report_error,
1316
execution_context::ExecutionId,
1417
runtime::Runtime,
@@ -143,7 +146,7 @@ impl<RT: Runtime> ExportWorker<RT> {
143146
in_progress_export.clone().try_into()?,
144147
)
145148
.await?
146-
.try_into()?;
149+
.parse()?;
147150
self.database
148151
.commit_with_write_source(tx, "export_worker_export_requested")
149152
.await?;
@@ -216,7 +219,7 @@ impl<RT: Runtime> ExportWorker<RT> {
216219
let msg = msg.clone();
217220
async move {
218221
let export: ParsedDocument<Export> =
219-
tx.get(id).await?.context(ExportCanceled)?.try_into()?;
222+
tx.get(id).await?.context(ExportCanceled)?.parse()?;
220223
let export = export.into_value();
221224
if let Export::Canceled { .. } = export {
222225
anyhow::bail!(ExportCanceled);
@@ -245,7 +248,7 @@ impl<RT: Runtime> ExportWorker<RT> {
245248
tracing::warn!("Export {id} disappeared");
246249
return Err(ExportCanceled.into());
247250
};
248-
let export: ParsedDocument<Export> = export.try_into()?;
251+
let export: ParsedDocument<Export> = export.parse()?;
249252
match *export {
250253
Export::InProgress { .. } => (),
251254
Export::Canceled { .. } => return Err(ExportCanceled.into()),
@@ -282,7 +285,7 @@ impl<RT: Runtime> ExportWorker<RT> {
282285
tracing::warn!("Export {id} disappeared");
283286
return Err(ExportCanceled.into());
284287
};
285-
let export: ParsedDocument<Export> = export.try_into()?;
288+
let export: ParsedDocument<Export> = export.parse()?;
286289
if let Export::Canceled { .. } = *export {
287290
return Err(ExportCanceled.into());
288291
}

crates/application/src/scheduled_jobs/mod.rs

+8-5
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@ use common::{
1818
ComponentId,
1919
PublicFunctionPath,
2020
},
21-
document::ParsedDocument,
21+
document::{
22+
ParseDocument,
23+
ParsedDocument,
24+
},
2225
errors::{
2326
report_error,
2427
JsError,
@@ -369,7 +372,7 @@ impl<RT: Runtime> ScheduledJobExecutor<RT> {
369372
for namespace in namespaces {
370373
let mut query = ResolvedQuery::new(tx, namespace, index_query.clone())?;
371374
if let Some(doc) = query.next(tx, None).await? {
372-
let job: ParsedDocument<ScheduledJob> = doc.try_into()?;
375+
let job: ParsedDocument<ScheduledJob> = doc.parse()?;
373376
let next_ts = job.next_ts.ok_or_else(|| {
374377
anyhow::anyhow!("Could not get next_ts to run scheduled job {}", job.id())
375378
})?;
@@ -379,7 +382,7 @@ impl<RT: Runtime> ScheduledJobExecutor<RT> {
379382
while let Some(((_min_next_ts, namespace), (min_job, mut query))) = queries.pop_first() {
380383
yield min_job;
381384
if let Some(doc) = query.next(tx, None).await? {
382-
let job: ParsedDocument<ScheduledJob> = doc.try_into()?;
385+
let job: ParsedDocument<ScheduledJob> = doc.parse()?;
383386
let next_ts = job.next_ts.ok_or_else(|| {
384387
anyhow::anyhow!("Could not get next_ts to run scheduled job {}", job.id())
385388
})?;
@@ -818,7 +821,7 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
818821
let new_job = tx
819822
.get(job_id)
820823
.await?
821-
.map(ParsedDocument::<ScheduledJob>::try_from)
824+
.map(ParseDocument::<ScheduledJob>::parse)
822825
.transpose()?
823826
.map(|j| j.into_value());
824827
Ok((new_job.as_ref() == Some(expected_state), tx))
@@ -901,7 +904,7 @@ impl<RT: Runtime> ScheduledJobGarbageCollector<RT> {
901904

902905
let mut jobs_to_delete = vec![];
903906
while let Some(doc) = query_stream.next(&mut tx, None).await? {
904-
let job: ParsedDocument<ScheduledJob> = doc.try_into()?;
907+
let job: ParsedDocument<ScheduledJob> = doc.parse()?;
905908
match job.state {
906909
ScheduledJobState::Success => (),
907910
ScheduledJobState::Failed(_) => (),

crates/application/src/system_table_cleanup/mod.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use common::{
1212
components::ComponentId,
1313
document::{
1414
CreationTime,
15+
ParseDocument,
1516
ParsedDocument,
1617
CREATION_TIME_FIELD_PATH,
1718
},
@@ -169,7 +170,7 @@ impl<RT: Runtime> SystemTableCleanupWorker<RT> {
169170
database::query::TableFilter::IncludePrivateSystemTables,
170171
)?;
171172
}
172-
let table: ParsedDocument<TableMetadata> = document.try_into()?;
173+
let table: ParsedDocument<TableMetadata> = document.parse()?;
173174
match table.state {
174175
TableState::Active | TableState::Deleting => {},
175176
TableState::Hidden => {

crates/common/src/bootstrap_model/index/index_metadata.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use crate::{
3636
TextIndexState,
3737
},
3838
document::{
39+
ParseDocument,
3940
ParsedDocument,
4041
ResolvedDocument,
4142
},
@@ -165,7 +166,7 @@ impl<T: IndexTableIdentifier> IndexMetadata<T> {
165166

166167
impl TabletIndexMetadata {
167168
pub fn from_document(document: ResolvedDocument) -> anyhow::Result<ParsedDocument<Self>> {
168-
document.try_into()
169+
document.parse()
169170
}
170171
}
171172

crates/common/src/document.rs

+10-8
Original file line numberDiff line numberDiff line change
@@ -874,22 +874,24 @@ impl<D> ParsedDocument<D> {
874874
}
875875
}
876876

877-
impl<D> TryFrom<ResolvedDocument> for ParsedDocument<D>
877+
pub trait ParseDocument<D> {
878+
fn parse(self) -> anyhow::Result<ParsedDocument<D>>;
879+
}
880+
881+
impl<D> ParseDocument<D> for ResolvedDocument
878882
where
879883
D: TryFrom<ConvexObject, Error = anyhow::Error>,
880884
{
881-
type Error = anyhow::Error;
882-
883-
fn try_from(document: ResolvedDocument) -> anyhow::Result<Self> {
884-
let id = document.id();
885-
let creation_time = document.creation_time;
886-
let value: D = document
885+
fn parse(self) -> anyhow::Result<ParsedDocument<D>> {
886+
let id = self.id();
887+
let creation_time = self.creation_time;
888+
let value: D = self
887889
.document
888890
.into_value()
889891
.0
890892
.try_into()
891893
.with_context(|| format!("Failed to parse document id: {id}"))?;
892-
Ok(Self {
894+
Ok(ParsedDocument {
893895
id,
894896
creation_time,
895897
value,

crates/database/src/bootstrap_model/components/definition.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::sync::LazyLock;
33
use common::{
44
bootstrap_model::components::definition::ComponentDefinitionMetadata,
55
document::{
6-
ParsedDocument,
6+
ParseDocument,
77
ResolvedDocument,
88
},
99
};
@@ -32,7 +32,7 @@ impl SystemTable for ComponentDefinitionsTable {
3232
}
3333

3434
fn validate_document(&self, document: ResolvedDocument) -> anyhow::Result<()> {
35-
ParsedDocument::<ComponentDefinitionMetadata>::try_from(document)?;
35+
ParseDocument::<ComponentDefinitionMetadata>::parse(document)?;
3636
Ok(())
3737
}
3838
}

crates/database/src/bootstrap_model/components/mod.rs

+7-6
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use common::{
2626
Resource,
2727
},
2828
document::{
29+
ParseDocument,
2930
ParsedDocument,
3031
ResolvedDocument,
3132
},
@@ -87,7 +88,7 @@ impl SystemTable for ComponentsTable {
8788
}
8889

8990
fn validate_document(&self, document: ResolvedDocument) -> anyhow::Result<()> {
90-
ParsedDocument::<ComponentMetadata>::try_from(document)?;
91+
ParseDocument::<ComponentMetadata>::parse(document)?;
9192
Ok(())
9293
}
9394
}
@@ -149,7 +150,7 @@ impl<'a, RT: Runtime> BootstrapComponentsModel<'a, RT> {
149150
)?;
150151
let mut components = Vec::new();
151152
while let Some(doc) = query.next(self.tx, None).await? {
152-
components.push(doc.try_into()?);
153+
components.push(doc.parse()?);
153154
}
154155
Ok(components)
155156
}
@@ -209,7 +210,7 @@ impl<'a, RT: Runtime> BootstrapComponentsModel<'a, RT> {
209210
.get(component_doc_id)
210211
.await?
211212
.context("component missing")?
212-
.try_into()?;
213+
.parse()?;
213214
ComponentDefinitionId::Child(component_doc.definition_id)
214215
},
215216
};
@@ -227,7 +228,7 @@ impl<'a, RT: Runtime> BootstrapComponentsModel<'a, RT> {
227228
self.tx
228229
.get(component_doc_id)
229230
.await?
230-
.map(TryInto::try_into)
231+
.map(ParseDocument::parse)
231232
.transpose()?
232233
},
233234
};
@@ -296,7 +297,7 @@ impl<'a, RT: Runtime> BootstrapComponentsModel<'a, RT> {
296297
let Some(doc) = self.tx.get(component_definition_doc_id).await? else {
297298
return Ok(None);
298299
};
299-
let mut doc: ParsedDocument<ComponentDefinitionMetadata> = doc.try_into()?;
300+
let mut doc: ParsedDocument<ComponentDefinitionMetadata> = doc.parse()?;
300301
if !doc.exports.is_empty() {
301302
metrics::log_nonempty_component_exports();
302303
doc.exports = BTreeMap::new();
@@ -344,7 +345,7 @@ impl<'a, RT: Runtime> BootstrapComponentsModel<'a, RT> {
344345
)?;
345346
let mut definitions = BTreeMap::new();
346347
while let Some(doc) = query.next(self.tx, None).await? {
347-
let mut definition: ParsedDocument<ComponentDefinitionMetadata> = doc.try_into()?;
348+
let mut definition: ParsedDocument<ComponentDefinitionMetadata> = doc.parse()?;
348349
if !definition.exports.is_empty() {
349350
metrics::log_nonempty_component_exports();
350351
definition.exports = BTreeMap::new();

crates/database/src/bootstrap_model/index.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use common::{
2929
INDEX_TABLE,
3030
},
3131
document::{
32+
ParseDocument,
3233
ParsedDocument,
3334
ResolvedDocument,
3435
},
@@ -95,7 +96,7 @@ impl SystemTable for IndexTable {
9596
}
9697

9798
fn validate_document(&self, document: ResolvedDocument) -> anyhow::Result<()> {
98-
ParsedDocument::<TabletIndexMetadata>::try_from(document).map(|_| ())
99+
ParseDocument::<TabletIndexMetadata>::parse(document).map(|_| ())
99100
}
100101
}
101102

crates/database/src/bootstrap_model/index_workers/mod.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::{
77

88
use common::{
99
document::{
10+
ParseDocument,
1011
ParsedDocument,
1112
ResolvedDocument,
1213
},
@@ -108,7 +109,7 @@ impl<'a, RT: Runtime> IndexWorkerMetadataModel<'a, RT> {
108109
let mut query_stream = ResolvedQuery::new(self.tx, TableNamespace::Global, query)?;
109110
let result = query_stream.next(self.tx, None).await?;
110111
result
111-
.map(ParsedDocument::<IndexWorkerMetadataRecord>::try_from)
112+
.map(ParseDocument::<IndexWorkerMetadataRecord>::parse)
112113
.transpose()
113114
}
114115

@@ -135,7 +136,7 @@ impl<'a, RT: Runtime> IndexWorkerMetadataModel<'a, RT> {
135136
let id = SystemMetadataModel::new_global(self.tx)
136137
.insert(&INDEX_WORKER_METADATA_TABLE, metadata.try_into()?)
137138
.await?;
138-
ParsedDocument::try_from(self.tx.get(id).await?.unwrap())
139+
ParseDocument::parse(self.tx.get(id).await?.unwrap())
139140
}
140141
}
141142

@@ -165,7 +166,7 @@ impl SystemTable for IndexWorkerMetadataTable {
165166
}
166167

167168
fn validate_document(&self, document: ResolvedDocument) -> anyhow::Result<()> {
168-
ParsedDocument::<IndexWorkerMetadataRecord>::try_from(document).map(|_| ())
169+
ParseDocument::<IndexWorkerMetadataRecord>::parse(document).map(|_| ())
169170
}
170171
}
171172

0 commit comments

Comments
 (0)