Skip to content

Commit e0d3041

Browse files
authored
Merge dd6d2f3 into 8b8059d
2 parents 8b8059d + dd6d2f3 commit e0d3041

File tree

1 file changed

+14
-2
lines changed

1 file changed

+14
-2
lines changed

ydb/core/persqueue/partition_write.cpp

+14-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ namespace NKikimr::NPQ {
2727

2828
static const ui32 BATCH_UNPACK_SIZE_BORDER = 500_KB;
2929
static const ui32 MAX_INLINE_SIZE = 1000;
30+
static const TDuration SubDomainQuotaWaitDurationMs = TDuration::Seconds(60);
3031

3132
static constexpr NPersQueue::NErrorCode::EErrorCode InactivePartitionErrorCode = NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_INACTIVE;
3233

@@ -1488,6 +1489,16 @@ void TPartition::SetDeadlinesForWrites(const TActorContext& ctx) {
14881489

14891490
ctx.Schedule(QuotaDeadline, new TEvPQ::TEvQuotaDeadlineCheck());
14901491
}
1492+
1493+
auto quotaWaitDurationMs = TDuration::MilliSeconds(AppData(ctx)->PQConfig.GetQuotingConfig().GetQuotaWaitDurationMs());
1494+
if (SubDomainOutOfSpace) {
1495+
quotaWaitDurationMs = quotaWaitDurationMs ? std::min(quotaWaitDurationMs, SubDomainQuotaWaitDurationMs) : SubDomainQuotaWaitDurationMs;
1496+
}
1497+
if (quotaWaitDurationMs > TDuration::Zero() && QuotaDeadline == TInstant::Zero()) {
1498+
QuotaDeadline = ctx.Now() + quotaWaitDurationMs;
1499+
1500+
ctx.Schedule(QuotaDeadline, new TEvPQ::TEvQuotaDeadlineCheck());
1501+
}
14911502
}
14921503

14931504
void TPartition::Handle(TEvPQ::TEvQuotaDeadlineCheck::TPtr&, const TActorContext& ctx) {
@@ -1513,7 +1524,7 @@ void TPartition::FilterDeadlinedWrites(const TActorContext& ctx, TMessageQueue&
15131524
{
15141525
TMessageQueue newRequests;
15151526
for (auto& w : requests) {
1516-
if (!w.IsWrite() || w.GetWrite().Msg.IgnoreQuotaDeadline) {
1527+
if (!w.IsWrite() || (w.GetWrite().Msg.IgnoreQuotaDeadline && !SubDomainOutOfSpace)) {
15171528
newRequests.emplace_back(std::move(w));
15181529
continue;
15191530
}
@@ -1529,7 +1540,8 @@ void TPartition::FilterDeadlinedWrites(const TActorContext& ctx, TMessageQueue&
15291540
WriteInflightSize -= msg.Data.size();
15301541
}
15311542

1532-
ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::OVERLOAD, "quota exceeded");
1543+
TString errorMsg = SubDomainOutOfSpace ? "database size exceeded" : "quota exceeded";
1544+
ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::OVERLOAD, errorMsg);
15331545
}
15341546
requests = std::move(newRequests);
15351547
}

0 commit comments

Comments
 (0)