Skip to content

Commit 2bf4932

Browse files
ldanilekConvex, Inc.
authored and
Convex, Inc.
committed
validate previous_revisions timestamp is within document retention (#34238)
currently we're validating that the timestamp is within document retention, but only for timestamps that actually have a previous revision. Consider if you query at a too-old-timestamp, where it's very likely the previous revision has been deleted by document retention. Then we don't assert anything, but this is exactly the case where we want to throw an error. fix it by calculating the min_ts of all the input timestamps, not of all the ones that actually had results. GitOrigin-RevId: 562ee684de507dc13bcec95e20670cb116ff8d65
1 parent 5df8a55 commit 2bf4932

File tree

3 files changed

+6
-4
lines changed

3 files changed

+6
-4
lines changed

crates/mysql/src/lib.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -991,13 +991,15 @@ impl<RT: Runtime> PersistenceReader for MySqlReader<RT> {
991991

992992
let mut results = vec![];
993993

994+
let mut min_ts = Timestamp::MAX;
994995
for chunk in smart_chunks(&ids) {
995996
let mut params = vec![];
996997
for (id, ts) in chunk {
997998
params.push(i64::from(*ts).into());
998999
params.push(internal_id_param(id.table().0).into());
9991000
params.push(internal_doc_id_param(*id).into());
10001001
params.push(i64::from(*ts).into());
1002+
min_ts = cmp::min(*ts, min_ts);
10011003
}
10021004
let result_stream = client
10031005
.query_stream(prev_rev_chunk(chunk.len()), params, chunk.len())
@@ -1007,12 +1009,10 @@ impl<RT: Runtime> PersistenceReader for MySqlReader<RT> {
10071009
results.push(result);
10081010
}
10091011
}
1010-
let mut min_ts = Timestamp::MAX;
10111012
for row in results.into_iter() {
10121013
let ts: i64 = row.get(6).unwrap();
10131014
let ts = Timestamp::try_from(ts)?;
10141015
let (prev_ts, id, maybe_doc, prev_prev_ts) = self.row_to_document(row)?;
1015-
min_ts = cmp::min(ts, min_ts);
10161016
anyhow::ensure!(result
10171017
.insert(
10181018
(id, ts),

crates/postgres/src/lib.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -928,13 +928,15 @@ impl PersistenceReader for PostgresReader {
928928
// the pipeline at once.
929929
let mut result_futures = vec![];
930930

931+
let mut min_ts = Timestamp::MAX;
931932
for chunk in chunks.by_ref() {
932933
assert_eq!(chunk.len(), 8);
933934
let mut params = Vec::with_capacity(16);
934935
for (id, ts) in chunk {
935936
params.push(Param::TableId(id.table()));
936937
params.push(internal_doc_id_param(*id));
937938
params.push(Param::Ts(i64::from(*ts)));
939+
min_ts = cmp::min(*ts, min_ts);
938940
}
939941
result_futures.push(client.query_raw(&prev_rev_chunk, params));
940942
}
@@ -944,10 +946,10 @@ impl PersistenceReader for PostgresReader {
944946
internal_doc_id_param(*id),
945947
Param::Ts(i64::from(*ts)),
946948
];
949+
min_ts = cmp::min(*ts, min_ts);
947950
result_futures.push(client.query_raw(&prev_rev, params));
948951
}
949952
let mut result_stream = stream::iter(result_futures).buffered(*PIPELINE_QUERIES);
950-
let mut min_ts = Timestamp::MAX;
951953
while let Some(row_stream) = result_stream.try_next().await? {
952954
futures::pin_mut!(row_stream);
953955
while let Some(row) = row_stream.try_next().await? {

crates/sqlite/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,7 @@ impl PersistenceReader for SqlitePersistence {
493493
{
494494
let inner = self.inner.lock();
495495
for (id, ts) in ids {
496+
min_ts = cmp::min(ts, min_ts);
496497
let mut stmt = inner.connection.prepare(PREV_REV_QUERY)?;
497498
let internal_id = id.internal_id();
498499
let params = params![&id.table().0[..], &internal_id[..], &u64::from(ts)];
@@ -515,7 +516,6 @@ impl PersistenceReader for SqlitePersistence {
515516
None
516517
};
517518
let prev_prev_ts = prev_prev_ts.map(Timestamp::try_from).transpose()?;
518-
min_ts = cmp::min(ts, min_ts);
519519
out.insert(
520520
(document_id, ts),
521521
DocumentLogEntry {

0 commit comments

Comments
 (0)