From cd989fa5d1f0e06270210f75fbf20e4b7ad873d4 Mon Sep 17 00:00:00 2001 From: Alekseii Nikolaevskii Date: Wed, 10 Apr 2024 16:20:09 +0300 Subject: [PATCH 1/3] do not check explicit sources every cleanup --- ydb/core/persqueue/sourceid.cpp | 20 +++++++++++--------- ydb/core/persqueue/sourceid.h | 2 +- ydb/core/persqueue/ut/sourceid_ut.cpp | 20 ++++++++++++++++++++ 3 files changed, 32 insertions(+), 10 deletions(-) diff --git a/ydb/core/persqueue/sourceid.cpp b/ydb/core/persqueue/sourceid.cpp index e2176d8c3376..07a0d7895478 100644 --- a/ydb/core/persqueue/sourceid.cpp +++ b/ydb/core/persqueue/sourceid.cpp @@ -253,7 +253,7 @@ void TSourceIdStorage::DeregisterSourceId(const TString& sourceId) { ExplicitSourceIds.erase(sourceId); } - SourceIdsByOffset.erase(std::make_pair(it->second.Offset, sourceId)); + SourceIdsByOffset[it->second.Explicit].erase(std::make_pair(it->second.Offset, sourceId)); InMemorySourceIds.erase(it); auto jt = SourceIdOwners.find(sourceId); @@ -277,7 +277,7 @@ bool TSourceIdStorage::DropOldSourceIds(TEvKeyValue::TEvRequest* request, TInsta const auto ttl = TDuration::Seconds(config.GetSourceIdLifetimeSeconds()); ui32 size = request->Record.ByteSize(); - for (const auto& [offset, sourceId] : SourceIdsByOffset) { + for (const auto& [offset, sourceId] : SourceIdsByOffset[0]) { if (offset >= startOffset && toDelOffsets.size() >= maxDeleteSourceIds) { break; } @@ -323,7 +323,7 @@ bool TSourceIdStorage::DropOldSourceIds(TEvKeyValue::TEvRequest* request, TInsta size_t res = InMemorySourceIds.erase(t.second); Y_ABORT_UNLESS(res == 1); // delete sourceID from offsets - res = SourceIdsByOffset.erase(t); + res = SourceIdsByOffset[0].erase(t); Y_ABORT_UNLESS(res == 1); // save owners to drop and delete records from map auto it = SourceIdOwners.find(t.second); @@ -372,14 +372,14 @@ void TSourceIdStorage::RegisterSourceIdInfo(const TString& sourceId, TSourceIdIn auto it = InMemorySourceIds.find(sourceId); if (it != InMemorySourceIds.end()) { if (!load || it->second.Offset < sourceIdInfo.Offset) { - const auto res = SourceIdsByOffset.erase(std::make_pair(it->second.Offset, sourceId)); + const auto res = SourceIdsByOffset[sourceIdInfo.Explicit].erase(std::make_pair(it->second.Offset, sourceId)); Y_ABORT_UNLESS(res == 1); } else { return; } } - const bool res = SourceIdsByOffset.emplace(sourceIdInfo.Offset, sourceId).second; + const bool res = SourceIdsByOffset[sourceIdInfo.Explicit].emplace(sourceIdInfo.Offset, sourceId).second; Y_ABORT_UNLESS(res); if (sourceIdInfo.Explicit) { @@ -421,10 +421,12 @@ void TSourceIdStorage::MarkOwnersForDeletedSourceId(THashMapsecond); - Y_ABORT_UNLESS(it != InMemorySourceIds.end()); - ds = Min(ds, it->second.WriteTimestamp); + for (ui32 i = 0 ; i < 2; ++i) { + if (!SourceIdsByOffset[i].empty()) { + auto it = InMemorySourceIds.find(SourceIdsByOffset[i].begin()->second); + Y_ABORT_UNLESS(it != InMemorySourceIds.end()); + ds = Min(ds, it->second.WriteTimestamp); + } } return ds; diff --git a/ydb/core/persqueue/sourceid.h b/ydb/core/persqueue/sourceid.h index 897014363f29..19d9b908f6db 100644 --- a/ydb/core/persqueue/sourceid.h +++ b/ydb/core/persqueue/sourceid.h @@ -108,7 +108,7 @@ class TSourceIdStorage: private THeartbeatProcessor { TSourceIdMap InMemorySourceIds; THashMap SourceIdOwners; TVector OwnersToDrop; - TSet> SourceIdsByOffset; + TSet> SourceIdsByOffset[2]; // used to track heartbeats THashSet ExplicitSourceIds; diff --git a/ydb/core/persqueue/ut/sourceid_ut.cpp b/ydb/core/persqueue/ut/sourceid_ut.cpp index 43164c888c50..31853ccc9f4c 100644 --- a/ydb/core/persqueue/ut/sourceid_ut.cpp +++ b/ydb/core/persqueue/ut/sourceid_ut.cpp @@ -460,6 +460,26 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) { } } + Y_UNIT_TEST(ExpensiveCleanup) { + TSourceIdStorage storage; + ui64 offset = 0; + + // initial info w/o heartbeats + for (ui32 i = 1; i <= 100000; ++i) { + storage.RegisterSourceId(TestSourceId(i), MakeExplicitSourceIdInfo(++offset)); + } + + NKikimrPQ::TPartitionConfig config; + config.SetSourceIdLifetimeSeconds(TDuration::Hours(1).Seconds()); + + auto request = MakeHolder(); + for (ui32 i = 0; i < 1000; ++i) { + Cerr << "Iteration " << i << "\n"; + const auto dropped = storage.DropOldSourceIds(request.Get(), TInstant::Hours(2), 1'000'000, TPartitionId(TestPartition), config); + UNIT_ASSERT_EQUAL(dropped, false); + } + + } } // TSourceIdTests } // namespace NKikimr::NPQ From 329f48dd71684623adc39a52adb29fc68898ffe8 Mon Sep 17 00:00:00 2001 From: Sergey Veselov Date: Fri, 19 Apr 2024 19:32:32 +0300 Subject: [PATCH 2/3] fix failing on verify in sqs when throttling budget is over and action does not require existing queue (#3938) --- ydb/core/ymq/actor/action.h | 4 +-- ydb/core/ymq/actor/service.cpp | 1 + .../test_throttling_nonexistent_queue.py | 27 +++++++++++++++++-- 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/ydb/core/ymq/actor/action.h b/ydb/core/ymq/actor/action.h index 8ea1cb814c64..a594f5eafee4 100644 --- a/ydb/core/ymq/actor/action.h +++ b/ydb/core/ymq/actor/action.h @@ -649,6 +649,8 @@ class TActionActor } } + Y_ABORT_UNLESS(SchemeCache_); + bool isACLProtectedAccount = Cfg().GetForceAccessControl(); if (!IsCloud() && (SecurityToken_ || (Cfg().GetForceAccessControl() && (isACLProtectedAccount = IsACLProtectedAccount(UserName_))))) { this->Become(&TActionActor::WaitAuthCheckMessages); @@ -666,8 +668,6 @@ class TActionActor return; } - Y_ABORT_UNLESS(SchemeCache_); - RequestSchemeCache(GetActionACLSourcePath()); // this also checks that requested queue (if any) does exist RequestTicketParser(); } else { diff --git a/ydb/core/ymq/actor/service.cpp b/ydb/core/ymq/actor/service.cpp index 04498866da82..9181d88d9b64 100644 --- a/ydb/core/ymq/actor/service.cpp +++ b/ydb/core/ymq/actor/service.cpp @@ -685,6 +685,7 @@ void TSqsService::AnswerThrottled(TSqsEvents::TEvGetConfiguration::TPtr& ev) { RLOG_SQS_REQ_DEBUG(ev->Get()->RequestId, "Throttled because of too many requests for nonexistent queue [" << ev->Get()->QueueName << "] for user [" << ev->Get()->UserName << "] while getting configuration"); auto answer = MakeHolder(); answer->Throttled = true; + answer->SchemeCache = SchemeCache_; Send(ev->Sender, answer.Release()); } diff --git a/ydb/tests/functional/sqs/common/test_throttling_nonexistent_queue.py b/ydb/tests/functional/sqs/common/test_throttling_nonexistent_queue.py index 2ed57a9ac3af..05d4abc70c40 100644 --- a/ydb/tests/functional/sqs/common/test_throttling_nonexistent_queue.py +++ b/ydb/tests/functional/sqs/common/test_throttling_nonexistent_queue.py @@ -5,8 +5,11 @@ from ydb.tests.library.sqs.test_base import KikimrSqsTestBase +throttling_exception_pattern = ".*ThrottlingException.*" + class TestSqsThrottlingOnNonexistentQueue(KikimrSqsTestBase): + def test_throttling_on_nonexistent_queue(self): queue_url = self._create_queue_and_assert(self.queue_name, False, True) nonexistent_queue_url = queue_url + "_nonex" @@ -21,8 +24,6 @@ def get_attributes_of_nonexistent_queue(): except Exception: pass - throttling_exception_pattern = ".*ThrottlingException.*" - assert_that( get_attributes_of_nonexistent_queue, raises( @@ -46,3 +47,25 @@ def get_attributes_of_nonexistent_queue(): pattern=throttling_exception_pattern ) ) + + def test_action_which_does_not_requere_existing_queue(self): + queue_url = self._create_queue_and_assert(self.queue_name, False, True) + nonexistent_queue_url = queue_url + "_nonex" + + def get_attributes_of_nonexistent_queue(): + self._sqs_api.get_queue_attributes(nonexistent_queue_url) + + # Draining budget + for _ in range(16): + try: + get_attributes_of_nonexistent_queue() + except Exception: + pass + + assert_that( + lambda: self._sqs_api.get_queue_url(self.queue_name + "_nonex"), + raises( + RuntimeError, + pattern=throttling_exception_pattern + ) + ) From 1021acfa87d727d2d53b48dc1da33052d2366aee Mon Sep 17 00:00:00 2001 From: Sergey Veselov Date: Fri, 19 Apr 2024 19:48:33 +0300 Subject: [PATCH 3/3] fix python linter in YMQ tests (#3944) --- .../functional/sqs/common/test_throttling_nonexistent_queue.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ydb/tests/functional/sqs/common/test_throttling_nonexistent_queue.py b/ydb/tests/functional/sqs/common/test_throttling_nonexistent_queue.py index 05d4abc70c40..789d653e859b 100644 --- a/ydb/tests/functional/sqs/common/test_throttling_nonexistent_queue.py +++ b/ydb/tests/functional/sqs/common/test_throttling_nonexistent_queue.py @@ -4,11 +4,10 @@ from ydb.tests.library.sqs.test_base import KikimrSqsTestBase - throttling_exception_pattern = ".*ThrottlingException.*" -class TestSqsThrottlingOnNonexistentQueue(KikimrSqsTestBase): +class TestSqsThrottlingOnNonexistentQueue(KikimrSqsTestBase): def test_throttling_on_nonexistent_queue(self): queue_url = self._create_queue_and_assert(self.queue_name, False, True)