Skip to content

Commit 9795803

Browse files
SQS-785: correct response to GetQueueUrl with custom queue name when throttling (#5439)
1 parent ac40d91 commit 9795803

File tree

6 files changed

+71
-27
lines changed

6 files changed

+71
-27
lines changed

ydb/core/ymq/actor/action.h

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,14 @@ class TActionActor
7777
if (TProxyActor::NeedCreateProxyActor(Action_)) {
7878
configurationFlags |= TSqsEvents::TEvGetConfiguration::EFlags::NeedQueueLeader;
7979
}
80+
bool enableThrottling = (Action_ != EAction::CreateQueue);
8081
this->Send(MakeSqsServiceID(this->SelfId().NodeId()),
8182
MakeHolder<TSqsEvents::TEvGetConfiguration>(
8283
RequestId_,
8384
UserName_,
8485
GetQueueName(),
86+
FolderId_,
87+
enableThrottling,
8588
configurationFlags)
8689
);
8790
}
@@ -637,17 +640,16 @@ class TActionActor
637640
return;
638641
}
639642

640-
if (TDerived::NeedExistingQueue()) {
641-
if (ev->Get()->Throttled) {
642-
MakeError(MutableErrorDesc(), NErrors::THROTTLING_EXCEPTION, "Too many requests for nonexistent queue.");
643-
SendReplyAndDie();
644-
return;
645-
}
646-
if (!ev->Get()->QueueExists) {
647-
MakeError(MutableErrorDesc(), NErrors::NON_EXISTENT_QUEUE);
648-
SendReplyAndDie();
649-
return;
650-
}
643+
if (ev->Get()->Throttled) {
644+
MakeError(MutableErrorDesc(), NErrors::THROTTLING_EXCEPTION, "Too many requests for nonexistent queue.");
645+
SendReplyAndDie();
646+
return;
647+
}
648+
649+
if (TDerived::NeedExistingQueue() && !ev->Get()->QueueExists) {
650+
MakeError(MutableErrorDesc(), NErrors::NON_EXISTENT_QUEUE);
651+
SendReplyAndDie();
652+
return;
651653
}
652654

653655
Y_ABORT_UNLESS(SchemeCache_);

ydb/core/ymq/actor/events.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,8 @@ struct TSqsEvents {
162162
TString RequestId;
163163
TString UserName;
164164
TString QueueName;
165+
TString FolderId;
166+
bool EnableThrottling = true;
165167
ui64 Flags = 0;
166168

167169
enum EFlags {
@@ -177,6 +179,20 @@ struct TSqsEvents {
177179
, QueueName(name)
178180
, Flags(flags)
179181
{ }
182+
TEvGetConfiguration(
183+
TString requestId,
184+
const TString& user,
185+
const TString& name,
186+
const TString& folderId,
187+
bool enableThrottling,
188+
ui64 flags = 0
189+
) : RequestId(std::move(requestId))
190+
, UserName(user)
191+
, QueueName(name)
192+
, FolderId(folderId)
193+
, EnableThrottling(enableThrottling)
194+
, Flags(flags)
195+
{ }
180196
};
181197

182198
struct TQuoterResourcesForActions : public TAtomicRefCount<TQuoterResourcesForActions> {

ydb/core/ymq/actor/service.cpp

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -579,18 +579,28 @@ void TSqsService::HandleGetConfiguration(TSqsEvents::TEvGetConfiguration::TPtr&
579579
}
580580

581581
const auto queueIt = user->Queues_.find(queueName);
582-
if (queueIt == user->Queues_.end()) {
583-
if (RequestQueueListForUser(user, reqId)) {
584-
LWPROBE(QueueRequestCacheMiss, userName, queueName, reqId, ev->Get()->ToStringHeader());
585-
RLOG_SQS_REQ_DEBUG(reqId, "Queue [" << userName << "/" << queueName << "] was not found in sqs service list. Requesting queues list");
586-
user->GetConfigurationRequests_.emplace(queueName, std::move(ev));
587-
} else {
588-
AnswerThrottled(ev);
589-
}
582+
if (queueIt != user->Queues_.end()) {
583+
ProcessConfigurationRequestForQueue(ev, user, queueIt->second);
590584
return;
585+
} else if (ev->Get()->FolderId) {
586+
const auto byNameAndFolderIt = user->QueueByNameAndFolder_ .find(
587+
std::make_pair(ev->Get()->QueueName, ev->Get()->FolderId)
588+
);
589+
if (byNameAndFolderIt != user->QueueByNameAndFolder_.end()) {
590+
ProcessConfigurationRequestForQueue(ev, user, byNameAndFolderIt->second);
591+
return;
592+
}
591593
}
592594

593-
ProcessConfigurationRequestForQueue(ev, user, queueIt->second);
595+
if (RequestQueueListForUser(user, reqId)) {
596+
LWPROBE(QueueRequestCacheMiss, userName, queueName, reqId, ev->Get()->ToStringHeader());
597+
RLOG_SQS_REQ_DEBUG(reqId, "Queue [" << userName << "/" << queueName << "] was not found in sqs service list. Requesting queues list");
598+
user->GetConfigurationRequests_.emplace(queueName, std::move(ev));
599+
} else if (ev->Get()->EnableThrottling) {
600+
AnswerThrottled(ev);
601+
} else {
602+
AnswerNotExists(ev, user);
603+
}
594604
}
595605

596606
void TSqsService::AnswerNotExists(TSqsEvents::TEvGetConfiguration::TPtr& ev, const TUserInfoPtr& userInfo) {

ydb/tests/functional/sqs/cloud/test_yandex_cloud_mode.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -880,8 +880,5 @@ def get_attributes_of_nonexistent_queue():
880880
)
881881
)
882882

883-
# Check that getting queue url with custom name still works
884-
assert_that(
885-
lambda: self._sqs_api.get_queue_url(custom_queue_name),
886-
not_(raises(RuntimeError))
887-
)
883+
received_queue_url = self._sqs_api.get_queue_url(custom_queue_name)
884+
assert received_queue_url == queue_url

ydb/tests/functional/sqs/common/test_throttling_nonexistent_queue.py renamed to ydb/tests/functional/sqs/common/test_throttling.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env python
22
# -*- coding: utf-8 -*-
3-
from hamcrest import assert_that, raises
3+
from hamcrest import assert_that, raises, not_
44

55
from ydb.tests.library.sqs.test_base import KikimrSqsTestBase
66

@@ -68,3 +68,22 @@ def get_attributes_of_nonexistent_queue():
6868
pattern=throttling_exception_pattern
6969
)
7070
)
71+
72+
def test_that_queue_can_be_created_despite_lack_of_throttling_budget(self):
73+
queue_url = self._create_queue_and_assert(self.queue_name, False, True)
74+
nonexistent_queue_url = queue_url + "_nonex"
75+
76+
def get_attributes_of_nonexistent_queue():
77+
self._sqs_api.get_queue_attributes(nonexistent_queue_url)
78+
79+
# Draining budget
80+
for _ in range(16):
81+
try:
82+
get_attributes_of_nonexistent_queue()
83+
except Exception:
84+
pass
85+
86+
assert_that(
87+
lambda: self._create_queue_and_assert("other_queue_name", False, True),
88+
not_(raises(RuntimeError))
89+
)

ydb/tests/functional/sqs/common/ya.make

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ TEST_SRCS(
1313
test_queue_attributes_validation.py
1414
test_queues_managing.py
1515
test_format_without_version.py
16-
test_throttling_nonexistent_queue.py
16+
test_throttling.py
1717
test_queue_counters.py
1818
)
1919

0 commit comments

Comments
 (0)