@@ -943,7 +943,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
943
943
}
944
944
for entry in to_partition {
945
945
let mut hash = DefaultHasher :: new ( ) ;
946
- entry. 1 . hash ( & mut hash) ;
946
+ entry. 1 . 1 . hash ( & mut hash) ;
947
947
let i = ( hash. finish ( ) as usize ) % * RETENTION_DELETE_PARALLEL ;
948
948
parts[ i] . push ( entry) ;
949
949
}
@@ -1591,6 +1591,7 @@ fn snapshot_invalid_error(
1591
1591
mod tests {
1592
1592
use std:: {
1593
1593
collections:: BTreeSet ,
1594
+ env,
1594
1595
sync:: Arc ,
1595
1596
} ;
1596
1597
@@ -1631,6 +1632,7 @@ mod tests {
1631
1632
} ;
1632
1633
use errors:: ErrorMetadataAnyhowExt ;
1633
1634
use futures:: {
1635
+ future:: try_join_all,
1634
1636
pin_mut,
1635
1637
stream,
1636
1638
TryStreamExt ,
@@ -1905,4 +1907,99 @@ mod tests {
1905
1907
1906
1908
Ok ( ( ) )
1907
1909
}
1910
+
1911
+ #[ convex_macro:: test_runtime]
1912
+ async fn test_delete_document_chunk ( rt : TestRuntime ) -> anyhow:: Result < ( ) > {
1913
+ env:: set_var ( "DOCUMENT_RETENTION_DRY_RUN" , "false" ) ;
1914
+ env:: set_var ( "RETENTION_DELETE_PARALLEL" , "4" ) ;
1915
+ let p = Arc :: new ( TestPersistence :: new ( ) ) ;
1916
+ let mut id_generator = TestIdGenerator :: new ( ) ;
1917
+ let table: TableName = str:: parse ( "table" ) ?;
1918
+
1919
+ let id1 = id_generator. user_generate ( & table) ;
1920
+
1921
+ let documents = vec ! [
1922
+ doc( id1, 1 , Some ( 1 ) ) ?,
1923
+ doc( id1, 2 , Some ( 2 ) ) ?,
1924
+ doc( id1, 3 , Some ( 3 ) ) ?,
1925
+ doc( id1, 4 , Some ( 4 ) ) ?,
1926
+ doc( id1, 5 , Some ( 5 ) ) ?,
1927
+ doc( id1, 6 , Some ( 6 ) ) ?,
1928
+ doc( id1, 7 , Some ( 7 ) ) ?,
1929
+ doc( id1, 8 , Some ( 8 ) ) ?,
1930
+ doc( id1, 9 , Some ( 9 ) ) ?,
1931
+ doc( id1, 10 , Some ( 10 ) ) ?,
1932
+ // min_document_snapshot_ts: 11
1933
+ doc( id1, 12 , Some ( 12 ) ) ?,
1934
+ doc( id1, 13 , Some ( 13 ) ) ?,
1935
+ ] ;
1936
+
1937
+ p. clone ( )
1938
+ . write ( documents. clone ( ) , BTreeSet :: new ( ) , ConflictStrategy :: Error )
1939
+ . await ?;
1940
+
1941
+ let min_snapshot_ts = Timestamp :: must ( 11 ) ;
1942
+ // The max repeatable ts needs to be ahead of Timestamp::MIN by at least the
1943
+ // retention delay, so the anyhow::ensure before we delete doesn't fail
1944
+ let repeatable_ts =
1945
+ unchecked_repeatable_ts ( min_snapshot_ts. add ( * DOCUMENT_RETENTION_DELAY ) ?) ;
1946
+
1947
+ let retention_validator = Arc :: new ( NoopRetentionValidator ) ;
1948
+ let reader = p. reader ( ) ;
1949
+ let reader =
1950
+ RepeatablePersistence :: new ( reader. clone ( ) , repeatable_ts, retention_validator. clone ( ) ) ;
1951
+
1952
+ let scanned_stream = LeaderRetentionManager :: < TestRuntime > :: expired_documents (
1953
+ & rt,
1954
+ reader. clone ( ) ,
1955
+ Timestamp :: MIN ,
1956
+ min_snapshot_ts,
1957
+ ) ;
1958
+ let scanned: Vec < _ > = scanned_stream. try_collect ( ) . await ?;
1959
+ let expired: Vec < _ > = scanned
1960
+ . into_iter ( )
1961
+ . filter_map ( |doc| {
1962
+ if doc. 1 . is_some ( ) {
1963
+ Some ( ( doc. 0 , doc. 1 . unwrap ( ) ) )
1964
+ } else {
1965
+ None
1966
+ }
1967
+ } )
1968
+ . collect ( ) ;
1969
+
1970
+ assert_eq ! ( expired. len( ) , 9 ) ;
1971
+ let results = try_join_all (
1972
+ LeaderRetentionManager :: < TestRuntime > :: partition_document_chunk ( expired)
1973
+ . into_iter ( )
1974
+ . map ( |delete_chunk| {
1975
+ // Ensures that all documents with the same id are in the same chunk
1976
+ assert ! ( delete_chunk. is_empty( ) || delete_chunk. len( ) == 9 ) ;
1977
+ LeaderRetentionManager :: < TestRuntime > :: delete_document_chunk (
1978
+ delete_chunk,
1979
+ p. clone ( ) ,
1980
+ min_snapshot_ts,
1981
+ )
1982
+ } ) ,
1983
+ )
1984
+ . await ?;
1985
+ let ( _, deleted_rows) : ( Vec < _ > , Vec < _ > ) = results. into_iter ( ) . unzip ( ) ;
1986
+ let deleted_rows = deleted_rows. into_iter ( ) . sum :: < usize > ( ) ;
1987
+ assert_eq ! ( deleted_rows, 9 ) ;
1988
+
1989
+ let reader = p. reader ( ) ;
1990
+
1991
+ // All documents are still visible at snapshot ts=12.
1992
+ let stream = reader. load_all_documents ( ) ;
1993
+ let results: Vec < _ > = stream. try_collect :: < Vec < _ > > ( ) . await ?. into_iter ( ) . collect ( ) ;
1994
+ assert_eq ! (
1995
+ results,
1996
+ vec![
1997
+ doc( id1, 10 , Some ( 10 ) ) ?,
1998
+ doc( id1, 12 , Some ( 12 ) ) ?,
1999
+ doc( id1, 13 , Some ( 13 ) ) ?,
2000
+ ]
2001
+ ) ;
2002
+
2003
+ Ok ( ( ) )
2004
+ }
1908
2005
}
0 commit comments