diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index 5269e444b048..baa3295190dd 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -27,6 +27,7 @@ namespace NKikimr::NPQ { static const ui32 BATCH_UNPACK_SIZE_BORDER = 500_KB; static const ui32 MAX_INLINE_SIZE = 1000; +static const TDuration SubDomainQuotaWaitDurationMs = TDuration::Seconds(60); static constexpr NPersQueue::NErrorCode::EErrorCode InactivePartitionErrorCode = NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_INACTIVE; @@ -1483,8 +1484,12 @@ void TPartition::AddNewWriteBlob(std::pair& res, TEvKeyValue::TEvReq void TPartition::SetDeadlinesForWrites(const TActorContext& ctx) { PQ_LOG_T("TPartition::SetDeadlinesForWrites."); - if (AppData(ctx)->PQConfig.GetQuotingConfig().GetQuotaWaitDurationMs() > 0 && QuotaDeadline == TInstant::Zero()) { - QuotaDeadline = ctx.Now() + TDuration::MilliSeconds(AppData(ctx)->PQConfig.GetQuotingConfig().GetQuotaWaitDurationMs()); + auto quotaWaitDurationMs = TDuration::MilliSeconds(AppData(ctx)->PQConfig.GetQuotingConfig().GetQuotaWaitDurationMs()); + if (SubDomainOutOfSpace) { + quotaWaitDurationMs = quotaWaitDurationMs ? std::min(quotaWaitDurationMs, SubDomainQuotaWaitDurationMs) : SubDomainQuotaWaitDurationMs; + } + if (quotaWaitDurationMs > TDuration::Zero() && QuotaDeadline == TInstant::Zero()) { + QuotaDeadline = ctx.Now() + quotaWaitDurationMs; ctx.Schedule(QuotaDeadline, new TEvPQ::TEvQuotaDeadlineCheck()); } @@ -1513,7 +1518,7 @@ void TPartition::FilterDeadlinedWrites(const TActorContext& ctx, TMessageQueue& { TMessageQueue newRequests; for (auto& w : requests) { - if (!w.IsWrite() || w.GetWrite().Msg.IgnoreQuotaDeadline) { + if (!w.IsWrite() || (w.GetWrite().Msg.IgnoreQuotaDeadline && !SubDomainOutOfSpace)) { newRequests.emplace_back(std::move(w)); continue; } @@ -1529,7 +1534,8 @@ void TPartition::FilterDeadlinedWrites(const TActorContext& ctx, TMessageQueue& WriteInflightSize -= msg.Data.size(); } - ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::OVERLOAD, "quota exceeded"); + TString errorMsg = SubDomainOutOfSpace ? "database size exceeded" : "quota exceeded"; + ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::OVERLOAD, errorMsg); } requests = std::move(newRequests); } diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index ea678be2529a..1ff74f56219b 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -287,6 +287,7 @@ class TPartitionFixture : public NUnitTest::TBaseFixture { void ShadowPartitionCountersTest(bool isFirstClass); void TestWriteSubDomainOutOfSpace(TDuration quotaWaitDuration, bool ignoreQuotaDeadline); + void TestWriteSubDomainOutOfSpace_DeadlineWork(bool ignoreQuotaDeadline); void WaitKeyValueRequest(TMaybe& cookie); void CmdChangeOwner(ui64 cookie, const TString& sourceId, TDuration duration, TString& ownerCookie); @@ -1206,6 +1207,62 @@ void TPartitionFixture::EmulateKVTablet() Cerr << "Send disk status response with cookie: " << cookie.GetOrElse(0) << Endl; } +void TPartitionFixture::TestWriteSubDomainOutOfSpace_DeadlineWork(bool ignoreQuotaDeadline) +{ + Ctx->Runtime->GetAppData().FeatureFlags.SetEnableTopicDiskSubDomainQuota(true); + Ctx->Runtime->GetAppData().PQConfig.MutableQuotingConfig()->SetQuotaWaitDurationMs(300); + CreatePartition({ + .Partition=TPartitionId{1}, + .Begin=0, .End=0, + // + // partition configuration + // + .Config={.Version=1, .Consumers={{.Consumer="client-1", .Offset=3}}} + }, + // + // tablet configuration + // + {.Version=2, .Consumers={{.Consumer="client-1"}}}); + TMaybe kvCookie; + + SendSubDomainStatus(true); + + ui64 cookie = 1; + ui64 messageNo = 0; + TString ownerCookie; + + CmdChangeOwner(cookie, "owner1", TDuration::Seconds(1), ownerCookie); + + TAutoPtr handle; + std::function truth = [&](const TEvPQ::TEvError& e) { + return cookie == e.Cookie; + }; + + TString data = "data for write"; + + // First message will be processed because used storage 0 and limit 0. That is, the limit is not exceeded. + SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data, ignoreQuotaDeadline); + messageNo++; + + WaitKeyValueRequest(kvCookie); // the partition saves the TEvPQ::TEvWrite event + SendDiskStatusResponse(&kvCookie); + + { + auto event = Ctx->Runtime->GrabEdgeEvent(TDuration::Seconds(1)); + UNIT_ASSERT(event != nullptr); + } + + // Second message will not be processed because the limit is exceeded. + SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data, ignoreQuotaDeadline); + messageNo++; + + { + auto event = Ctx->Runtime->GrabEdgeEventIf(handle, truth, TDuration::Seconds(1)); + UNIT_ASSERT(event != nullptr); + UNIT_ASSERT_EQUAL(NPersQueue::NErrorCode::OVERLOAD, event->ErrorCode); + } +} + void TPartitionFixture::TestWriteSubDomainOutOfSpace(TDuration quotaWaitDuration, bool ignoreQuotaDeadline) { Ctx->Runtime->GetAppData().FeatureFlags.SetEnableTopicDiskSubDomainQuota(true); @@ -2323,58 +2380,7 @@ Y_UNIT_TEST_F(ReserveSubDomainOutOfSpace, TPartitionFixture) Y_UNIT_TEST_F(WriteSubDomainOutOfSpace, TPartitionFixture) { - Ctx->Runtime->GetAppData().FeatureFlags.SetEnableTopicDiskSubDomainQuota(true); - Ctx->Runtime->GetAppData().PQConfig.MutableQuotingConfig()->SetQuotaWaitDurationMs(300); - CreatePartition({ - .Partition=TPartitionId{1}, - .Begin=0, .End=0, - // - // partition configuration - // - .Config={.Version=1, .Consumers={{.Consumer="client-1", .Offset=3}}} - }, - // - // tablet configuration - // - {.Version=2, .Consumers={{.Consumer="client-1"}}}); - TMaybe kvCookie; - - SendSubDomainStatus(true); - - ui64 cookie = 1; - ui64 messageNo = 0; - TString ownerCookie; - - CmdChangeOwner(cookie, "owner1", TDuration::Seconds(1), ownerCookie); - - TAutoPtr handle; - std::function truth = [&](const TEvPQ::TEvError& e) { - return cookie == e.Cookie; - }; - - TString data = "data for write"; - - // First message will be processed because used storage 0 and limit 0. That is, the limit is not exceeded. - SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data); - messageNo++; - - WaitKeyValueRequest(kvCookie); // the partition saves the TEvPQ::TEvWrite event - SendDiskStatusResponse(&kvCookie); - - { - auto event = Ctx->Runtime->GrabEdgeEvent(TDuration::Seconds(1)); - UNIT_ASSERT(event != nullptr); - } - - // Second message will not be processed because the limit is exceeded. - SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data); - messageNo++; - - { - auto event = Ctx->Runtime->GrabEdgeEventIf(handle, truth, TDuration::Seconds(1)); - UNIT_ASSERT(event != nullptr); - UNIT_ASSERT_EQUAL(NPersQueue::NErrorCode::OVERLOAD, event->ErrorCode); - } + TestWriteSubDomainOutOfSpace_DeadlineWork(false); } Y_UNIT_TEST_F(WriteSubDomainOutOfSpace_DisableExpiration, TPartitionFixture) @@ -2384,7 +2390,7 @@ Y_UNIT_TEST_F(WriteSubDomainOutOfSpace_DisableExpiration, TPartitionFixture) Y_UNIT_TEST_F(WriteSubDomainOutOfSpace_IgnoreQuotaDeadline, TPartitionFixture) { - TestWriteSubDomainOutOfSpace(TDuration::MilliSeconds(300), true); + TestWriteSubDomainOutOfSpace_DeadlineWork(true); } Y_UNIT_TEST_F(GetPartitionWriteInfoSuccess, TPartitionFixture) {