1
1
#pragma once
2
2
3
3
#include " flat_part_iface.h"
4
- #include " flat_part_forward.h"
5
4
#include " flat_fwd_iface.h"
6
5
#include " flat_fwd_misc.h"
7
6
#include " flat_fwd_page.h"
7
+ #include " flat_part_index_iter.h"
8
+ #include " flat_part_index_iter_iface.h"
9
+ #include " flat_table_part.h"
10
+ #include " flat_part_slice.h"
8
11
9
12
namespace NKikimr {
10
13
namespace NTable {
11
14
namespace NFwd {
12
15
13
- class TCache : public IPageLoadingLogic {
14
- using TGroupId = NPage::TGroupId;
15
-
16
- template <size_t Items>
17
- struct TRound {
18
- const TSharedData* Get (TPageId pageId) const
19
- {
20
- if (pageId < Edge) {
21
- const auto pred = [pageId](const NPageCollection::TLoadedPage &page) {
22
- return page.PageId == pageId;
23
- };
24
-
25
- auto it = std::find_if (Pages.begin (), Pages.end (), pred);
26
-
27
- if (it == Pages.end ()) {
28
- Y_ABORT (" Failed to locate page within forward trace" );
16
+ template <size_t Capacity>
17
+ class TLoadedPagesCircularBuffer {
18
+ public:
19
+ const TSharedData* Get (TPageId pageId) const
20
+ {
21
+ if (pageId < FirstUnseenPageId) {
22
+ for (const auto & page : LoadedPages) {
23
+ if (page.PageId == pageId) {
24
+ return &page.Data ;
29
25
}
30
-
31
- return &it->Data ;
32
26
}
33
27
34
- return nullptr ;
28
+ Y_ABORT ( " Failed to locate page within forward trace " ) ;
35
29
}
36
30
37
- ui32 Emplace (TPage &page)
38
- {
39
- Y_ABORT_UNLESS (page, " Cannot push invalid page to trace cache" );
31
+ // next pages may be requested, ignore them
32
+ return nullptr ;
33
+ }
34
+
35
+ // returns released data size
36
+ ui64 Emplace (TPage &page)
37
+ {
38
+ Y_ABORT_UNLESS (page, " Cannot push invalid page to trace cache" );
40
39
41
- Offset = (Pages. size () + Offset - 1 ) % Pages. size () ;
40
+ Offset = (Offset + 1 ) % Capacity ;
42
41
43
- const ui32 was = Pages [Offset].Data .size ();
42
+ const ui64 releasedDataSize = LoadedPages [Offset].Data .size ();
44
43
45
- Pages [Offset].Data = page.Release ();
46
- Pages [Offset].PageId = page.PageId ;
47
- Edge = Max (Edge , page.PageId + 1 );
44
+ LoadedPages [Offset].Data = page.Release ();
45
+ LoadedPages [Offset].PageId = page.PageId ;
46
+ FirstUnseenPageId = Max (FirstUnseenPageId , page.PageId + 1 );
48
47
49
- return was ;
50
- }
48
+ return releasedDataSize ;
49
+ }
51
50
52
- private:
53
- std::array<NPageCollection::TLoadedPage, Items> Pages ;
54
- TPageId Edge = 0 ;
55
- ui32 Offset = 0 ;
56
- };
51
+ private:
52
+ std::array<NPageCollection::TLoadedPage, Capacity> LoadedPages ;
53
+ ui32 Offset = 0 ;
54
+ TPageId FirstUnseenPageId = 0 ;
55
+ };
57
56
57
+ class TCache : public IPageLoadingLogic {
58
58
public:
59
+ using TGroupId = NPage::TGroupId;
60
+
59
61
TCache () = delete ;
60
62
61
63
TCache (const TPart* part, IPages* env, TGroupId groupId, const TIntrusiveConstPtr<TSlices>& bounds = nullptr )
62
- : Index(part, env, groupId, 1 , bounds)
63
- { }
64
+ : Index(MakeHolder<TPartIndexIt>(part, env, groupId)) // TODO: use CreateIndexIter(part, env, groupId)
65
+ {
66
+ if (bounds && !bounds->empty ()) {
67
+ BeginRowId = bounds->front ().BeginRowId ();
68
+ EndRowId = bounds->back ().EndRowId ();
69
+ } else {
70
+ BeginRowId = 0 ;
71
+ EndRowId = Index->GetEndRowId ();
72
+ }
73
+ }
64
74
65
75
~TCache ()
66
76
{
67
- for (auto &it: Pages) it.Release ();
77
+ for (auto &it: Pages) {
78
+ it.Release ();
79
+ }
68
80
}
69
81
70
82
TResult Handle (IPageLoadingQueue *head, TPageId pageId, ui64 lower) noexcept override
71
83
{
72
- Y_ABORT_UNLESS (pageId != Max<TPageId>(), " Invalid requested pageId " );
84
+ Y_ABORT_UNLESS (pageId != Max<TPageId>(), " Requested page is invalid " );
73
85
74
- if (auto *page = Trace.Get (pageId))
86
+ if (auto *page = Trace.Get (pageId)) {
75
87
return { page, false , true };
88
+ }
76
89
77
- Rewind (pageId).Shrink (); /* points Offset to pageId */
90
+ DropPagesBefore (pageId);
91
+ Shrink ();
92
+
93
+ bool grow = OnHold + OnFetch <= lower;
78
94
79
- bool more = Grow && (OnHold + OnFetch <= lower);
95
+ if (Offset == Pages.size ()) { // isn't processed yet
96
+ SyncIndex (pageId);
97
+ AddToQueue (head, pageId);
98
+ }
80
99
81
- return { Preload (head, 0 ).Touch (pageId, Stat), more, true };
100
+ grow &= Index->IsValid () && Index->GetRowId () < EndRowId;
101
+
102
+ return {Pages.at (Offset).Touch (pageId, Stat), grow, true };
82
103
}
83
104
84
105
void Forward (IPageLoadingQueue *head, ui64 upper) noexcept override
85
106
{
86
- Preload (head, upper);
107
+ Y_ABORT_UNLESS (Started, " Couldn't be called before Handle returns grow" );
108
+
109
+ while (OnHold + OnFetch < upper && Index->IsValid () && Index->GetRowId () < EndRowId) {
110
+ AddToQueue (head, Index->GetPageId ());
111
+ Y_ABORT_UNLESS (Index->Next () != EReady::Page);
112
+ }
87
113
}
88
114
89
115
void Apply (TArrayRef<NPageCollection::TLoadedPage> loaded) noexcept override
@@ -105,7 +131,7 @@ namespace NFwd {
105
131
106
132
Stat.Saved += one.Data .size ();
107
133
OnFetch -= one.Data .size ();
108
- OnHold += it->Settle (one);
134
+ OnHold += it->Settle (one); // settle of a dropped page returns 0 and releases its data
109
135
110
136
++it;
111
137
}
@@ -114,65 +140,72 @@ namespace NFwd {
114
140
}
115
141
116
142
private:
117
- TPage& Preload (IPageLoadingQueue *head, ui64 upper ) noexcept
143
+ void DropPagesBefore (TPageId pageId ) noexcept
118
144
{
119
- auto until = [this , upper]() {
120
- return OnHold + OnFetch < upper ? Max<TPageId>() : 0 ;
121
- };
145
+ while (Offset < Pages.size ()) {
146
+ auto &page = Pages.at (Offset);
122
147
123
- while (auto more = Index.More (until ())) {
124
- auto size = head->AddToQueue (more, EPage::DataPage);
148
+ if (page.PageId >= pageId) {
149
+ break ;
150
+ }
125
151
126
- Stat.Fetch += size;
127
- OnFetch += size;
152
+ if (page.Size == 0 ) {
153
+ Y_ABORT (" Dropping page that has not been touched" );
154
+ } else if (page.Usage == EUsage::Keep && page) {
155
+ OnHold -= Trace.Emplace (page);
156
+ } else {
157
+ OnHold -= page.Release ().size ();
158
+ *(page.Ready () ? &Stat.After : &Stat.Before ) += page.Size ;
159
+ }
128
160
129
- Pages. emplace_back (more, size, 0 , Max<TPageId>());
130
- Pages. back (). Fetch = EFetch::Wait ;
161
+ // keep pending pages but increment offset
162
+ Offset++ ;
131
163
}
132
-
133
- Grow = Grow && Index.On (true ) < Max<TPageId>();
134
-
135
- return Pages.at (Offset);
136
164
}
137
165
138
- TCache& Rewind (TPageId pageId ) noexcept
166
+ void Shrink ( ) noexcept
139
167
{
140
- while (auto drop = Index.Clean (pageId)) {
141
- auto &page = Pages.at (Offset);
142
-
143
- if (!Pages || page.PageId != drop.PageId ) {
144
- Y_ABORT (" Dropping page that is not exist in cache" );
145
- } else if (page.Size == 0 ) {
146
- Y_ABORT (" Dropping page that has not been touched" );
147
- } else if (page.Usage == EUsage::Keep) {
148
- OnHold -= Trace.Emplace (page);
149
- } else if (auto size = page.Release ().size ()) {
150
- OnHold -= size;
168
+ for (; Offset && Pages[0 ].Ready (); Offset--) {
169
+ Pages.pop_front ();
170
+ }
171
+ }
151
172
152
- *(page.Ready () ? &Stat.After : &Stat.Before ) += size;
153
- }
173
+ void SyncIndex (TPageId pageId) noexcept
174
+ {
175
+ if (!Started) {
176
+ Y_ABORT_UNLESS (Index->Seek (BeginRowId) == EReady::Data);
177
+ Y_ABORT_UNLESS (Index->GetPageId () <= pageId, " Requested page is out of slice bounds" );
178
+ Started = true ;
179
+ }
154
180
155
- Offset++;
181
+ while (Index->IsValid () && Index->GetPageId () < pageId) {
182
+ Y_ABORT_UNLESS (Index->Next () == EReady::Data);
183
+ Y_ABORT_UNLESS (Index->GetRowId () < EndRowId, " Requested page is out of slice bounds" );
156
184
}
157
185
158
- return *this ;
186
+ Y_ABORT_UNLESS (Index->GetPageId () == pageId, " Requested page doesn't belong to the part" );
187
+ Y_ABORT_UNLESS (Index->Next () != EReady::Page);
159
188
}
160
189
161
- TCache& Shrink ( ) noexcept
190
+ void AddToQueue (IPageLoadingQueue *head, TPageId pageId ) noexcept
162
191
{
163
- for (; Offset && Pages[0 ].Ready (); Offset--)
164
- Pages.pop_front ();
192
+ auto size = head->AddToQueue (pageId, EPage::DataPage);
193
+
194
+ Stat.Fetch += size;
195
+ OnFetch += size;
165
196
166
- return *this ;
197
+ Y_ABORT_UNLESS (!Pages || Pages.back ().PageId < pageId);
198
+ Pages.emplace_back (pageId, size, 0 , Max<TPageId>());
199
+ Pages.back ().Fetch = EFetch::Wait;
167
200
}
168
201
169
202
private:
170
- bool Grow = true ; /* Have some pages for Forward(...) */
171
- TForward Index;
172
- TRound<TPart::Trace> Trace;
203
+ THolder<IIndexIter> Index; /* Points on next to load page */
204
+ bool Started = false ;
205
+ TRowId BeginRowId, EndRowId;
206
+ TLoadedPagesCircularBuffer<TPart::Trace> Trace;
173
207
174
208
/* _ Forward cache line state */
175
-
176
209
ui64 OnHold = 0 ;
177
210
ui64 OnFetch = 0 ;
178
211
ui32 Offset = 0 ;
0 commit comments