Skip to content

Commit d6aa89c

Browse files
authored
Pass compacted pages directly to Shared Cache (#11779)
1 parent 3801e16 commit d6aa89c

15 files changed

+474
-194
lines changed

ydb/core/base/appdata.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
#include <ydb/core/control/immediate_control_board_impl.h>
1414
#include <ydb/core/grpc_services/grpc_helper.h>
15+
#include <ydb/core/tablet_flat/shared_cache_pages.h>
1516
#include <ydb/core/protos/auth.pb.h>
1617
#include <ydb/core/protos/bootstrap.pb.h>
1718
#include <ydb/core/protos/blobstorage.pb.h>
@@ -93,6 +94,7 @@ TAppData::TAppData(
9394
, Mon(nullptr)
9495
, Icb(new TControlBoard())
9596
, InFlightLimiterRegistry(new NGRpcService::TInFlightLimiterRegistry(Icb))
97+
, SharedCachePages(new NSharedCache::TSharedCachePages())
9698
, StreamingConfig(Impl->StreamingConfig)
9799
, PQConfig(Impl->PQConfig)
98100
, PQClusterDiscoveryConfig(Impl->PQClusterDiscoveryConfig)

ydb/core/base/appdata_fwd.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ namespace NKikimr {
1919
namespace NGRpcService {
2020
class TInFlightLimiterRegistry;
2121
}
22+
namespace NSharedCache {
23+
class TSharedCachePages;
24+
}
2225
}
2326

2427
namespace NKikimrCms {
@@ -189,6 +192,7 @@ struct TAppData {
189192
::NMonitoring::TDynamicCounterPtr Counters;
190193
TIntrusivePtr<NKikimr::TControlBoard> Icb;
191194
TIntrusivePtr<NGRpcService::TInFlightLimiterRegistry> InFlightLimiterRegistry;
195+
TIntrusivePtr<NSharedCache::TSharedCachePages> SharedCachePages;
192196

193197
TIntrusivePtr<NInterconnect::TPollerThreads> PollerThreads;
194198

ydb/core/tablet_flat/flat_executor.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,9 @@ void TExecutor::RecreatePageCollectionsCache() noexcept
214214
for (const auto &it : Database->GetScheme().Tables) {
215215
auto subset = Database->Subset(it.first, NTable::TEpoch::Max(), { }, { });
216216

217-
for (auto &partView : subset->Flatten) AddCachesOfBundle(partView);
217+
for (auto &partView : subset->Flatten) {
218+
AddCachesOfBundle(partView);
219+
}
218220
}
219221

220222
if (TransactionWaitPads) {

ydb/core/tablet_flat/flat_ops_compact.h

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
#include "flat_comp.h"
1414
#include "flat_executor_misc.h"
1515
#include "flat_bio_stats.h"
16+
#include "shared_cache_pages.h"
17+
#include "shared_sausagecache.h"
1618
#include "util_channel.h"
1719

1820
#include <ydb/core/base/blobstorage.h>
@@ -102,6 +104,7 @@ namespace NTabletFlatExecutor {
102104

103105
Spent = new TSpent(TAppData::TimeProvider.Get());
104106
Registry = AppData()->TypeRegistry;
107+
SharedCachePages = AppData()->SharedCachePages.Get();
105108
Scheme = std::move(scheme);
106109
Driver = driver;
107110

@@ -308,11 +311,39 @@ namespace NTabletFlatExecutor {
308311
auto *prod = new TProdCompact(!fail, Mask.Step(), std::move(Conf->Params),
309312
std::move(YellowMoveChannels), std::move(YellowStopChannels));
310313

314+
if (fail) {
315+
Results.clear(); /* shouldn't sent w/o fixation in bs */
316+
}
317+
311318
for (auto &result : Results) {
312319
Y_ABORT_UNLESS(result.PageCollections, "Compaction produced a part without page collections");
320+
TVector<TIntrusivePtr<NTable::TLoader::TCache>> pageCollections;
321+
for (auto& pageCollection : result.PageCollections) {
322+
auto cache = MakeIntrusive<NTable::TLoader::TCache>(pageCollection.PageCollection);
323+
auto saveCompactedPages = MakeHolder<NSharedCache::TEvSaveCompactedPages>(pageCollection.PageCollection);
324+
auto gcList = SharedCachePages->GCList;
325+
auto addPage = [&saveCompactedPages, &pageCollection, &cache, &gcList](NPageCollection::TLoadedPage& loadedPage, bool sticky) {
326+
auto pageId = loadedPage.PageId;
327+
auto pageSize = pageCollection.PageCollection->Page(pageId).Size;
328+
auto sharedPage = MakeIntrusive<TPage>(pageId, pageSize, nullptr);
329+
sharedPage->Initialize(std::move(loadedPage.Data));
330+
saveCompactedPages->Pages.push_back(sharedPage);
331+
cache->Fill(pageId, TSharedPageRef::MakeUsed(std::move(sharedPage), gcList), sticky);
332+
};
333+
for (auto &page : pageCollection.StickyPages) {
334+
addPage(page, true);
335+
}
336+
for (auto &page : pageCollection.RegularPages) {
337+
addPage(page, false);
338+
}
339+
340+
Send(MakeSharedPageCacheId(), saveCompactedPages.Release());
341+
342+
pageCollections.push_back(std::move(cache));
343+
}
313344

314345
NTable::TLoader loader(
315-
std::move(result.PageCollections),
346+
std::move(pageCollections),
316347
{ },
317348
std::move(result.Overlay));
318349

@@ -363,7 +394,7 @@ namespace NTabletFlatExecutor {
363394
}
364395

365396
if (fail) {
366-
prod->Results.clear(); /* shouldn't sent w/o fixation in bs */
397+
Y_ABORT_IF(prod->Results); /* shouldn't sent w/o fixation in bs */
367398
} else if (bool(prod->Results) != bool(WriteStats.Rows > 0)) {
368399
Y_ABORT("Unexpected rows production result after compaction");
369400
} else if ((bool(prod->Results) || bool(prod->TxStatus)) != bool(Blobs > 0)) {
@@ -525,6 +556,7 @@ namespace NTabletFlatExecutor {
525556
TVector<TBundle::TResult> Results;
526557
TVector<TIntrusiveConstPtr<NTable::TTxStatusPart>> TxStatus;
527558
const NScheme::TTypeRegistry * Registry = nullptr;
559+
NSharedCache::TSharedCachePages * SharedCachePages;
528560

529561
bool Finished = false;
530562
bool Failed = false;/* Failed to write blobs */

ydb/core/tablet_flat/flat_sausage_writer.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ namespace NPageCollection {
3535
Buffer.append(chunk.data(), chunk.size());
3636
offset += piece;
3737

38-
if (Buffer.size() >= MaxBlobSize) Flush();
38+
if (Buffer.size() >= MaxBlobSize) {
39+
Flush();
40+
}
3941
}
4042

4143
return Record.Push(type, body);

ydb/core/tablet_flat/flat_sausagecache.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,10 @@ class TPrivatePageCache {
138138
EnsurePage(loaded.PageId)->Fill(std::move(loaded.Page), sticky);
139139
}
140140

141+
void Fill(ui32 pageId, TSharedPageRef page, bool sticky) noexcept {
142+
EnsurePage(pageId)->Fill(std::move(page), sticky);
143+
}
144+
141145
const TLogoBlobID Id;
142146
const TIntrusiveConstPtr<NPageCollection::IPageCollection> PageCollection;
143147
TPageMap<THolder<TPage>> PageMap;

ydb/core/tablet_flat/flat_writer_blocks.h

Lines changed: 20 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,19 @@ namespace NWriter {
1818
using TPageId = NTable::NPage::TPageId;
1919
using TCache = TPrivatePageCache::TInfo;
2020

21+
struct TResult : TMoveOnly {
22+
TIntrusiveConstPtr<NPageCollection::IPageCollection> PageCollection;
23+
TVector<NPageCollection::TLoadedPage> RegularPages;
24+
TVector<NPageCollection::TLoadedPage> StickyPages;
25+
};
26+
2127
TBlocks(ICone *cone, ui8 channel, ECache cache, ui32 block, bool stickyFlatIndex)
2228
: Cone(cone)
2329
, Channel(channel)
2430
, Cache(cache)
2531
, StickyFlatIndex(stickyFlatIndex)
2632
, Writer(Cone->CookieRange(1), Channel, block)
2733
{
28-
2934
}
3035

3136
~TBlocks()
@@ -35,39 +40,39 @@ namespace NWriter {
3540

3641
explicit operator bool() const noexcept
3742
{
38-
return Writer || Regular || Sticky;
43+
return Writer || Result.RegularPages || Result.StickyPages;
3944
}
4045

41-
TIntrusivePtr<TCache> Finish() noexcept
46+
TResult Finish() noexcept
4247
{
43-
TIntrusivePtr<TCache> pageCollection;
44-
4548
if (auto meta = Writer.Finish(false /* omit empty page collection */)) {
46-
for (auto &glob : Writer.Grab())
49+
for (auto &glob : Writer.Grab()) {
4750
Cone->Put(std::move(glob));
51+
}
4852

49-
pageCollection = MakePageCollection(std::move(meta));
53+
auto largeGlobId = CutToChunks(meta);
54+
Result.PageCollection = MakeIntrusiveConst<NPageCollection::TPageCollection>(largeGlobId, std::move(meta));
5055
}
5156

5257
Y_ABORT_UNLESS(!Writer, "Block writer is not empty after Finish");
53-
Y_ABORT_UNLESS(!Regular && !Sticky, "Unexpected non-empty page lists");
5458

55-
return pageCollection;
59+
return std::exchange(Result, {});
5660
}
5761

5862
TPageId Write(TSharedData raw, EPage type)
5963
{
6064
auto pageId = Writer.AddPage(raw, (ui32)type);
6165

62-
for (auto &glob : Writer.Grab())
66+
for (auto &glob : Writer.Grab()) {
6367
Cone->Put(std::move(glob));
68+
}
6469

65-
if (NTable::TLoader::NeedIn(type) || StickyFlatIndex && type == EPage::FlatIndex) {
70+
if (NTable::TLoader::NeedIn(type) || Cache == ECache::Ever || StickyFlatIndex && type == EPage::FlatIndex) {
6671
// Note: we mark flat index pages sticky after we load them
67-
Sticky.emplace_back(pageId, std::move(raw));
72+
Result.StickyPages.emplace_back(pageId, std::move(raw));
6873
} else if (bool(Cache) && type == EPage::DataPage || type == EPage::BTreeIndex) {
69-
// Note: we save b-tree index pages to shared cache regardless of a cache mode
70-
Regular.emplace_back(pageId, std::move(raw));
74+
// Note: we save b-tree index pages to shared cache regardless of a cache mode
75+
Result.RegularPages.emplace_back(pageId, std::move(raw));
7176
}
7277

7378
return pageId;
@@ -79,25 +84,6 @@ namespace NWriter {
7984
}
8085

8186
private:
82-
TIntrusivePtr<TCache> MakePageCollection(TSharedData body) noexcept
83-
{
84-
auto largeGlobId = CutToChunks(body);
85-
86-
auto *pack = new NPageCollection::TPageCollection(largeGlobId, std::move(body));
87-
88-
TIntrusivePtr<TCache> cache = new TCache(pack);
89-
90-
const bool sticky = (Cache == ECache::Ever);
91-
92-
for (auto &paged : Sticky) cache->Fill(paged, true);
93-
for (auto &paged : Regular) cache->Fill(paged, sticky);
94-
95-
Sticky.clear();
96-
Regular.clear();
97-
98-
return cache;
99-
}
100-
10187
NPageCollection::TLargeGlobId CutToChunks(TArrayRef<const char> body)
10288
{
10389
return Cone->Put(0, Channel, body, Writer.MaxBlobSize);
@@ -110,8 +96,7 @@ namespace NWriter {
11096
const bool StickyFlatIndex;
11197

11298
NPageCollection::TWriter Writer;
113-
TVector<NPageCollection::TLoadedPage> Regular;
114-
TVector<NPageCollection::TLoadedPage> Sticky;
99+
TResult Result;
115100
};
116101
}
117102
}

ydb/core/tablet_flat/flat_writer_bundle.h

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,7 @@ namespace NWriter {
1414
class TBundle : public NTable::IPageWriter, protected ICone {
1515
public:
1616
struct TResult {
17-
using TCache = TPrivatePageCache::TInfo;
18-
19-
TVector<TIntrusivePtr<TCache>> PageCollections;
17+
TVector<TBlocks::TResult> PageCollections;
2018
TDeque<NTable::TScreen::THole> Growth;
2119
TString Overlay;
2220
};
@@ -100,8 +98,8 @@ namespace NWriter {
10098
auto &result = Results_.emplace_back();
10199

102100
for (auto num : xrange(Blocks.size())) {
103-
if (auto cache = Blocks[num]->Finish()) {
104-
result.PageCollections.emplace_back(std::move(cache));
101+
if (auto written = Blocks[num]->Finish(); written.PageCollection) {
102+
result.PageCollections.emplace_back(std::move(written));
105103
} else if (num < Blocks.size() - 1) {
106104
Y_ABORT("Finish produced an empty main page collection");
107105
}

ydb/core/tablet_flat/shared_cache_events.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include "defs.h"
44
#include "flat_bio_events.h"
55
#include "shared_handle.h"
6+
#include "shared_page.h"
67
#include <ydb/core/protos/shared_cache.pb.h>
78

89
#include <util/generic/map.h>
@@ -20,6 +21,7 @@ namespace NKikimr::NSharedCache {
2021
EvUnregister,
2122
EvInvalidate,
2223
EvAttach,
24+
EvSaveCompactedPages,
2325
EvRequest,
2426
EvResult,
2527
EvUpdated,
@@ -62,6 +64,19 @@ namespace NKikimr::NSharedCache {
6264
}
6365
};
6466

67+
// Note: compacted pages do not have an owner yet
68+
// at first they should be accepted by an executor
69+
// and it will send TEvAttach itself once that has happened
70+
struct TEvSaveCompactedPages : public TEventLocal<TEvSaveCompactedPages, EvSaveCompactedPages> {
71+
TIntrusiveConstPtr<NPageCollection::IPageCollection> PageCollection;
72+
TVector<TIntrusivePtr<TPage>> Pages;
73+
74+
TEvSaveCompactedPages(TIntrusiveConstPtr<NPageCollection::IPageCollection> pageCollection)
75+
: PageCollection(std::move(pageCollection))
76+
{
77+
}
78+
};
79+
6580
struct TEvRequest : public TEventLocal<TEvRequest, EvRequest> {
6681
const EPriority Priority;
6782
TAutoPtr<NPageCollection::TFetch> Fetch;
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#pragma once
2+
#include "defs.h"
3+
#include "shared_handle.h"
4+
5+
namespace NKikimr::NSharedCache {
6+
7+
class TSharedCachePages : public TThrRefBase {
8+
public:
9+
TIntrusivePtr<TSharedPageGCList> GCList = new TSharedPageGCList;
10+
};
11+
12+
}

ydb/core/tablet_flat/shared_handle.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ class TSharedPageHandle : public TThrRefBase {
1919
/**
2020
* Returns true if handle is initialized
2121
*
22-
* Unitialized handle may not be shared with other threads.
22+
* Uninitialized handle may not be shared with other threads.
2323
*/
2424
bool IsInitialized() const noexcept {
2525
return Flags.load(std::memory_order_relaxed) != FlagsUninitialized;

0 commit comments

Comments
 (0)