1
1
#pragma once
2
2
3
3
#include " flat_part_iface.h"
4
- #include " flat_part_forward.h"
5
4
#include " flat_fwd_iface.h"
6
5
#include " flat_fwd_misc.h"
7
6
#include " flat_fwd_page.h"
7
+ #include " flat_part_index_iter_iface.h"
8
+ #include " flat_table_part.h"
9
+ #include " flat_part_slice.h"
8
10
9
11
namespace NKikimr {
10
12
namespace NTable {
11
13
namespace NFwd {
12
14
13
- class TCache : public IPageLoadingLogic {
14
- using TGroupId = NPage::TGroupId;
15
-
16
- template <size_t Items>
17
- struct TRound {
18
- const TSharedData* Get (TPageId pageId) const
19
- {
20
- if (pageId < Edge) {
21
- const auto pred = [pageId](const NPageCollection::TLoadedPage &page) {
22
- return page.PageId == pageId;
23
- };
24
-
25
- auto it = std::find_if (Pages.begin (), Pages.end (), pred);
26
-
27
- if (it == Pages.end ()) {
28
- Y_ABORT (" Failed to locate page within forward trace" );
15
+ template <size_t Capacity>
16
+ class TLoadedPagesCircularBuffer {
17
+ public:
18
+ const TSharedData* Get (TPageId pageId) const
19
+ {
20
+ if (pageId < FirstUnseenPageId) {
21
+ for (const auto & page : LoadedPages) {
22
+ if (page.PageId == pageId) {
23
+ return &page.Data ;
29
24
}
30
-
31
- return &it->Data ;
32
25
}
33
26
34
- return nullptr ;
27
+ Y_ABORT ( " Failed to locate page within forward trace " ) ;
35
28
}
36
29
37
- ui32 Emplace (TPage &page)
38
- {
39
- Y_ABORT_UNLESS (page, " Cannot push invalid page to trace cache" );
30
+ // next pages may be requested, ignore them
31
+ return nullptr ;
32
+ }
33
+
34
+ // returns released data size
35
+ ui64 Emplace (TPage &page)
36
+ {
37
+ Y_ABORT_UNLESS (page, " Cannot push invalid page to trace cache" );
40
38
41
- Offset = (Pages. size () + Offset - 1 ) % Pages. size () ;
39
+ Offset = (Offset + 1 ) % Capacity ;
42
40
43
- const ui32 was = Pages [Offset].Data .size ();
41
+ const ui64 releasedDataSize = LoadedPages [Offset].Data .size ();
44
42
45
- Pages [Offset].Data = page.Release ();
46
- Pages [Offset].PageId = page.PageId ;
47
- Edge = Max (Edge , page.PageId + 1 );
43
+ LoadedPages [Offset].Data = page.Release ();
44
+ LoadedPages [Offset].PageId = page.PageId ;
45
+ FirstUnseenPageId = Max (FirstUnseenPageId , page.PageId + 1 );
48
46
49
- return was ;
50
- }
47
+ return releasedDataSize ;
48
+ }
51
49
52
- private:
53
- std::array<NPageCollection::TLoadedPage, Items> Pages ;
54
- TPageId Edge = 0 ;
55
- ui32 Offset = 0 ;
56
- };
50
+ private:
51
+ std::array<NPageCollection::TLoadedPage, Capacity> LoadedPages ;
52
+ ui32 Offset = 0 ;
53
+ TPageId FirstUnseenPageId = 0 ;
54
+ };
57
55
56
+ class TCache : public IPageLoadingLogic {
58
57
public:
58
+ using TGroupId = NPage::TGroupId;
59
+
59
60
TCache () = delete ;
60
61
61
62
TCache (const TPart* part, IPages* env, TGroupId groupId, const TIntrusiveConstPtr<TSlices>& bounds = nullptr )
62
- : Index(part, env, groupId, 1 , bounds)
63
- { }
63
+ : Index(CreateIndexIter(part, env, groupId))
64
+ {
65
+ if (bounds && !bounds->empty ()) {
66
+ BeginRowId = bounds->front ().BeginRowId ();
67
+ EndRowId = bounds->back ().EndRowId ();
68
+ } else {
69
+ BeginRowId = 0 ;
70
+ EndRowId = Index->GetEndRowId ();
71
+ }
72
+ }
64
73
65
74
~TCache ()
66
75
{
67
- for (auto &it: Pages) it.Release ();
76
+ for (auto &it: Pages) {
77
+ it.Release ();
78
+ }
68
79
}
69
80
70
81
TResult Handle (IPageLoadingQueue *head, TPageId pageId, ui64 lower) noexcept override
71
82
{
72
- Y_ABORT_UNLESS (pageId != Max<TPageId>(), " Invalid requested pageId " );
83
+ Y_ABORT_UNLESS (pageId != Max<TPageId>(), " Requested page is invalid " );
73
84
74
- if (auto *page = Trace.Get (pageId))
85
+ if (auto *page = Trace.Get (pageId)) {
75
86
return { page, false , true };
87
+ }
76
88
77
- Rewind (pageId).Shrink (); /* points Offset to pageId */
89
+ DropPagesBefore (pageId);
90
+ Shrink ();
91
+
92
+ bool grow = OnHold + OnFetch <= lower;
78
93
79
- bool more = Grow && (OnHold + OnFetch <= lower);
94
+ if (Offset == Pages.size ()) { // isn't processed yet
95
+ SyncIndex (pageId);
96
+ AddToQueue (head, pageId);
97
+ }
80
98
81
- return { Preload (head, 0 ).Touch (pageId, Stat), more, true };
99
+ grow &= Index->IsValid () && Index->GetRowId () < EndRowId;
100
+
101
+ return {Pages.at (Offset).Touch (pageId, Stat), grow, true };
82
102
}
83
103
84
104
void Forward (IPageLoadingQueue *head, ui64 upper) noexcept override
85
105
{
86
- Preload (head, upper);
106
+ Y_ABORT_UNLESS (Started, " Couldn't be called before Handle returns grow" );
107
+
108
+ while (OnHold + OnFetch < upper && Index->IsValid () && Index->GetRowId () < EndRowId) {
109
+ AddToQueue (head, Index->GetPageId ());
110
+ Y_ABORT_UNLESS (Index->Next () != EReady::Page);
111
+ }
87
112
}
88
113
89
114
void Apply (TArrayRef<NPageCollection::TLoadedPage> loaded) noexcept override
@@ -105,7 +130,7 @@ namespace NFwd {
105
130
106
131
Stat.Saved += one.Data .size ();
107
132
OnFetch -= one.Data .size ();
108
- OnHold += it->Settle (one);
133
+ OnHold += it->Settle (one); // settle of a dropped page returns 0 and releases its data
109
134
110
135
++it;
111
136
}
@@ -114,65 +139,72 @@ namespace NFwd {
114
139
}
115
140
116
141
private:
117
- TPage& Preload (IPageLoadingQueue *head, ui64 upper ) noexcept
142
+ void DropPagesBefore (TPageId pageId ) noexcept
118
143
{
119
- auto until = [this , upper]() {
120
- return OnHold + OnFetch < upper ? Max<TPageId>() : 0 ;
121
- };
144
+ while (Offset < Pages.size ()) {
145
+ auto &page = Pages.at (Offset);
122
146
123
- while (auto more = Index.More (until ())) {
124
- auto size = head->AddToQueue (more, EPage::DataPage);
147
+ if (page.PageId >= pageId) {
148
+ break ;
149
+ }
125
150
126
- Stat.Fetch += size;
127
- OnFetch += size;
151
+ if (page.Size == 0 ) {
152
+ Y_ABORT (" Dropping page that has not been touched" );
153
+ } else if (page.Usage == EUsage::Keep && page) {
154
+ OnHold -= Trace.Emplace (page);
155
+ } else {
156
+ OnHold -= page.Release ().size ();
157
+ *(page.Ready () ? &Stat.After : &Stat.Before ) += page.Size ;
158
+ }
128
159
129
- Pages. emplace_back (more, size, 0 , Max<TPageId>());
130
- Pages. back (). Fetch = EFetch::Wait ;
160
+ // keep pending pages but increment offset
161
+ Offset++ ;
131
162
}
132
-
133
- Grow = Grow && Index.On (true ) < Max<TPageId>();
134
-
135
- return Pages.at (Offset);
136
163
}
137
164
138
- TCache& Rewind (TPageId pageId ) noexcept
165
+ void Shrink ( ) noexcept
139
166
{
140
- while (auto drop = Index.Clean (pageId)) {
141
- auto &page = Pages.at (Offset);
142
-
143
- if (!Pages || page.PageId != drop.PageId ) {
144
- Y_ABORT (" Dropping page that is not exist in cache" );
145
- } else if (page.Size == 0 ) {
146
- Y_ABORT (" Dropping page that has not been touched" );
147
- } else if (page.Usage == EUsage::Keep) {
148
- OnHold -= Trace.Emplace (page);
149
- } else if (auto size = page.Release ().size ()) {
150
- OnHold -= size;
167
+ for (; Offset && Pages[0 ].Ready (); Offset--) {
168
+ Pages.pop_front ();
169
+ }
170
+ }
151
171
152
- *(page.Ready () ? &Stat.After : &Stat.Before ) += size;
153
- }
172
+ void SyncIndex (TPageId pageId) noexcept
173
+ {
174
+ if (!Started) {
175
+ Y_ABORT_UNLESS (Index->Seek (BeginRowId) == EReady::Data);
176
+ Y_ABORT_UNLESS (Index->GetPageId () <= pageId, " Requested page is out of slice bounds" );
177
+ Started = true ;
178
+ }
154
179
155
- Offset++;
180
+ while (Index->IsValid () && Index->GetPageId () < pageId) {
181
+ Y_ABORT_UNLESS (Index->Next () == EReady::Data);
182
+ Y_ABORT_UNLESS (Index->GetRowId () < EndRowId, " Requested page is out of slice bounds" );
156
183
}
157
184
158
- return *this ;
185
+ Y_ABORT_UNLESS (Index->GetPageId () == pageId, " Requested page doesn't belong to the part" );
186
+ Y_ABORT_UNLESS (Index->Next () != EReady::Page);
159
187
}
160
188
161
- TCache& Shrink ( ) noexcept
189
+ void AddToQueue (IPageLoadingQueue *head, TPageId pageId ) noexcept
162
190
{
163
- for (; Offset && Pages[0 ].Ready (); Offset--)
164
- Pages.pop_front ();
191
+ auto size = head->AddToQueue (pageId, EPage::DataPage);
192
+
193
+ Stat.Fetch += size;
194
+ OnFetch += size;
165
195
166
- return *this ;
196
+ Y_ABORT_UNLESS (!Pages || Pages.back ().PageId < pageId);
197
+ Pages.emplace_back (pageId, size, 0 , Max<TPageId>());
198
+ Pages.back ().Fetch = EFetch::Wait;
167
199
}
168
200
169
201
private:
170
- bool Grow = true ; /* Have some pages for Forward(...) */
171
- TForward Index;
172
- TRound<TPart::Trace> Trace;
202
+ THolder<IIndexIter> Index; /* Points on next to load page */
203
+ bool Started = false ;
204
+ TRowId BeginRowId, EndRowId;
205
+ TLoadedPagesCircularBuffer<TPart::Trace> Trace;
173
206
174
207
/* _ Forward cache line state */
175
-
176
208
ui64 OnHold = 0 ;
177
209
ui64 OnFetch = 0 ;
178
210
ui32 Offset = 0 ;
0 commit comments