Skip to content

Commit aab08a6

Browse files
committed
KIKIMR-19139 Get index pages from Env
1 parent 9cf080f commit aab08a6

15 files changed

+266
-101
lines changed

ydb/core/tablet_flat/flat_fwd_blobs.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ namespace NFwd {
149149
if (!Tags.at(page.Tag) || page.Size >= Edge.at(page.Tag) || !Filter.Has(rel.Row)) {
150150
/* Page doesn't fits to load criteria */
151151
} else if (page.Fetch == EFetch::None) {
152-
auto size = head->AddToQueue(Grow, ui16(EPage::Opaque));
152+
auto size = head->AddToQueue(Grow, EPage::Opaque);
153153

154154
Y_VERIFY(size == page.Size, "Inconsistent page sizez");
155155

ydb/core/tablet_flat/flat_fwd_cache.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ namespace NFwd {
120120
};
121121

122122
while (auto more = Index.More(until())) {
123-
auto size = head->AddToQueue(more, ui16(EPage::DataPage));
123+
auto size = head->AddToQueue(more, EPage::DataPage);
124124

125125
Stat.Fetch += size;
126126
OnFetch += size;

ydb/core/tablet_flat/flat_fwd_env.h

+53-9
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ namespace NFwd {
4040
return PageLoadingLogic.Get();
4141
}
4242

43-
ui64 AddToQueue(TPageId pageId, ui16 type) noexcept override
43+
ui64 AddToQueue(TPageId pageId, EPage type) noexcept override
4444
{
4545
if (!Fetch) {
4646
Fetch.Reset(new TFetch(Cookie, PageCollection, { }));
@@ -49,7 +49,7 @@ namespace NFwd {
4949

5050
const auto meta = PageCollection->Page(pageId);
5151

52-
if (meta.Type != type || meta.Size == 0)
52+
if (meta.Type != ui16(type) || meta.Size == 0)
5353
Y_FAIL("Got a non-data page while part index traverse");
5454

5555
Fetch->Pages.emplace_back(pageId);
@@ -75,9 +75,11 @@ namespace NFwd {
7575
TIntrusiveConstPtr<IPageCollection> PageCollection;
7676
};
7777

78-
public:
79-
using TData = const TSharedData*;
78+
struct TSimpleEnv {
79+
TMap<TPageId, NPageCollection::TLoadedPage> Pages;
80+
};
8081

82+
public:
8183
TEnv(const TConf &conf, const TSubset &subset)
8284
: Salt(RandomNumber<ui32>())
8385
, Conf(conf)
@@ -106,10 +108,16 @@ namespace NFwd {
106108
return Pending == 0;
107109
}
108110

109-
TData TryGetPage(const TPart* part, TPageId ref, TGroupId groupId) override
111+
const TSharedData* TryGetPage(const TPart* part, TPageId ref, TGroupId groupId) override
110112
{
111113
ui16 room = (groupId.Historic ? part->Groups + 2 : 0) + groupId.Index;
112-
return Handle(GetQueue(part, room), ref).Page;
114+
TSlot slot = GetQueueSlot(part, room);
115+
116+
if (part->IndexPages.Has(groupId, ref)) {
117+
return TryGetIndexPage(slot, ref);
118+
}
119+
120+
return Handle(Queues.at(slot), ref).Page;
113121
}
114122

115123
TResult Locate(const TMemTable *memTable, ui64 ref, ui32 tag) noexcept override
@@ -150,7 +158,18 @@ namespace NFwd {
150158
"Page fwd cache got more pages than was requested");
151159

152160
Pending -= pages.size();
153-
Queues.at(part)->Apply(pages);
161+
162+
TVector<NPageCollection::TLoadedPage> queuePages(Reserve(pages.size()));
163+
for (auto& page : pages) {
164+
const auto &meta = pageCollection->Page(page.PageId);
165+
if (NTable::EPage(meta.Type) == EPage::Index) {
166+
IndexPages.at(part).Pages[page.PageId] = page;
167+
} else {
168+
queuePages.push_back(page);
169+
}
170+
}
171+
172+
Queues.at(part)->Apply(queuePages);
154173
}
155174

156175
IPages* Reset() noexcept
@@ -168,6 +187,7 @@ namespace NFwd {
168187
Total = Stats();
169188
Parts.clear();
170189
Queues.clear();
190+
IndexPages.clear();
171191
ColdParts.clear();
172192

173193
for (auto &one : Subset.Flatten)
@@ -239,6 +259,23 @@ namespace NFwd {
239259
}
240260

241261
private:
262+
const TSharedData* TryGetIndexPage(TSlot slot, TPageId pageId) noexcept
263+
{
264+
// TODO: count index pages in Stats later
265+
266+
auto &env = IndexPages.at(slot);
267+
auto pageIt = env.Pages.find(pageId);
268+
269+
if (pageIt != env.Pages.end()) {
270+
return &pageIt->second.Data;
271+
} else {
272+
auto &queue = Queues.at(slot);
273+
queue.AddToQueue(pageId, EPage::Index);
274+
Queue.PushBack(&queue);
275+
return nullptr;
276+
}
277+
}
278+
242279
TResult Handle(TPageLoadingQueue &q, TPageId ref) noexcept
243280
{
244281
auto got = q->Handle(&q, ref, Conf.AheadLo);
@@ -277,7 +314,7 @@ namespace NFwd {
277314
"Cannot handle multiple parts on the same page collection");
278315
}
279316

280-
TPageLoadingQueue& GetQueue(const TPart *part, ui16 room) noexcept
317+
TSlot GetQueueSlot(const TPart *part, ui16 room) noexcept
281318
{
282319
auto it = Parts.find(part);
283320
Y_VERIFY(it != Parts.end(),
@@ -289,7 +326,12 @@ namespace NFwd {
289326
Y_VERIFY(Queues.at(it->second[0]).PageCollection->Label() == part->Label,
290327
"Cannot handle multiple parts on the same page collection");
291328

292-
return Queues.at(it->second[room]);
329+
return it->second[room];
330+
}
331+
332+
TPageLoadingQueue& GetQueue(const TPart *part, ui16 room) noexcept
333+
{
334+
return Queues.at(GetQueueSlot(part, room));
293335
}
294336

295337
TSlot Settle(TEgg egg, ui32 slot) noexcept
@@ -298,6 +340,7 @@ namespace NFwd {
298340
const ui64 cookie = (ui64(Queues.size()) << 32) | ui32(Salt + Epoch);
299341

300342
Queues.emplace_back(slot, cookie, std::move(egg.PageCollection), egg.PageLoadingLogic);
343+
IndexPages.emplace_back();
301344

302345
return Queues.size() - 1;
303346
} else {
@@ -373,6 +416,7 @@ namespace NFwd {
373416
const TVector<ui32> Keys; /* Tags to expand ELargeObj references */
374417

375418
TDeque<TPageLoadingQueue> Queues;
419+
TDeque<TSimpleEnv> IndexPages;
376420
THashMap<const TPart*, TSlotVec> Parts;
377421
THashSet<const TPart*> ColdParts;
378422
// Wrapper for memable blobs

ydb/core/tablet_flat/flat_fwd_iface.h

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
#pragma once
22

3+
#include "flat_page_iface.h"
34
#include "flat_sausage_fetch.h"
45
#include "flat_fwd_misc.h"
56

67
namespace NKikimr {
78
namespace NTable {
9+
using EPage = NPage::EPage;
10+
811
namespace NFwd {
912

1013
struct TPage;
@@ -13,7 +16,7 @@ namespace NFwd {
1316
public:
1417
virtual ~IPageLoadingQueue() = default;
1518

16-
virtual ui64 AddToQueue(ui32 page, ui16 type) noexcept = 0;
19+
virtual ui64 AddToQueue(ui32 page, EPage type) noexcept = 0;
1720
};
1821

1922
class IPageLoadingLogic {

ydb/core/tablet_flat/flat_part_index_iter.h

+10-4
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ class TPartIndexIt {
1515
using TIter = NPage::TIndex::TIter;
1616
using TGroupId = NPage::TGroupId;
1717

18-
TPartIndexIt(const TPart* part, TGroupId groupId)
18+
TPartIndexIt(const TPart* part, IPages* env, TGroupId groupId)
1919
: Part(part)
20+
, Env(env)
2021
, GroupId(groupId)
2122
, EndRowId(part->GetGroupIndex(groupId).GetEndRowId())
2223
{ }
@@ -112,13 +113,18 @@ class TPartIndexIt {
112113
if (Index) {
113114
return &*Index;
114115
}
115-
// TODO: get index from Env
116-
Index = Part->GetGroupIndex(GroupId);
117-
return &*Index;
116+
auto pageId = GroupId.IsHistoric() ? Part->IndexPages.Historic[GroupId.Index] : Part->IndexPages.Groups[GroupId.Index];
117+
auto page = Env->TryGetPage(Part, pageId);
118+
if (page) {
119+
Index = TIndex(*page);
120+
return &*Index;
121+
}
122+
return { };
118123
}
119124

120125
private:
121126
const TPart* const Part;
127+
IPages* const Env;
122128
const TGroupId GroupId;
123129
std::optional<TIndex> Index;
124130
TIter Iter;

0 commit comments

Comments
 (0)