Skip to content

Commit 95dd22c

Browse files
ldanilekConvex, Inc.
authored and
Convex, Inc.
committed
retention look up by exact prev_ts, checking ts against retention (#34449)
Redo of #34317 . Instead of querying in the documents log for exactly `(id, prev_ts)`, we change the interface to take in an `id, ts, prev_ts`. Then we can compare the `ts` against retention and use `prev_ts` for the direct equality lookup. And we can return a map that uses the entire `id, ts, prev_ts` as a key, to avoid ambiguity of which timestamp is used in the key. GitOrigin-RevId: 1e8ac19ad499ad4751820b92cc2580b9bc433c78
1 parent 82acc05 commit 95dd22c

File tree

10 files changed

+226
-105
lines changed

10 files changed

+226
-105
lines changed

crates/common/src/persistence.rs

+19-12
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,13 @@ pub trait RetentionValidator: Sync + Send {
312312
fn fail_if_falling_behind(&self) -> anyhow::Result<()>;
313313
}
314314

315+
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
316+
pub struct DocumentPrevTsQuery {
317+
pub id: InternalDocumentId,
318+
pub ts: Timestamp,
319+
pub prev_ts: Timestamp,
320+
}
321+
315322
#[async_trait]
316323
pub trait PersistenceReader: Send + Sync + 'static {
317324
/// The persistence is required to load documents within the given timestamp
@@ -359,14 +366,14 @@ pub trait PersistenceReader: Send + Sync + 'static {
359366
retention_validator: Arc<dyn RetentionValidator>,
360367
) -> anyhow::Result<BTreeMap<(InternalDocumentId, Timestamp), DocumentLogEntry>>;
361368

362-
/// Look up documents at exactly the specified timestamps, returning a map
363-
/// where for each `(id, ts)` we have an entry only if a document exists
364-
/// at exactly that timestamp.
365-
async fn documents_multiget(
369+
/// Look up documents at exactly the specified prev_ts timestamps, returning
370+
/// a map where for each `DocumentPrevTsQuery` we have an entry only if
371+
/// a document exists at `(id, prev_ts)`.
372+
async fn previous_revisions_of_documents(
366373
&self,
367-
ids: BTreeSet<(InternalDocumentId, Timestamp)>,
374+
ids: BTreeSet<DocumentPrevTsQuery>,
368375
retention_validator: Arc<dyn RetentionValidator>,
369-
) -> anyhow::Result<BTreeMap<(InternalDocumentId, Timestamp), DocumentLogEntry>>;
376+
) -> anyhow::Result<BTreeMap<DocumentPrevTsQuery, DocumentLogEntry>>;
370377

371378
/// Loads documentIds with respective timestamps that match the
372379
/// index query criteria.
@@ -586,16 +593,16 @@ impl RepeatablePersistence {
586593
.await
587594
}
588595

589-
pub async fn documents_multiget(
596+
pub async fn previous_revisions_of_documents(
590597
&self,
591-
ids: BTreeSet<(InternalDocumentId, Timestamp)>,
592-
) -> anyhow::Result<BTreeMap<(InternalDocumentId, Timestamp), DocumentLogEntry>> {
593-
for (_, ts) in &ids {
594-
// Reading documents ==ts, so ts needs to be repeatable.
598+
ids: BTreeSet<DocumentPrevTsQuery>,
599+
) -> anyhow::Result<BTreeMap<DocumentPrevTsQuery, DocumentLogEntry>> {
600+
for DocumentPrevTsQuery { ts, .. } in &ids {
601+
// Reading previous revisions of documents at ts, so ts needs to be repeatable.
595602
anyhow::ensure!(*ts <= self.upper_bound);
596603
}
597604
self.reader
598-
.documents_multiget(ids, self.retention_validator.clone())
605+
.previous_revisions_of_documents(ids, self.retention_validator.clone())
599606
.await
600607
}
601608

crates/common/src/testing/persistence_test_suite.rs

+87-24
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ use crate::{
5353
fake_retention_validator::FakeRetentionValidator,
5454
ConflictStrategy,
5555
DocumentLogEntry,
56+
DocumentPrevTsQuery,
5657
LatestDocument,
5758
NoopRetentionValidator,
5859
Persistence,
@@ -221,10 +222,13 @@ macro_rules! run_persistence_test_suite {
221222
}
222223

223224
#[tokio::test]
224-
async fn test_persistence_documents_multiget() -> anyhow::Result<()> {
225+
async fn test_persistence_previous_revisions_of_documents() -> anyhow::Result<()> {
225226
let $db = $create_db;
226227
let p = $create_persistence;
227-
persistence_test_suite::persistence_documents_multiget(::std::sync::Arc::new(p)).await
228+
persistence_test_suite::persistence_previous_revisions_of_documents(
229+
::std::sync::Arc::new(p),
230+
)
231+
.await
228232
}
229233

230234
#[tokio::test]
@@ -1756,7 +1760,9 @@ pub async fn persistence_delete_documents<P: Persistence>(p: Arc<P>) -> anyhow::
17561760
Ok(())
17571761
}
17581762

1759-
pub async fn persistence_documents_multiget<P: Persistence>(p: Arc<P>) -> anyhow::Result<()> {
1763+
pub async fn persistence_previous_revisions_of_documents<P: Persistence>(
1764+
p: Arc<P>,
1765+
) -> anyhow::Result<()> {
17601766
let mut id_generator = TestIdGenerator::new();
17611767
let table: TableName = str::parse("table")?;
17621768
let id1 = id_generator.user_generate(&table);
@@ -1806,38 +1812,87 @@ pub async fn persistence_documents_multiget<P: Persistence>(p: Arc<P>) -> anyhow
18061812
id_generator.generate_internal(),
18071813
);
18081814

1815+
// For the purposes of testing, set `ts` to be anything, because only `prev_ts`
1816+
// is used.
18091817
let queries = btreeset![
18101818
// Latest revision
1811-
(id1.into(), Timestamp::must(3)),
1812-
// Non-latest revision
1813-
(id1.into(), Timestamp::must(1)),
1814-
// Tombstone
1815-
(id2.into(), Timestamp::must(2)),
1816-
// Nonexistent revision
1817-
(id2.into(), Timestamp::must(3)),
1819+
DocumentPrevTsQuery {
1820+
id: id1.into(),
1821+
ts: Timestamp::must(4),
1822+
prev_ts: Timestamp::must(3),
1823+
},
1824+
// Previous revision of latest revision
1825+
DocumentPrevTsQuery {
1826+
id: id1.into(),
1827+
ts: Timestamp::must(3),
1828+
prev_ts: Timestamp::must(1)
1829+
},
1830+
// Tombstone (in this case ts doesn't actually exist but it's fine)
1831+
DocumentPrevTsQuery {
1832+
id: id2.into(),
1833+
ts: Timestamp::must(3),
1834+
prev_ts: Timestamp::must(2)
1835+
},
1836+
// Nonexistent revision at both ts and prev_ts
1837+
DocumentPrevTsQuery {
1838+
id: id2.into(),
1839+
ts: Timestamp::must(4),
1840+
prev_ts: Timestamp::must(3)
1841+
},
18181842
// Unchanged document
1819-
(id3.into(), Timestamp::must(1)),
1843+
DocumentPrevTsQuery {
1844+
id: id3.into(),
1845+
ts: Timestamp::must(2),
1846+
prev_ts: Timestamp::must(1),
1847+
},
18201848
// Nonexistent document
1821-
(nonexistent_id, Timestamp::must(1))
1849+
DocumentPrevTsQuery {
1850+
id: nonexistent_id,
1851+
ts: Timestamp::must(2),
1852+
prev_ts: Timestamp::must(1),
1853+
},
18221854
];
18231855

18241856
// Test with NoopRetentionValidator
18251857
// Note: Proper retention validation testing will be added in a separate PR
18261858
let results = p
18271859
.reader()
1828-
.documents_multiget(queries.clone(), Arc::new(NoopRetentionValidator))
1860+
.previous_revisions_of_documents(queries.clone(), Arc::new(NoopRetentionValidator))
18291861
.await?;
18301862

18311863
// Should get exact matches only
18321864
assert_eq!(results.len(), 4); // id1@3, id1@1, id2@2, id3@1
1833-
assert!(results.contains_key(&(id1.into(), Timestamp::must(3))));
1834-
assert!(results.contains_key(&(id1.into(), Timestamp::must(1))));
1835-
assert!(results.contains_key(&(id2.into(), Timestamp::must(2))));
1836-
assert!(results.contains_key(&(id3.into(), Timestamp::must(1))));
1865+
assert!(results.contains_key(&DocumentPrevTsQuery {
1866+
id: id1.into(),
1867+
ts: Timestamp::must(3),
1868+
prev_ts: Timestamp::must(1)
1869+
}));
1870+
assert!(results.contains_key(&DocumentPrevTsQuery {
1871+
id: id2.into(),
1872+
ts: Timestamp::must(3),
1873+
prev_ts: Timestamp::must(2)
1874+
}));
1875+
assert!(results.contains_key(&DocumentPrevTsQuery {
1876+
id: id3.into(),
1877+
ts: Timestamp::must(2),
1878+
prev_ts: Timestamp::must(1)
1879+
}));
18371880

18381881
// Verify document contents
1839-
let id1_at_3 = results.get(&(id1.into(), Timestamp::must(3))).unwrap();
1840-
let id1_at_1 = results.get(&(id1.into(), Timestamp::must(1))).unwrap();
1882+
let id1_at_3 = results
1883+
.get(&DocumentPrevTsQuery {
1884+
id: id1.into(),
1885+
ts: Timestamp::must(4),
1886+
prev_ts: Timestamp::must(3),
1887+
})
1888+
.unwrap();
1889+
let id1_at_1 = results
1890+
.get(&DocumentPrevTsQuery {
1891+
id: id1.into(),
1892+
ts: Timestamp::must(3),
1893+
prev_ts: Timestamp::must(1),
1894+
})
1895+
.unwrap();
18411896

18421897
// Verify id1@3 has the correct document and prev_ts pointing to id1@1
18431898
assert_eq!(id1_at_3.value, Some(doc(id1)));
@@ -1853,7 +1908,11 @@ pub async fn persistence_documents_multiget<P: Persistence>(p: Arc<P>) -> anyhow
18531908
// Verify tombstone
18541909
assert_eq!(
18551910
results
1856-
.get(&(id2.into(), Timestamp::must(2)))
1911+
.get(&DocumentPrevTsQuery {
1912+
id: id2.into(),
1913+
ts: Timestamp::must(3),
1914+
prev_ts: Timestamp::must(2)
1915+
})
18571916
.unwrap()
18581917
.value,
18591918
None
@@ -1862,21 +1921,25 @@ pub async fn persistence_documents_multiget<P: Persistence>(p: Arc<P>) -> anyhow
18621921
let retention_validator = FakeRetentionValidator::new(Timestamp::must(4), Timestamp::must(0));
18631922
// Min ts queried is 1, and min_document_ts is 0, so it's a valid query.
18641923
p.reader()
1865-
.documents_multiget(queries.clone(), Arc::new(retention_validator))
1924+
.previous_revisions_of_documents(queries.clone(), Arc::new(retention_validator))
18661925
.await?;
18671926

18681927
let retention_validator = FakeRetentionValidator::new(Timestamp::must(4), Timestamp::must(4));
18691928
// Min ts queried is 1, and min_document_ts is 4, so it's an invalid query.
18701929
assert!(p
18711930
.reader()
1872-
.documents_multiget(queries, Arc::new(retention_validator))
1931+
.previous_revisions_of_documents(queries, Arc::new(retention_validator))
18731932
.await
18741933
.is_err());
18751934
// Errors even if there is no document at the timestamp.
18761935
assert!(p
18771936
.reader()
1878-
.documents_multiget(
1879-
btreeset![(nonexistent_id, Timestamp::must(1))],
1937+
.previous_revisions_of_documents(
1938+
btreeset![DocumentPrevTsQuery {
1939+
id: nonexistent_id,
1940+
ts: Timestamp::must(1),
1941+
prev_ts: Timestamp::must(1)
1942+
}],
18801943
Arc::new(retention_validator)
18811944
)
18821945
.await

crates/common/src/testing/test_persistence.rs

+10-9
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use crate::{
3939
persistence::{
4040
ConflictStrategy,
4141
DocumentLogEntry,
42+
DocumentPrevTsQuery,
4243
DocumentStream,
4344
IndexStream,
4445
LatestDocument,
@@ -282,25 +283,25 @@ impl PersistenceReader for TestPersistence {
282283
Ok(result)
283284
}
284285

285-
async fn documents_multiget(
286+
async fn previous_revisions_of_documents(
286287
&self,
287-
ids: BTreeSet<(InternalDocumentId, Timestamp)>,
288+
ids: BTreeSet<DocumentPrevTsQuery>,
288289
retention_validator: Arc<dyn RetentionValidator>,
289-
) -> anyhow::Result<BTreeMap<(InternalDocumentId, Timestamp), DocumentLogEntry>> {
290-
let min_ts = ids.iter().map(|(_, ts)| *ts).min();
290+
) -> anyhow::Result<BTreeMap<DocumentPrevTsQuery, DocumentLogEntry>> {
291+
let min_ts = ids.iter().map(|DocumentPrevTsQuery { ts, .. }| *ts).min();
291292
let result = {
292293
let inner = self.inner.lock();
293294
let result = ids
294295
.into_iter()
295-
.filter_map(|(id, ts)| {
296-
inner.log.get(&(ts, id)).map(|(doc, prev_ts)| {
296+
.filter_map(|DocumentPrevTsQuery { id, ts, prev_ts }| {
297+
inner.log.get(&(prev_ts, id)).map(|(doc, prev_prev_ts)| {
297298
(
298-
(id, ts),
299+
DocumentPrevTsQuery { id, ts, prev_ts },
299300
DocumentLogEntry {
300301
id,
301-
ts,
302+
ts: prev_ts,
302303
value: doc.clone(),
303-
prev_ts: *prev_ts,
304+
prev_ts: *prev_prev_ts,
304305
},
305306
)
306307
})

crates/database/src/retention.rs

+25-3
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ use common::{
6363
persistence::{
6464
new_static_repeatable_recent,
6565
DocumentLogEntry,
66+
DocumentPrevTsQuery,
6667
NoopRetentionValidator,
6768
Persistence,
6869
PersistenceGlobalKey,
@@ -561,26 +562,47 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
561562
// Each prev rev has 1 or 2 index entries to delete per index -- one entry at
562563
// the prev rev's ts, and a tombstone at the current rev's ts if
563564
// the document was deleted or its index key changed.
564-
// TODO: use prev_ts when available
565565
let prev_revs = reader_
566-
.previous_revisions(chunk.iter().map(|entry| (entry.id, entry.ts)).collect())
566+
.previous_revisions_of_documents(
567+
chunk
568+
.iter()
569+
.filter_map(|entry| {
570+
entry.prev_ts.map(|prev_ts| DocumentPrevTsQuery {
571+
id: entry.id,
572+
ts: entry.ts,
573+
prev_ts,
574+
})
575+
})
576+
.collect(),
577+
)
567578
.await?;
568579
for DocumentLogEntry {
569580
ts,
570581
id,
571582
value: maybe_doc,
583+
prev_ts,
572584
..
573585
} in chunk
574586
{
575587
// If there is no prev rev, there's nothing to delete.
576588
// If this happens for a tombstone, it means the document was created and
577589
// deleted in the same transaction, with no index rows.
590+
let Some(prev_ts) = prev_ts else {
591+
log_retention_scanned_document(maybe_doc.is_none(), false);
592+
continue;
593+
};
578594
let Some(DocumentLogEntry {
579595
ts: prev_rev_ts,
580596
value: maybe_prev_rev,
581597
..
582-
}) = prev_revs.get(&(id, ts))
598+
}) = prev_revs.get(&DocumentPrevTsQuery { id, ts, prev_ts })
583599
else {
600+
// This is unexpected: if there is a prev_ts, there should be a prev_rev.
601+
report_error(&mut anyhow::anyhow!(
602+
"Skipping deleting indexes for {id}@{ts}. It has a prev_ts of \
603+
{prev_ts} but no previous revision."
604+
))
605+
.await;
584606
log_retention_scanned_document(maybe_doc.is_none(), false);
585607
continue;
586608
};

crates/mysql/src/chunks.rs

+10-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ use common::{
88
MYSQL_MAX_CHUNK_BYTES,
99
MYSQL_MAX_DYNAMIC_SMART_CHUNK_SIZE,
1010
},
11-
persistence::DocumentLogEntry,
11+
persistence::{
12+
DocumentLogEntry,
13+
DocumentPrevTsQuery,
14+
},
1215
types::{
1316
DatabaseIndexUpdate,
1417
Timestamp,
@@ -35,6 +38,12 @@ impl ApproxSize for Timestamp {
3538
}
3639
}
3740

41+
impl ApproxSize for DocumentPrevTsQuery {
42+
fn approx_size(&self) -> usize {
43+
self.id.approx_size() + self.ts.approx_size() + self.prev_ts.approx_size()
44+
}
45+
}
46+
3847
impl ApproxSize for InternalDocumentId {
3948
fn approx_size(&self) -> usize {
4049
self.size()

0 commit comments

Comments
 (0)