1
+ use anyhow:: Context as _;
1
2
use futures:: {
2
3
Stream ,
3
4
TryStreamExt ,
@@ -6,11 +7,11 @@ use futures_async_stream::try_stream;
6
7
use value:: InternalDocumentId ;
7
8
8
9
use crate :: {
9
- comparators:: AsComparator ,
10
10
document:: ResolvedDocument ,
11
11
knobs:: DOCUMENTS_IN_MEMORY ,
12
12
persistence:: {
13
13
DocumentLogEntry ,
14
+ DocumentPrevTsQuery ,
14
15
RepeatablePersistence ,
15
16
} ,
16
17
try_chunks:: TryChunksExt ,
@@ -56,27 +57,37 @@ pub async fn stream_revision_pairs<'a>(
56
57
futures:: pin_mut!( documents) ;
57
58
58
59
while let Some ( read_chunk) = documents. try_next ( ) . await ? {
59
- // TODO: use prev_ts when it is available
60
- let ids = read_chunk
60
+ let queries = read_chunk
61
61
. iter ( )
62
- . map ( |entry| ( entry. id , entry. ts ) )
62
+ . filter_map ( |entry| {
63
+ entry. prev_ts . map ( |prev_ts| DocumentPrevTsQuery {
64
+ id : entry. id ,
65
+ ts : entry. ts ,
66
+ prev_ts,
67
+ } )
68
+ } )
63
69
. collect ( ) ;
64
- let mut prev_revs = reader. previous_revisions ( ids ) . await ?;
70
+ let mut prev_revs = reader. previous_revisions_of_documents ( queries ) . await ?;
65
71
for DocumentLogEntry {
66
72
ts,
73
+ prev_ts,
67
74
id,
68
75
value : document,
69
76
..
70
77
} in read_chunk
71
78
{
72
79
let rev = DocumentRevision { ts, document } ;
73
- let prev_rev =
74
- prev_revs
75
- . remove ( ( & id, & ts) . as_comparator ( ) )
76
- . map ( |entry| DocumentRevision {
80
+ let prev_rev = prev_ts
81
+ . map ( |prev_ts| {
82
+ let entry = prev_revs
83
+ . remove ( & DocumentPrevTsQuery { id, ts, prev_ts } )
84
+ . with_context ( || format ! ( "prev_ts is missing for {id}@{ts}: {prev_ts}" ) ) ?;
85
+ anyhow:: Ok ( DocumentRevision {
77
86
ts : entry. ts ,
78
87
document : entry. value ,
79
- } ) ;
88
+ } )
89
+ } )
90
+ . transpose ( ) ?;
80
91
yield RevisionPair { id, rev, prev_rev } ;
81
92
}
82
93
}
0 commit comments