Skip to content

Commit 41d4909

Browse files
committed
wip introduce new wait pads
1 parent f1b5df5 commit 41d4909

File tree

7 files changed

+98
-109
lines changed

7 files changed

+98
-109
lines changed

ydb/core/tablet_flat/flat_executor.cpp

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -624,12 +624,12 @@ void TExecutor::PlanTransactionActivation() {
624624
}
625625
}
626626

627-
void TExecutor::ActivateWaitingTransactions(TPrivatePageCache::TPage::TWaitQueuePtr waitPadsQueue) {
628-
if (waitPadsQueue) {
627+
void TExecutor::ActivateWaitingTransactions(const TVector<TIntrusivePtr<NPageCollection::TPagesWaitPad>>& waitPads) {
628+
if (waitPads) {
629629
bool activate = CanExecuteTransaction();
630630
bool cancelled = false;
631-
while (TPrivatePageCacheWaitPad *waitPad = waitPadsQueue->Pop()) {
632-
if (auto it = TransactionWaitPads.find(waitPad); it != TransactionWaitPads.end()) {
631+
for (auto& waitPad : waitPads) {
632+
if (auto it = TransactionWaitPads.find(waitPad.Get()); it != TransactionWaitPads.end()) {
633633
it->second->WaitingSpan.EndOk();
634634
TSeat* seat = it->second->Seat;
635635
Y_ABORT_UNLESS(seat->State == ESeatState::Waiting);
@@ -706,8 +706,8 @@ void TExecutor::DropSingleCache(const TLogoBlobID &label) noexcept
706706
Y_ABORT_UNLESS(StickyPagesMemory >= stickySize);
707707
StickyPagesMemory -= stickySize;
708708

709-
auto toActivate = PrivatePageCache->ForgetPageCollection(pageCollection);
710-
ActivateWaitingTransactions(toActivate);
709+
// auto toActivate = PrivatePageCache->ForgetPageCollection(pageCollection);
710+
// ActivateWaitingTransactions(toActivate);
711711
if (!PrivatePageCache->Info(label))
712712
Send(MakeSharedPageCacheId(), new NSharedCache::TEvInvalidate(label));
713713

@@ -2101,9 +2101,8 @@ void TExecutor::PostponeTransaction(TSeat* seat, TPageCollectionTxEnv &env,
21012101

21022102
LWTRACK(TransactionPageFault, seat->Self->Orbit, seat->UniqID);
21032103
seat->State = ESeatState::Waiting;
2104-
auto padHolder = MakeHolder<TTransactionWaitPad>(seat);
2105-
auto *const pad = padHolder.Get();
2106-
TransactionWaitPads[pad] = std::move(padHolder);
2104+
auto waitPad = MakeIntrusive<TTransactionWaitPad>(seat);
2105+
TransactionWaitPads[waitPad.Get()] = waitPad;
21072106

21082107
ui32 waitPages = 0;
21092108
ui64 loadBytes = 0;
@@ -2113,7 +2112,7 @@ void TExecutor::PostponeTransaction(TSeat* seat, TPageCollectionTxEnv &env,
21132112
TVector<NTable::TPageId> &pages = xpair.second;
21142113
waitPages += pages.size();
21152114

2116-
const std::pair<ui32, ui64> toLoad = PrivatePageCache->Request(pages, pad, pageCollectionInfo);
2115+
const std::pair<ui32, ui64> toLoad = PrivatePageCache->Request(pages, pageCollectionInfo);
21172116
if (toLoad.first) {
21182117
if (auto logl = Logger->Log(ELnLev::Dbg03)) {
21192118
logl
@@ -2126,7 +2125,9 @@ void TExecutor::PostponeTransaction(TSeat* seat, TPageCollectionTxEnv &env,
21262125
logl << "]";
21272126
}
21282127

2129-
auto *req = new NPageCollection::TFetch(0, pageCollectionInfo->PageCollection, std::move(pages), pad->GetWaitingTraceId());
2128+
auto *req = new NPageCollection::TFetch(0, pageCollectionInfo->PageCollection, std::move(pages));
2129+
req->TraceId = waitPad->GetWaitingTraceId();
2130+
req->WaitPad = waitPad; // TODO: deal with one tx multiple fetches
21302131

21312132
loadPages += toLoad.first;
21322133
loadBytes += toLoad.second;
@@ -2136,13 +2137,13 @@ void TExecutor::PostponeTransaction(TSeat* seat, TPageCollectionTxEnv &env,
21362137

21372138
if (auto logl = Logger->Log(ELnLev::Debug)) {
21382139
logl
2139-
<< NFmt::Do(*this) << " " << NFmt::Do(*pad->Seat) << " postponed"
2140+
<< NFmt::Do(*this) << " " << NFmt::Do(*seat) << " postponed"
21402141
<< ", " << loadBytes << "b, pages "
21412142
<< "{" << waitPages << " wait, " << loadPages << " load}"
21422143
<< ", freshly touched " << newPinnedPages << " pages";
21432144
}
21442145

2145-
pad->Seat->CPUBookkeepingTime += bookkeepingTimer.PassedReset();
2146+
seat->CPUBookkeepingTime += bookkeepingTimer.PassedReset();
21462147
Counters->Cumulative()[TExecutorCounters::TX_POSTPONED].Increment(1);
21472148

21482149
if (AppTxCounters && txType != UnknownTxType)
@@ -2151,7 +2152,7 @@ void TExecutor::PostponeTransaction(TSeat* seat, TPageCollectionTxEnv &env,
21512152
// Note: count all new touched pages (were obtained from cache), even not on the first attempt
21522153
Counters->Cumulative()[TExecutorCounters::TX_CACHE_HITS].Increment(newTouchedPages);
21532154
Counters->Cumulative()[TExecutorCounters::TX_BYTES_CACHED].Increment(newTouchedBytes);
2154-
if (pad->Seat->Retries == 1) {
2155+
if (seat->Retries == 1) {
21552156
Counters->Cumulative()[TExecutorCounters::TX_RETRIED].Increment(1);
21562157
}
21572158

@@ -2912,9 +2913,10 @@ void TExecutor::Handle(NSharedCache::TEvResult::TPtr &ev) {
29122913
StickInMemPages(msg);
29132914
}
29142915
for (auto& loaded : msg->Loaded) {
2915-
TPrivatePageCache::TPage::TWaitQueuePtr transactionsToActivate = PrivatePageCache->ProvideBlock(std::move(loaded), collectionInfo);
2916-
ActivateWaitingTransactions(transactionsToActivate);
2916+
PrivatePageCache->ProvideBlock(std::move(loaded), collectionInfo);
29172917
}
2918+
Y_ABORT_UNLESS(msg->WaitPad);
2919+
ActivateWaitingTransactions({std::move(msg->WaitPad)});
29182920
}
29192921
return;
29202922

ydb/core/tablet_flat/flat_executor.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ struct TExecutorStatsImpl : public TExecutorStats {
292292
ui64 PacksMetaBytes = 0; /* Memory occupied by NPageCollection::TMeta */
293293
};
294294

295-
struct TTransactionWaitPad : public TPrivatePageCacheWaitPad {
295+
struct TTransactionWaitPad : public NPageCollection::TPagesWaitPad {
296296
TSeat* Seat;
297297
NWilson::TSpan WaitingSpan;
298298

@@ -460,7 +460,7 @@ class TExecutor
460460

461461
TActorId Launcher;
462462

463-
THashMap<TPrivatePageCacheWaitPad*, THolder<TTransactionWaitPad>> TransactionWaitPads;
463+
THashMap<NPageCollection::TPagesWaitPad*, TIntrusivePtr<TTransactionWaitPad>> TransactionWaitPads;
464464

465465
ui64 TransactionUniqCounter = 0;
466466

@@ -528,7 +528,7 @@ class TExecutor
528528
void EnqueueActivation(TSeat* seat, bool activate);
529529
void PlanTransactionActivation();
530530
void MakeLogSnapshot();
531-
void ActivateWaitingTransactions(TPrivatePageCache::TPage::TWaitQueuePtr waitPadsQueue);
531+
void ActivateWaitingTransactions(const TVector<TIntrusivePtr<NPageCollection::TPagesWaitPad>>& waitPads);
532532
void AddCachesOfBundle(const NTable::TPartView &partView) noexcept;
533533
void AddSingleCache(const TIntrusivePtr<TPrivatePageCache::TInfo> &info) noexcept;
534534
void DropCachesOfBundle(const NTable::TPart &part) noexcept;

ydb/core/tablet_flat/flat_sausage_fetch.h

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,24 @@
11
#pragma once
22

33
#include "flat_sausage_gut.h"
4+
#include "flat_page_iface.h"
45

56
#include <ydb/library/actors/util/shared_data.h>
67

78
namespace NKikimr {
89
namespace NPageCollection {
910

11+
struct TPagesWaitPad : public TThrRefBase {
12+
// no internal state
13+
};
14+
1015
struct TFetch {
11-
TFetch(ui64 cookie, TIntrusiveConstPtr<IPageCollection> pageCollection, TVector<ui32> pages, NWilson::TTraceId traceId = {})
16+
using TPageId = NTable::NPage::TPageId;
17+
18+
TFetch(ui64 cookie, TIntrusiveConstPtr<IPageCollection> pageCollection, TVector<TPageId> pages)
1219
: Cookie(cookie)
1320
, PageCollection(std::move(pageCollection))
1421
, Pages(std::move(pages))
15-
, TraceId(std::move(traceId))
1622
{
1723

1824
}
@@ -27,8 +33,9 @@ namespace NPageCollection {
2733
const ui64 Cookie = Max<ui64>();
2834

2935
TIntrusiveConstPtr<IPageCollection> PageCollection;
30-
TVector<ui32> Pages;
36+
TVector<TPageId> Pages;
3137
NWilson::TTraceId TraceId;
38+
TIntrusivePtr<TPagesWaitPad> WaitPad;
3239
};
3340

3441
struct TLoadedPage {

ydb/core/tablet_flat/flat_sausagecache.cpp

Lines changed: 36 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -66,38 +66,38 @@ void TPrivatePageCache::RegisterPageCollection(TIntrusivePtr<TInfo> info) {
6666
++info->Users;
6767
}
6868

69-
TPrivatePageCache::TPage::TWaitQueuePtr TPrivatePageCache::ForgetPageCollection(TIntrusivePtr<TInfo> info) {
70-
// todo: amortize destruction cost (how?)
71-
72-
TPage::TWaitQueuePtr ret;
73-
for (const auto& kv : info->PageMap) {
74-
auto* page = kv.second.Get();
75-
Y_DEBUG_ABORT_UNLESS(page);
76-
77-
if (page->LoadState == TPage::LoadStateRequested) {
78-
while (TPrivatePageCacheWaitPad *x = page->WaitQueue->Pop()) {
79-
if (0 == x->Dec()) {
80-
if (!ret)
81-
ret = new TPrivatePageCache::TPage::TWaitQueue;
82-
ret->Push(x);
83-
}
84-
}
85-
page->WaitQueue.Destroy();
86-
}
87-
88-
if (page->PinPad) {
89-
page->PinPad.Drop();
90-
Stats.PinnedSetSize -= page->Size;
91-
92-
if (page->LoadState != TPage::LoadStateLoaded)
93-
Stats.PinnedLoadSize -= page->Size;
94-
}
95-
}
96-
97-
UnlockPageCollection(info->Id);
98-
99-
return ret;
100-
}
69+
// TPrivatePageCache::TPage::TWaitQueuePtr TPrivatePageCache::ForgetPageCollection(TIntrusivePtr<TInfo> info) {
70+
// // todo: amortize destruction cost (how?)
71+
72+
// TPage::TWaitQueuePtr ret;
73+
// for (const auto& kv : info->PageMap) {
74+
// auto* page = kv.second.Get();
75+
// Y_DEBUG_ABORT_UNLESS(page);
76+
77+
// if (page->LoadState == TPage::LoadStateRequested) {
78+
// while (TPrivatePageCacheWaitPad *x = page->WaitQueue->Pop()) {
79+
// if (0 == x->Dec()) {
80+
// if (!ret)
81+
// ret = new TPrivatePageCache::TPage::TWaitQueue;
82+
// ret->Push(x);
83+
// }
84+
// }
85+
// page->WaitQueue.Destroy();
86+
// }
87+
88+
// if (page->PinPad) {
89+
// page->PinPad.Drop();
90+
// Stats.PinnedSetSize -= page->Size;
91+
92+
// if (page->LoadState != TPage::LoadStateLoaded)
93+
// Stats.PinnedLoadSize -= page->Size;
94+
// }
95+
// }
96+
97+
// UnlockPageCollection(info->Id);
98+
99+
// return ret;
100+
// }
101101

102102
void TPrivatePageCache::LockPageCollection(TLogoBlobID id) {
103103
auto it = PageCollections.find(id);
@@ -118,7 +118,7 @@ bool TPrivatePageCache::UnlockPageCollection(TLogoBlobID id) {
118118
auto* page = kv.second.Get();
119119
Y_DEBUG_ABORT_UNLESS(page);
120120

121-
Y_ABORT_UNLESS(!page->WaitQueue, "non-empty wait queue in forgotten page.");
121+
// Y_ABORT_UNLESS(!page->WaitQueue, "non-empty wait queue in forgotten page.");
122122
Y_ABORT_UNLESS(!page->PinPad, "non-empty pin pad in forgotten page.");
123123

124124
if (page->SharedBody)
@@ -174,7 +174,7 @@ void TPrivatePageCache::Unpin(TPage *page, TPrivatePageCachePinPad *pad) {
174174
}
175175
}
176176

177-
std::pair<ui32, ui64> TPrivatePageCache::Request(TVector<TPageId> &pages, TPrivatePageCacheWaitPad *waitPad, TInfo *info) {
177+
std::pair<ui32, ui64> TPrivatePageCache::Request(TVector<TPageId> &pages, TInfo *info) {
178178
ui32 blocksToRequest = 0;
179179
ui64 bytesToRequest = 0;
180180

@@ -185,27 +185,16 @@ std::pair<ui32, ui64> TPrivatePageCache::Request(TVector<TPageId> &pages, TPriva
185185
TPage *page = info->EnsurePage(*it);
186186
switch (page->LoadState) {
187187
case TPage::LoadStateNo:
188-
case TPage::LoadStateRequestedAsync:
189188
page->LoadState = TPage::LoadStateRequested;
190189
bytesToRequest += page->Size;
191190

192-
Y_ABORT_UNLESS(!page->WaitQueue);
193-
page->WaitQueue = new TPage::TWaitQueue();
194-
page->WaitQueue->Push(waitPad);
195-
waitPad->Inc();
196-
197191
++blocksToRequest;
198192
++it;
199193
break;
200194
case TPage::LoadStateLoaded:
201195
Y_ABORT("must not request already loaded pages");
202196
case TPage::LoadStateRequested:
203-
if (!page->WaitQueue)
204-
page->WaitQueue = new TPage::TWaitQueue();
205-
206-
page->WaitQueue->Push(waitPad);
207-
waitPad->Inc();
208-
197+
// TODO: request always
209198
--end;
210199
if (end != it)
211200
*it = *end;
@@ -416,15 +405,12 @@ void TPrivatePageCache::DropSharedBody(TInfo *info, TPageId pageId) {
416405
}
417406
}
418407

419-
TPrivatePageCache::TPage::TWaitQueuePtr TPrivatePageCache::ProvideBlock(
420-
NSharedCache::TEvResult::TLoaded&& loaded, TInfo *info)
421-
{
408+
void TPrivatePageCache::ProvideBlock(NSharedCache::TEvResult::TLoaded&& loaded, TInfo *info) {
422409
Y_DEBUG_ABORT_UNLESS(loaded.Page && loaded.Page.IsUsed());
423410
TPage *page = info->EnsurePage(loaded.PageId);
424411

425412
if (page->LoadState != TPage::LoadStateLoaded && page->PinPad)
426413
Stats.PinnedLoadSize -= page->Size;
427-
428414
if (Y_UNLIKELY(page->SharedBody))
429415
Stats.TotalSharedBody -= page->Size;
430416
if (Y_UNLIKELY(page->PinnedBody))
@@ -436,20 +422,6 @@ TPrivatePageCache::TPage::TWaitQueuePtr TPrivatePageCache::ProvideBlock(
436422
Stats.TotalSharedBody += page->Size;
437423
Stats.TotalPinnedBody += page->Size;
438424
TryUnload(page);
439-
440-
TPage::TWaitQueuePtr ret;
441-
if (page->WaitQueue) {
442-
while (TPrivatePageCacheWaitPad *x = page->WaitQueue->Pop()) {
443-
if (0 == x->Dec()) {
444-
if (!ret)
445-
ret = new TPage::TWaitQueue();
446-
ret->Push(x);
447-
}
448-
}
449-
page->WaitQueue.Destroy();
450-
}
451-
452-
return ret;
453425
}
454426

455427
THashMap<TLogoBlobID, TIntrusivePtr<TPrivatePageCache::TInfo>> TPrivatePageCache::DetachPrivatePageCache() {

ydb/core/tablet_flat/flat_sausagecache.h

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,6 @@ struct TPrivatePageCachePinPad : public TAtomicRefCount<TPrivatePageCachePinPad>
1515
// no internal state
1616
};
1717

18-
struct TPrivatePageCacheWaitPad : public TExplicitSimpleCounter {
19-
// no internal state
20-
};
21-
2218
class TPrivatePageCache {
2319
using TPageId = NTable::NPage::TPageId;
2420
using TPinned = THashMap<TLogoBlobID, THashMap<TPageId, TIntrusivePtr<TPrivatePageCachePinPad>>>;
@@ -40,14 +36,10 @@ class TPrivatePageCache {
4036
};
4137

4238
struct TPage : public TIntrusiveListItem<TPage> {
43-
using TWaitQueue = TOneOneQueueInplace<TPrivatePageCacheWaitPad *, 64>;
44-
using TWaitQueuePtr = TAutoPtr<TWaitQueue, TWaitQueue::TCleanDestructor>;
45-
4639
enum ELoadState {
4740
LoadStateNo,
4841
LoadStateLoaded,
4942
LoadStateRequested,
50-
LoadStateRequestedAsync,
5143
};
5244

5345
ui32 LoadState : 2;
@@ -57,7 +49,6 @@ class TPrivatePageCache {
5749

5850
TInfo* const Info;
5951
TIntrusivePtr<TPrivatePageCachePinPad> PinPad;
60-
TWaitQueuePtr WaitQueue;
6152
TSharedPageRef SharedBody;
6253
TSharedData PinnedBody;
6354

@@ -70,7 +61,6 @@ class TPrivatePageCache {
7061
return (
7162
LoadState == LoadStateNo &&
7263
!PinPad &&
73-
!WaitQueue &&
7464
!SharedBody);
7565
}
7666

@@ -164,7 +154,7 @@ class TPrivatePageCache {
164154
public:
165155
TIntrusivePtr<TInfo> GetPageCollection(TLogoBlobID id) const;
166156
void RegisterPageCollection(TIntrusivePtr<TInfo> info);
167-
TPage::TWaitQueuePtr ForgetPageCollection(TIntrusivePtr<TInfo> info);
157+
// TPage::TWaitQueuePtr ForgetPageCollection(TIntrusivePtr<TInfo> info);
168158

169159
void LockPageCollection(TLogoBlobID id);
170160
// Return true for page collections removed after unlock.
@@ -174,7 +164,7 @@ class TPrivatePageCache {
174164

175165
const TStats& GetStats() const { return Stats; }
176166

177-
std::pair<ui32, ui64> Request(TVector<ui32> &pages, TPrivatePageCacheWaitPad *waitPad, TInfo *info); // blocks to load, bytes to load
167+
std::pair<ui32, ui64> Request(TVector<ui32> &pages, TInfo *info); // blocks to load, bytes to load
178168

179169
const TSharedData* Lookup(TPageId pageId, TInfo *collection);
180170

@@ -187,7 +177,7 @@ class TPrivatePageCache {
187177

188178
void DropSharedBody(TInfo *collectionInfo, TPageId pageId);
189179

190-
TPage::TWaitQueuePtr ProvideBlock(NSharedCache::TEvResult::TLoaded&& loaded, TInfo *collectionInfo);
180+
void ProvideBlock(NSharedCache::TEvResult::TLoaded&& loaded, TInfo *collectionInfo);
191181
THashMap<TLogoBlobID, TIntrusivePtr<TInfo>> DetachPrivatePageCache();
192182
THashMap<TLogoBlobID, THashSet<TPageId>> GetPrepareSharedTouched();
193183

ydb/core/tablet_flat/shared_cache_events.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ namespace NKikimr::NSharedCache {
132132
const ui64 Cookie;
133133
const TIntrusiveConstPtr<NPageCollection::IPageCollection> Origin;
134134
TVector<TLoaded> Loaded;
135+
TIntrusivePtr<NPageCollection::TPagesWaitPad> WaitPad;
135136
};
136137

137138
struct TEvUpdated : public TEventLocal<TEvUpdated, EvUpdated> {

0 commit comments

Comments
 (0)