Skip to content

Commit 7ffdf5b

Browse files
authored
Merge 878e57c into 64d58b3
2 parents 64d58b3 + 878e57c commit 7ffdf5b

23 files changed

+509
-388
lines changed

ydb/core/base/appdata.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
#include <ydb/core/control/immediate_control_board_impl.h>
1414
#include <ydb/core/grpc_services/grpc_helper.h>
15+
#include <ydb/core/tablet_flat/shared_cache_pages.h>
1516
#include <ydb/core/protos/auth.pb.h>
1617
#include <ydb/core/protos/bootstrap.pb.h>
1718
#include <ydb/core/protos/blobstorage.pb.h>
@@ -93,6 +94,7 @@ TAppData::TAppData(
9394
, Mon(nullptr)
9495
, Icb(new TControlBoard())
9596
, InFlightLimiterRegistry(new NGRpcService::TInFlightLimiterRegistry(Icb))
97+
, SharedCachePages(new NSharedCache::TSharedCachePages())
9698
, StreamingConfig(Impl->StreamingConfig)
9799
, PQConfig(Impl->PQConfig)
98100
, PQClusterDiscoveryConfig(Impl->PQClusterDiscoveryConfig)

ydb/core/base/appdata_fwd.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ namespace NKikimr {
1919
namespace NGRpcService {
2020
class TInFlightLimiterRegistry;
2121
}
22+
namespace NSharedCache {
23+
class TSharedCachePages;
24+
}
2225
}
2326

2427
namespace NKikimrCms {
@@ -189,6 +192,7 @@ struct TAppData {
189192
::NMonitoring::TDynamicCounterPtr Counters;
190193
TIntrusivePtr<NKikimr::TControlBoard> Icb;
191194
TIntrusivePtr<NGRpcService::TInFlightLimiterRegistry> InFlightLimiterRegistry;
195+
TIntrusivePtr<NSharedCache::TSharedCachePages> SharedCachePages;
192196

193197
TIntrusivePtr<NInterconnect::TPollerThreads> PollerThreads;
194198

ydb/core/tablet_flat/flat_executor.cpp

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,9 @@ void TExecutor::RecreatePageCollectionsCache() noexcept
214214
for (const auto &it : Database->GetScheme().Tables) {
215215
auto subset = Database->Subset(it.first, NTable::TEpoch::Max(), { }, { });
216216

217-
for (auto &partView : subset->Flatten) AddCachesOfBundle(partView);
217+
for (auto &partView : subset->Flatten) {
218+
AddCachesOfBundle(partView);
219+
}
218220
}
219221

220222
if (TransactionWaitPads) {
@@ -2360,7 +2362,7 @@ void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv
23602362
auto *bySwitchAux = aux.AddBySwitchAux();
23612363

23622364
TPageCollectionProtoHelper::Snap(snap, loaned->PartComponents, partSwitch.TableId, CompactionLogic->BorrowedPartLevel());
2363-
TPageCollectionProtoHelper(true, false).Do(bySwitchAux->AddHotBundles(), loaned->PartComponents);
2365+
TPageCollectionProtoHelper(true).Do(bySwitchAux->AddHotBundles(), loaned->PartComponents);
23642366

23652367
auto body = proto.SerializeAsString();
23662368
auto glob = CommitManager->Turns.One(commit->Refs, std::move(body), true);
@@ -2821,9 +2823,6 @@ void TExecutor::Handle(NSharedCache::TEvUpdated::TPtr &ev) {
28212823

28222824
for (auto &kv : msg->Actions) {
28232825
if (auto *info = PrivatePageCache->Info(kv.first)) {
2824-
for (auto &kvCorrected : kv.second.Accepted) {
2825-
PrivatePageCache->UpdateSharedBody(info, kvCorrected.first, std::move(kvCorrected.second));
2826-
}
28272826
for (ui32 pageId : kv.second.Dropped) {
28282827
PrivatePageCache->DropSharedBody(info, pageId);
28292828
}
@@ -3472,7 +3471,7 @@ void TExecutor::Handle(NOps::TEvResult *ops, TProdCompact *msg, bool cancelled)
34723471
const auto &newPart = result.Part;
34733472

34743473
TPageCollectionProtoHelper::Snap(snap, newPart, tableId, logicResult.Changes.NewPartsLevel);
3475-
TPageCollectionProtoHelper(true, false).Do(bySwitchAux->AddHotBundles(), newPart);
3474+
TPageCollectionProtoHelper(true).Do(bySwitchAux->AddHotBundles(), newPart);
34763475
}
34773476
}
34783477

@@ -3648,7 +3647,6 @@ void TExecutor::UpdateCounters(const TActorContext &ctx) {
36483647
Counters->Simple()[TExecutorCounters::CACHE_TOTAL_SHARED_BODY].Set(stats.TotalSharedBody);
36493648
Counters->Simple()[TExecutorCounters::CACHE_TOTAL_PINNED_BODY].Set(stats.TotalPinnedBody);
36503649
Counters->Simple()[TExecutorCounters::CACHE_TOTAL_EXCLUSIVE].Set(stats.TotalExclusive);
3651-
Counters->Simple()[TExecutorCounters::CACHE_TOTAL_SHARED_PENDING].Set(stats.TotalSharedPending);
36523650
Counters->Simple()[TExecutorCounters::CACHE_TOTAL_STICKY].Set(stats.TotalSticky);
36533651
}
36543652

@@ -3770,14 +3768,14 @@ TString TExecutor::BorrowSnapshot(ui32 table, const TTableSnapshotContext &snap,
37703768
for (const auto &partView : subset->Flatten) {
37713769
auto *x = proto.AddParts();
37723770

3773-
TPageCollectionProtoHelper(false, false).Do(x->MutableBundle(), partView);
3771+
TPageCollectionProtoHelper(false).Do(x->MutableBundle(), partView);
37743772
snap.Impl->Borrowed(Step(), table, partView->Label, loaner);
37753773
}
37763774

37773775
for (const auto &part : subset->ColdParts) {
37783776
auto *x = proto.AddParts();
37793777

3780-
TPageCollectionProtoHelper(false, false).Do(x->MutableBundle(), part);
3778+
TPageCollectionProtoHelper(false).Do(x->MutableBundle(), part);
37813779
snap.Impl->Borrowed(Step(), table, part->Label, loaner);
37823780
}
37833781

@@ -4170,7 +4168,6 @@ void TExecutor::RenderHtmlPage(NMon::TEvRemoteHttpInfo::TPtr &ev) const {
41704168
DIV_CLASS("row") {str << "Total bytes in shared cache: " << PrivatePageCache->GetStats().TotalSharedBody; }
41714169
DIV_CLASS("row") {str << "Total bytes in local cache: " << PrivatePageCache->GetStats().TotalPinnedBody; }
41724170
DIV_CLASS("row") {str << "Total bytes exclusive to local cache: " << PrivatePageCache->GetStats().TotalExclusive; }
4173-
DIV_CLASS("row") {str << "Total bytes in transit to shared cache: " << PrivatePageCache->GetStats().TotalSharedPending; }
41744171
DIV_CLASS("row") {str << "Total bytes marked as sticky: " << PrivatePageCache->GetStats().TotalSticky; }
41754172

41764173
if (GcLogic) {

ydb/core/tablet_flat/flat_executor.proto

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,6 @@ message TLargeGlobId {
1515
}
1616

1717
message TPageCollection {
18-
message TPage {
19-
required uint32 Id = 1;
20-
required bytes Body = 2;
21-
}
22-
2318
repeated NKikimrProto.TLogoBlobID MetaId = 1; // Replaced by TLargeGlobId
2419

2520
optional TLargeGlobId LargeGlobId = 6;
@@ -30,7 +25,7 @@ message TPageCollection {
3025
// packed page collection is called HotBundle.
3126

3227
optional bytes Meta = 2;
33-
repeated TPage Pages = 5;
28+
reserved 5; // Pages
3429
}
3530

3631
message TBundle {

ydb/core/tablet_flat/flat_executor_tx_env.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,11 @@ namespace NTabletFlatExecutor {
3838
return { !ReadMissingReferences, page };
3939
}
4040

41-
const TSharedData* TryGetPage(const TPart* part, TPageId page, TGroupId groupId) override
41+
const TSharedData* TryGetPage(const TPart* part, TPageId pageId, TGroupId groupId) override
4242
{
4343
auto *partStore = CheckedCast<const NTable::TPartStore*>(part);
4444

45-
return Lookup(partStore->PageCollections.at(groupId.Index).Get(), page);
45+
return Lookup(partStore->PageCollections.at(groupId.Index).Get(), pageId);
4646
}
4747

4848
void EnableReadMissingReferences() noexcept {

ydb/core/tablet_flat/flat_ops_compact.h

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
#include "flat_comp.h"
1414
#include "flat_executor_misc.h"
1515
#include "flat_bio_stats.h"
16+
#include "shared_cache_pages.h"
17+
#include "shared_sausagecache.h"
1618
#include "util_channel.h"
1719

1820
#include <ydb/core/base/blobstorage.h>
@@ -102,6 +104,7 @@ namespace NTabletFlatExecutor {
102104

103105
Spent = new TSpent(TAppData::TimeProvider.Get());
104106
Registry = AppData()->TypeRegistry;
107+
SharedCachePages = AppData()->SharedCachePages;
105108
Scheme = std::move(scheme);
106109
Driver = driver;
107110

@@ -308,11 +311,39 @@ namespace NTabletFlatExecutor {
308311
auto *prod = new TProdCompact(!fail, Mask.Step(), std::move(Conf->Params),
309312
std::move(YellowMoveChannels), std::move(YellowStopChannels));
310313

314+
if (fail) {
315+
Results.clear(); /* shouldn't sent w/o fixation in bs */
316+
}
317+
311318
for (auto &result : Results) {
312319
Y_ABORT_UNLESS(result.PageCollections, "Compaction produced a part without page collections");
320+
TVector<TIntrusivePtr<NTable::TLoader::TCache>> pageCollections;
321+
for (auto& pageCollection : result.PageCollections) {
322+
auto cache = MakeIntrusive<NTable::TLoader::TCache>(pageCollection.PageCollection);
323+
auto saveCompactedPages = MakeHolder<NSharedCache::TEvSaveCompactedPages>(pageCollection.PageCollection);
324+
auto gcList = SharedCachePages->GCList;
325+
auto addPage = [&saveCompactedPages, &pageCollection, &cache, &gcList](NPageCollection::TLoadedPage& loadedPage, bool sticky) {
326+
auto pageId = loadedPage.PageId;
327+
auto pageSize = pageCollection.PageCollection->Page(pageId).Size;
328+
auto sharedPage = MakeIntrusive<TPage>(pageId, pageSize, nullptr);
329+
sharedPage->Initialize(std::move(loadedPage.Data));
330+
saveCompactedPages->Pages.push_back(sharedPage);
331+
cache->Fill(pageId, TSharedPageRef::MakeUsed(std::move(sharedPage), gcList), sticky);
332+
};
333+
for (auto &page : pageCollection.StickyPages) {
334+
addPage(page, true);
335+
}
336+
for (auto &page : pageCollection.RegularPages) {
337+
addPage(page, false);
338+
}
339+
340+
Send(MakeSharedPageCacheId(), saveCompactedPages.Release());
341+
342+
pageCollections.push_back(std::move(cache));
343+
}
313344

314345
NTable::TLoader loader(
315-
std::move(result.PageCollections),
346+
std::move(pageCollections),
316347
{ },
317348
std::move(result.Overlay));
318349

@@ -363,7 +394,7 @@ namespace NTabletFlatExecutor {
363394
}
364395

365396
if (fail) {
366-
prod->Results.clear(); /* shouldn't sent w/o fixation in bs */
397+
Y_ABORT_IF(prod->Results); /* shouldn't sent w/o fixation in bs */
367398
} else if (bool(prod->Results) != bool(WriteStats.Rows > 0)) {
368399
Y_ABORT("Unexpected rows production result after compaction");
369400
} else if ((bool(prod->Results) || bool(prod->TxStatus)) != bool(Blobs > 0)) {
@@ -525,6 +556,7 @@ namespace NTabletFlatExecutor {
525556
TVector<TBundle::TResult> Results;
526557
TVector<TIntrusiveConstPtr<NTable::TTxStatusPart>> TxStatus;
527558
const NScheme::TTypeRegistry * Registry = nullptr;
559+
TIntrusivePtr<NSharedCache::TSharedCachePages> SharedCachePages;
528560

529561
bool Finished = false;
530562
bool Failed = false;/* Failed to write blobs */

ydb/core/tablet_flat/flat_part_loader.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ namespace NTable {
8989
if (type != EPage::FlatIndex) {
9090
// hack: saving flat index to private cache will break sticky logic
9191
// keep it in shared cache only for now
92-
Cache->Fill(std::move(loaded), NeedIn(type));
92+
Cache->Fill(loaded.PageId, std::move(loaded.Page), NeedIn(type));
9393
}
9494
}
9595
}

ydb/core/tablet_flat/flat_part_outset.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
#include "defs.h"
44
#include "flat_sausage_packet.h"
5-
#include "flat_sausage_fetch.h"
65

76
namespace NKikimr {
87
namespace NTable {
@@ -12,7 +11,6 @@ namespace NTable {
1211
NPageCollection::TLargeGlobId LargeGlobId;
1312
// loaded meta page
1413
TIntrusiveConstPtr<NPageCollection::TPageCollection> Packet;
15-
TVector<NPageCollection::TLoadedPage> Sticky;
1614

1715
void ParsePacket(TSharedData meta);
1816
};

ydb/core/tablet_flat/flat_part_store.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,9 +154,6 @@ class TPartStore : public TPart, public IBundle {
154154

155155
for (auto &one: components) {
156156
caches.emplace_back(new TCache(std::move(one.Packet)));
157-
158-
for (auto &page: one.Sticky)
159-
caches.back()->Fill(page, true);
160157
}
161158

162159
return caches;

ydb/core/tablet_flat/flat_sausage_writer.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ namespace NPageCollection {
3535
Buffer.append(chunk.data(), chunk.size());
3636
offset += piece;
3737

38-
if (Buffer.size() >= MaxBlobSize) Flush();
38+
if (Buffer.size() >= MaxBlobSize) {
39+
Flush();
40+
}
3941
}
4042

4143
return Record.Push(type, body);

0 commit comments

Comments
 (0)