Skip to content

Commit 2085b80

Browse files
authored
Merge c348338 into b571990
2 parents b571990 + c348338 commit 2085b80

File tree

3 files changed

+93
-15
lines changed

3 files changed

+93
-15
lines changed

ydb/core/tx/datashard/cdc_stream_scan.cpp

+21-3
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ class TDataShard::TTxCdcStreamScanProgress
213213
TTxType GetTxType() const override { return TXTYPE_CDC_STREAM_SCAN_PROGRESS; }
214214

215215
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
216-
const auto& ev = *Request->Get();
216+
auto& ev = *Request->Get();
217217
const auto& tablePathId = ev.TablePathId;
218218
const auto& streamPathId = ev.StreamPathId;
219219
const auto& readVersion = ev.ReadVersion;
@@ -238,7 +238,25 @@ class TDataShard::TTxCdcStreamScanProgress
238238
}
239239

240240
ChangeRecords.clear();
241-
if (Self->CheckChangesQueueOverflow()) {
241+
242+
if (!ev.ReservationCookie) {
243+
ev.ReservationCookie = Self->ReserveChangeQueueCapacity(ev.Rows.size());
244+
}
245+
246+
if (!ev.ReservationCookie) {
247+
LOG_I("Cannot reserve change queue capacity");
248+
Reschedule = true;
249+
return true;
250+
}
251+
252+
if (Self->GetFreeChangeQueueCapacity(ev.ReservationCookie) < ev.Rows.size()) {
253+
LOG_I("Not enough change queue capacity");
254+
Reschedule = true;
255+
return true;
256+
}
257+
258+
if (Self->CheckChangesQueueOverflow(ev.ReservationCookie)) {
259+
LOG_I("Change queue overflow");
242260
Reschedule = true;
243261
return true;
244262
}
@@ -335,7 +353,7 @@ class TDataShard::TTxCdcStreamScanProgress
335353
LOG_I("Enqueue " << ChangeRecords.size() << " change record(s)"
336354
<< ": streamPathId# " << Request->Get()->StreamPathId);
337355

338-
Self->EnqueueChangeRecords(std::move(ChangeRecords));
356+
Self->EnqueueChangeRecords(std::move(ChangeRecords), Request->Get()->ReservationCookie);
339357
ctx.Send(Request->Sender, Response.Release());
340358
} else if (Reschedule) {
341359
LOG_I("Re-schedule progress tx"

ydb/core/tx/datashard/datashard.cpp

+57-6
Original file line numberDiff line numberDiff line change
@@ -899,6 +899,13 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
899899
}
900900
}
901901

902+
if (auto rIt = ChangeQueueReservations.find(record.ReservationCookie); rIt != ChangeQueueReservations.end()) {
903+
--ChangeQueueReservedCapacity;
904+
if (!--rIt->second) {
905+
ChangeQueueReservations.erase(rIt);
906+
}
907+
}
908+
902909
UpdateChangeExchangeLag(AppData()->TimeProvider->Now());
903910
ChangesQueue.erase(it);
904911

@@ -908,7 +915,7 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
908915
CheckChangesQueueNoOverflow();
909916
}
910917

911-
void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records) {
918+
void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie) {
912919
if (!records) {
913920
return;
914921
}
@@ -933,7 +940,7 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
933940
auto res = ChangesQueue.emplace(
934941
std::piecewise_construct,
935942
std::forward_as_tuple(record.Order),
936-
std::forward_as_tuple(record, now)
943+
std::forward_as_tuple(record, now, cookie)
937944
);
938945
if (res.second) {
939946
ChangesList.PushBack(&res.first->second);
@@ -956,6 +963,38 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
956963
Send(OutChangeSender, new NChangeExchange::TEvChangeExchange::TEvEnqueueRecords(std::move(forward)));
957964
}
958965

966+
ui32 TDataShard::GetFreeChangeQueueCapacity(ui64 cookie) {
967+
const auto sizeLimit = AppData()->DataShardConfig.GetChangesQueueItemsLimit();
968+
if (sizeLimit < ChangesQueue.size()) {
969+
return 0;
970+
}
971+
972+
const auto free = Min(sizeLimit - ChangesQueue.size(), Max(sizeLimit / 2, 1ul));
973+
974+
ui32 reserved = ChangeQueueReservedCapacity;
975+
if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) {
976+
reserved -= it->second;
977+
}
978+
979+
if (free < reserved) {
980+
return 0;
981+
}
982+
983+
return free - reserved;
984+
}
985+
986+
ui64 TDataShard::ReserveChangeQueueCapacity(ui32 capacity) {
987+
const auto sizeLimit = AppData()->DataShardConfig.GetChangesQueueItemsLimit();
988+
if (Max(sizeLimit / 2, 1ul) < ChangeQueueReservedCapacity) {
989+
return 0;
990+
}
991+
992+
const auto cookie = NextChangeQueueReservationCookie++;
993+
ChangeQueueReservations.emplace(cookie, capacity);
994+
ChangeQueueReservedCapacity += capacity;
995+
return cookie;
996+
}
997+
959998
void TDataShard::UpdateChangeExchangeLag(TInstant now) {
960999
if (!ChangesList.Empty()) {
9611000
const auto* front = ChangesList.Front();
@@ -3391,19 +3430,31 @@ bool TDataShard::CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr
33913430
return false;
33923431
}
33933432

3394-
bool TDataShard::CheckChangesQueueOverflow() const {
3433+
bool TDataShard::CheckChangesQueueOverflow(ui64 cookie) const {
33953434
const auto* appData = AppData();
33963435
const auto sizeLimit = appData->DataShardConfig.GetChangesQueueItemsLimit();
33973436
const auto bytesLimit = appData->DataShardConfig.GetChangesQueueBytesLimit();
3398-
return ChangesQueue.size() >= sizeLimit || ChangesQueueBytes >= bytesLimit;
3437+
3438+
ui32 reserved = ChangeQueueReservedCapacity;
3439+
if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) {
3440+
reserved -= it->second;
3441+
}
3442+
3443+
return (ChangesQueue.size() + reserved) >= sizeLimit || ChangesQueueBytes >= bytesLimit;
33993444
}
34003445

3401-
void TDataShard::CheckChangesQueueNoOverflow() {
3446+
void TDataShard::CheckChangesQueueNoOverflow(ui64 cookie) {
34023447
if (OverloadSubscribersByReason[RejectReasonIndex(ERejectReason::ChangesQueueOverflow)]) {
34033448
const auto* appData = AppData();
34043449
const auto sizeLimit = appData->DataShardConfig.GetChangesQueueItemsLimit();
34053450
const auto bytesLimit = appData->DataShardConfig.GetChangesQueueBytesLimit();
3406-
if (ChangesQueue.size() < sizeLimit && ChangesQueueBytes < bytesLimit) {
3451+
3452+
ui32 reserved = ChangeQueueReservedCapacity;
3453+
if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) {
3454+
reserved -= it->second;
3455+
}
3456+
3457+
if ((ChangesQueue.size() + reserved) < sizeLimit && ChangesQueueBytes < bytesLimit) {
34073458
NotifyOverloadSubscribers(ERejectReason::ChangesQueueOverflow);
34083459
}
34093460
}

ydb/core/tx/datashard/datashard_impl.h

+15-6
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,7 @@ class TDataShard
531531
const TRowVersion ReadVersion;
532532
const TVector<ui32> ValueTags;
533533
TVector<std::pair<TSerializedCellVec, TSerializedCellVec>> Rows;
534+
ui64 ReservationCookie = 0;
534535
const TCdcStreamScanManager::TStats Stats;
535536
};
536537

@@ -1837,7 +1838,9 @@ class TDataShard
18371838
void MoveChangeRecord(NIceDb::TNiceDb& db, ui64 order, const TPathId& pathId);
18381839
void MoveChangeRecord(NIceDb::TNiceDb& db, ui64 lockId, ui64 lockOffset, const TPathId& pathId);
18391840
void RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order);
1840-
void EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records);
1841+
void EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie = 0);
1842+
ui32 GetFreeChangeQueueCapacity(ui64 cookie);
1843+
ui64 ReserveChangeQueueCapacity(ui32 capacity);
18411844
void UpdateChangeExchangeLag(TInstant now);
18421845
void CreateChangeSender(const TActorContext& ctx);
18431846
void KillChangeSender(const TActorContext& ctx);
@@ -1976,8 +1979,8 @@ class TDataShard
19761979
void WaitPredictedPlanStep(ui64 step);
19771980
void SchedulePlanPredictedTxs();
19781981

1979-
bool CheckChangesQueueOverflow() const;
1980-
void CheckChangesQueueNoOverflow();
1982+
bool CheckChangesQueueOverflow(ui64 cookie = 0) const;
1983+
void CheckChangesQueueNoOverflow(ui64 cookie = 0);
19811984

19821985
void DeleteReadIterator(TReadIteratorsMap::iterator it);
19831986
void CancelReadIterators(Ydb::StatusIds::StatusCode code, const TString& issue, const TActorContext& ctx);
@@ -2709,9 +2712,11 @@ class TDataShard
27092712
TInstant EnqueuedAt;
27102713
ui64 LockId;
27112714
ui64 LockOffset;
2715+
ui64 ReservationCookie;
27122716

27132717
explicit TEnqueuedRecord(ui64 bodySize, const TPathId& tableId,
2714-
ui64 schemaVersion, TInstant created, TInstant enqueued, ui64 lockId = 0, ui64 lockOffset = 0)
2718+
ui64 schemaVersion, TInstant created, TInstant enqueued,
2719+
ui64 lockId = 0, ui64 lockOffset = 0, ui64 cookie = 0)
27152720
: BodySize(bodySize)
27162721
, TableId(tableId)
27172722
, SchemaVersion(schemaVersion)
@@ -2720,12 +2725,13 @@ class TDataShard
27202725
, EnqueuedAt(enqueued)
27212726
, LockId(lockId)
27222727
, LockOffset(lockOffset)
2728+
, ReservationCookie(cookie)
27232729
{
27242730
}
27252731

2726-
explicit TEnqueuedRecord(const IDataShardChangeCollector::TChange& record, TInstant now)
2732+
explicit TEnqueuedRecord(const IDataShardChangeCollector::TChange& record, TInstant now, ui64 cookie)
27272733
: TEnqueuedRecord(record.BodySize, record.TableId, record.SchemaVersion, record.CreatedAt(), now,
2728-
record.LockId, record.LockOffset)
2734+
record.LockId, record.LockOffset, cookie)
27292735
{
27302736
}
27312737
};
@@ -2745,6 +2751,9 @@ class TDataShard
27452751
THashMap<ui64, TEnqueuedRecord> ChangesQueue; // ui64 is order
27462752
TIntrusiveList<TEnqueuedRecord, TEnqueuedRecordTag> ChangesList;
27472753
ui64 ChangesQueueBytes = 0;
2754+
THashMap<ui64, ui32> ChangeQueueReservations;
2755+
ui64 NextChangeQueueReservationCookie = 1;
2756+
ui32 ChangeQueueReservedCapacity = 0;
27482757
TActorId OutChangeSender;
27492758
bool OutChangeSenderSuspended = false;
27502759

0 commit comments

Comments
 (0)