1
1
#pragma once
2
2
3
3
#include " flat_part_iface.h"
4
- #include " flat_part_forward.h"
5
4
#include " flat_fwd_iface.h"
6
5
#include " flat_fwd_misc.h"
7
6
#include " flat_fwd_page.h"
7
+ #include " flat_part_index_iter.h"
8
+ #include " flat_part_index_iter_iface.h"
9
+ #include " flat_table_part.h"
10
+ #include " flat_part_slice.h"
8
11
9
12
namespace NKikimr {
10
13
namespace NTable {
@@ -58,8 +61,16 @@ namespace NFwd {
58
61
TCache () = delete ;
59
62
60
63
TCache (const TPart* part, IPages* env, TGroupId groupId, const TIntrusiveConstPtr<TSlices>& bounds = nullptr )
61
- : Index(part, env, groupId, 1 , bounds)
62
- { }
64
+ : Index(MakeHolder<TPartIndexIt>(part, env, groupId)) // TODO: use CreateIndexIter(part, env, groupId)
65
+ {
66
+ if (bounds && !bounds->empty ()) {
67
+ BeginRowId = bounds->front ().BeginRowId ();
68
+ EndRowId = bounds->back ().EndRowId ();
69
+ } else {
70
+ BeginRowId = 0 ;
71
+ EndRowId = Index->GetEndRowId ();
72
+ }
73
+ }
63
74
64
75
~TCache ()
65
76
{
@@ -70,23 +81,35 @@ namespace NFwd {
70
81
71
82
TResult Handle (IPageLoadingQueue *head, TPageId pageId, ui64 lower) noexcept override
72
83
{
73
- Y_ABORT_UNLESS (pageId != Max<TPageId>(), " Invalid requested pageId " );
84
+ Y_ABORT_UNLESS (pageId != Max<TPageId>(), " Requested page is invalid " );
74
85
75
86
if (auto *page = Trace.Get (pageId)) {
76
87
return { page, false , true };
77
88
}
78
89
79
- Rewind (pageId); /* points Offset to pageId */
90
+ DropPagesBefore (pageId);
80
91
Shrink ();
81
92
82
- bool more = Grow && (OnHold + OnFetch <= lower);
93
+ bool grow = OnHold + OnFetch <= lower;
94
+
95
+ if (Offset == Pages.size ()) { // isn't processed yet
96
+ SyncIndex (pageId);
97
+ AddToQueue (head, pageId);
98
+ }
99
+
100
+ grow &= Index->IsValid () && Index->GetRowId () < EndRowId;
83
101
84
- return { Preload (head, 0 ).Touch (pageId, Stat), more , true };
102
+ return {Pages. at (Offset ).Touch (pageId, Stat), grow , true };
85
103
}
86
104
87
105
void Forward (IPageLoadingQueue *head, ui64 upper) noexcept override
88
106
{
89
- Preload (head, upper);
107
+ Y_ABORT_UNLESS (Started, " Couldn't be called before Handle returns grow" );
108
+
109
+ while (OnHold + OnFetch < upper && Index->IsValid () && Index->GetRowId () < EndRowId) {
110
+ AddToQueue (head, Index->GetPageId ());
111
+ Y_ABORT_UNLESS (Index->Next () != EReady::Page);
112
+ }
90
113
}
91
114
92
115
void Apply (TArrayRef<NPageCollection::TLoadedPage> loaded) noexcept override
@@ -117,35 +140,16 @@ namespace NFwd {
117
140
}
118
141
119
142
private:
120
- TPage& Preload (IPageLoadingQueue *head, ui64 upper) noexcept
121
- {
122
- auto until = [this , upper]() {
123
- return OnHold + OnFetch < upper ? Max<TPageId>() : 0 ;
124
- };
125
-
126
- while (auto more = Index.More (until ())) {
127
- auto size = head->AddToQueue (more, EPage::DataPage);
128
-
129
- Stat.Fetch += size;
130
- OnFetch += size;
131
-
132
- Pages.emplace_back (more, size, 0 , Max<TPageId>());
133
- Pages.back ().Fetch = EFetch::Wait;
134
- }
135
-
136
- Grow = Grow && Index.On (true ) < Max<TPageId>();
137
-
138
- return Pages.at (Offset);
139
- }
140
-
141
- void Rewind (TPageId pageId) noexcept
143
+ void DropPagesBefore (TPageId pageId) noexcept
142
144
{
143
- while (auto drop = Index. Clean (pageId )) {
145
+ while (Offset < Pages. size ( )) {
144
146
auto &page = Pages.at (Offset);
145
147
146
- if (!Pages || page.PageId != drop.PageId ) {
147
- Y_ABORT (" Dropping page that is not exist in cache" );
148
- } else if (page.Size == 0 ) {
148
+ if (page.PageId >= pageId) {
149
+ break ;
150
+ }
151
+
152
+ if (page.Size == 0 ) {
149
153
Y_ABORT (" Dropping page that has not been touched" );
150
154
} else if (page.Usage == EUsage::Keep && page) {
151
155
OnHold -= Trace.Emplace (page);
@@ -166,13 +170,42 @@ namespace NFwd {
166
170
}
167
171
}
168
172
173
+ void SyncIndex (TPageId pageId) noexcept
174
+ {
175
+ if (!Started) {
176
+ Y_ABORT_UNLESS (Index->Seek (BeginRowId) == EReady::Data);
177
+ Y_ABORT_UNLESS (Index->GetPageId () <= pageId, " Requested page is out of slice bounds" );
178
+ Started = true ;
179
+ }
180
+
181
+ while (Index->IsValid () && Index->GetPageId () < pageId) {
182
+ Y_ABORT_UNLESS (Index->Next () == EReady::Data);
183
+ Y_ABORT_UNLESS (Index->GetRowId () < EndRowId, " Requested page is out of slice bounds" );
184
+ }
185
+
186
+ Y_ABORT_UNLESS (Index->GetPageId () == pageId, " Requested page doesn't belong to the part" );
187
+ Y_ABORT_UNLESS (Index->Next () != EReady::Page);
188
+ }
189
+
190
+ void AddToQueue (IPageLoadingQueue *head, TPageId pageId) noexcept
191
+ {
192
+ auto size = head->AddToQueue (pageId, EPage::DataPage);
193
+
194
+ Stat.Fetch += size;
195
+ OnFetch += size;
196
+
197
+ Y_ABORT_UNLESS (!Pages || Pages.back ().PageId < pageId);
198
+ Pages.emplace_back (pageId, size, 0 , Max<TPageId>());
199
+ Pages.back ().Fetch = EFetch::Wait;
200
+ }
201
+
169
202
private:
170
- bool Grow = true ; /* Have some pages for Forward(...) */
171
- TForward Index;
203
+ THolder<IIndexIter> Index; /* Points on next to load page */
204
+ bool Started = false ;
205
+ TRowId BeginRowId, EndRowId;
172
206
TLoadedPagesCircularBuffer<TPart::Trace> Trace;
173
207
174
208
/* _ Forward cache line state */
175
-
176
209
ui64 OnHold = 0 ;
177
210
ui64 OnFetch = 0 ;
178
211
ui32 Offset = 0 ;
0 commit comments