@@ -10,52 +10,51 @@ namespace NKikimr {
10
10
namespace NTable {
11
11
namespace NFwd {
12
12
13
- class TCache : public IPageLoadingLogic {
14
- using TGroupId = NPage::TGroupId;
15
-
16
- template <size_t Items>
17
- struct TRound {
18
- const TSharedData* Get (TPageId pageId) const
19
- {
20
- if (pageId < Edge) {
21
- const auto pred = [pageId](const NPageCollection::TLoadedPage &page) {
22
- return page.PageId == pageId;
23
- };
24
-
25
- auto it = std::find_if (Pages.begin (), Pages.end (), pred);
26
-
27
- if (it == Pages.end ()) {
28
- Y_ABORT (" Failed to locate page within forward trace" );
13
+ template <size_t Capacity>
14
+ class TLoadedPagesCircularBuffer {
15
+ public:
16
+ const TSharedData* Get (TPageId pageId) const
17
+ {
18
+ if (pageId < FirstUnseenPageId) {
19
+ for (const auto & page : LoadedPages) {
20
+ if (page.PageId == pageId) {
21
+ return &page.Data ;
29
22
}
30
-
31
- return &it->Data ;
32
23
}
33
24
34
- return nullptr ;
25
+ Y_ABORT ( " Failed to locate page within forward trace " ) ;
35
26
}
36
27
37
- ui32 Emplace (TPage &page)
38
- {
39
- Y_ABORT_UNLESS (page, " Cannot push invalid page to trace cache" );
28
+ // next pages may be requested, ignore them
29
+ return nullptr ;
30
+ }
31
+
32
+ // returns released data size
33
+ ui64 Emplace (TPage &page)
34
+ {
35
+ Y_ABORT_UNLESS (page, " Cannot push invalid page to trace cache" );
40
36
41
- Offset = (Pages. size () + Offset - 1 ) % Pages. size () ;
37
+ Offset = (Offset + 1 ) % Capacity ;
42
38
43
- const ui32 was = Pages [Offset].Data .size ();
39
+ const ui64 releasedDataSize = LoadedPages [Offset].Data .size ();
44
40
45
- Pages [Offset].Data = page.Release ();
46
- Pages [Offset].PageId = page.PageId ;
47
- Edge = Max (Edge , page.PageId + 1 );
41
+ LoadedPages [Offset].Data = page.Release ();
42
+ LoadedPages [Offset].PageId = page.PageId ;
43
+ FirstUnseenPageId = Max (FirstUnseenPageId , page.PageId + 1 );
48
44
49
- return was ;
50
- }
45
+ return releasedDataSize ;
46
+ }
51
47
52
- private:
53
- std::array<NPageCollection::TLoadedPage, Items> Pages ;
54
- TPageId Edge = 0 ;
55
- ui32 Offset = 0 ;
56
- };
48
+ private:
49
+ std::array<NPageCollection::TLoadedPage, Capacity> LoadedPages ;
50
+ ui32 Offset = 0 ;
51
+ TPageId FirstUnseenPageId = 0 ;
52
+ };
57
53
54
+ class TCache : public IPageLoadingLogic {
58
55
public:
56
+ using TGroupId = NPage::TGroupId;
57
+
59
58
TCache () = delete ;
60
59
61
60
TCache (const TPart* part, IPages* env, TGroupId groupId, const TIntrusiveConstPtr<TSlices>& bounds = nullptr )
@@ -64,17 +63,21 @@ namespace NFwd {
64
63
65
64
~TCache ()
66
65
{
67
- for (auto &it: Pages) it.Release ();
66
+ for (auto &it: Pages) {
67
+ it.Release ();
68
+ }
68
69
}
69
70
70
71
TResult Handle (IPageLoadingQueue *head, TPageId pageId, ui64 lower) noexcept override
71
72
{
72
73
Y_ABORT_UNLESS (pageId != Max<TPageId>(), " Invalid requested pageId" );
73
74
74
- if (auto *page = Trace.Get (pageId))
75
+ if (auto *page = Trace.Get (pageId)) {
75
76
return { page, false , true };
77
+ }
76
78
77
- Rewind (pageId).Shrink (); /* points Offset to pageId */
79
+ Rewind (pageId); /* points Offset to pageId */
80
+ Shrink ();
78
81
79
82
bool more = Grow && (OnHold + OnFetch <= lower);
80
83
@@ -105,7 +108,7 @@ namespace NFwd {
105
108
106
109
Stat.Saved += one.Data .size ();
107
110
OnFetch -= one.Data .size ();
108
- OnHold += it->Settle (one);
111
+ OnHold += it->Settle (one); // settle of a dropped page returns 0 and releases its data
109
112
110
113
++it;
111
114
}
@@ -135,7 +138,7 @@ namespace NFwd {
135
138
return Pages.at (Offset);
136
139
}
137
140
138
- TCache& Rewind (TPageId pageId) noexcept
141
+ void Rewind (TPageId pageId) noexcept
139
142
{
140
143
while (auto drop = Index.Clean (pageId)) {
141
144
auto &page = Pages.at (Offset);
@@ -144,32 +147,29 @@ namespace NFwd {
144
147
Y_ABORT (" Dropping page that is not exist in cache" );
145
148
} else if (page.Size == 0 ) {
146
149
Y_ABORT (" Dropping page that has not been touched" );
147
- } else if (page.Usage == EUsage::Keep) {
150
+ } else if (page.Usage == EUsage::Keep && page ) {
148
151
OnHold -= Trace.Emplace (page);
149
- } else if (auto size = page.Release ().size ()) {
150
- OnHold -= size;
151
-
152
- *(page.Ready () ? &Stat.After : &Stat.Before ) += size;
152
+ } else {
153
+ OnHold -= page.Release ().size ();
154
+ *(page.Ready () ? &Stat.After : &Stat.Before ) += page.Size ;
153
155
}
154
156
157
+ // keep pending pages but increment offset
155
158
Offset++;
156
159
}
157
-
158
- return *this ;
159
160
}
160
161
161
- TCache& Shrink () noexcept
162
+ void Shrink () noexcept
162
163
{
163
- for (; Offset && Pages[0 ].Ready (); Offset--)
164
+ for (; Offset && Pages[0 ].Ready (); Offset--) {
164
165
Pages.pop_front ();
165
-
166
- return *this ;
166
+ }
167
167
}
168
168
169
169
private:
170
170
bool Grow = true ; /* Have some pages for Forward(...) */
171
171
TForward Index;
172
- TRound <TPart::Trace> Trace;
172
+ TLoadedPagesCircularBuffer <TPart::Trace> Trace;
173
173
174
174
/* _ Forward cache line state */
175
175
0 commit comments