Skip to content

Commit adc27fd

Browse files
authored
YMQ fixes for 24-3 (#9646)
1 parent 66cf1fb commit adc27fd

File tree

4 files changed

+125
-98
lines changed

4 files changed

+125
-98
lines changed

ydb/core/http_proxy/http_req.cpp

+7-2
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ namespace NKikimr::NHttpProxy {
553553
.Counters = nullptr,
554554
.AWSSignature = std::move(HttpContext.GetSignature()),
555555
.IAMToken = HttpContext.IamToken,
556-
.FolderID = ""
556+
.FolderID = HttpContext.FolderId
557557
};
558558

559559
auto authRequestProxy = MakeHolder<NSQS::THttpProxyAuthRequestProxy>(
@@ -1148,10 +1148,15 @@ namespace NKikimr::NHttpProxy {
11481148
SourceAddress = address;
11491149
}
11501150

1151-
DatabasePath = Request->URL;
1151+
DatabasePath = Request->URL.Before('?');
11521152
if (DatabasePath == "/") {
11531153
DatabasePath = "";
11541154
}
1155+
auto params = TCgiParameters(Request->URL.After('?'));
1156+
if (auto it = params.Find("folderId"); it != params.end()) {
1157+
FolderId = it->second;
1158+
}
1159+
11551160
//TODO: find out databaseId
11561161
ParseHeaders(Request->Headers);
11571162
}

ydb/core/http_proxy/ut/http_proxy_ut.h

+69-13
Original file line numberDiff line numberDiff line change
@@ -1625,6 +1625,20 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
16251625
UNIT_ASSERT_VALUES_EQUAL(resultMessage, "The specified queue doesn't exist.");
16261626
}
16271627

1628+
Y_UNIT_TEST_F(TestGetQueueUrlWithIAM, THttpProxyTestMock) {
1629+
auto req = CreateSqsGetQueueUrlRequest();
1630+
req["QueueName"] = "not-existing-queue";
1631+
auto res = SendHttpRequest("/Root?folderId=XXX", "AmazonSQS.GetQueueUrl", std::move(req), "X-YaCloud-SubjectToken: Bearer proxy_sa@builtin");
1632+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 400);
1633+
1634+
NJson::TJsonValue json;
1635+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
1636+
TString resultType = GetByPath<TString>(json, "__type");
1637+
UNIT_ASSERT_VALUES_EQUAL(resultType, "AWS.SimpleQueueService.NonExistentQueue");
1638+
TString resultMessage = GetByPath<TString>(json, "message");
1639+
UNIT_ASSERT_VALUES_EQUAL(resultMessage, "The specified queue doesn't exist.");
1640+
}
1641+
16281642
Y_UNIT_TEST_F(TestSendMessage, THttpProxyTestMock) {
16291643
auto createQueueReq = CreateSqsCreateQueueRequest();
16301644
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1"));
@@ -1645,7 +1659,7 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
16451659
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
16461660
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
16471661
UNIT_ASSERT(!GetByPath<TString>(json, "SequenceNumber").empty());
1648-
UNIT_ASSERT(!GetByPath<TString>(json, "Md5OfMessageBody").empty());
1662+
UNIT_ASSERT(!GetByPath<TString>(json, "MD5OfMessageBody").empty());
16491663
UNIT_ASSERT(!GetByPath<TString>(json, "MessageId").empty());
16501664
}
16511665

@@ -1666,7 +1680,7 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
16661680

16671681
res = SendHttpRequest("/Root", "AmazonSQS.SendMessage", std::move(sendMessageReq), FormAuthorizationStr("ru-central1"));
16681682
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
1669-
UNIT_ASSERT(!GetByPath<TString>(json, "Md5OfMessageBody").empty());
1683+
UNIT_ASSERT(!GetByPath<TString>(json, "MD5OfMessageBody").empty());
16701684
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
16711685

16721686
for (int i = 0; i < 20; ++i) {
@@ -1698,16 +1712,58 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
16981712
TString resultQueueUrl = GetByPath<TString>(json, "QueueUrl");
16991713
UNIT_ASSERT(resultQueueUrl.EndsWith("ExampleQueueName"));
17001714

1701-
NJson::TJsonValue getQueueAttributes;
1702-
getQueueAttributes["QueueUrl"] = resultQueueUrl;
1703-
NJson::TJsonArray attributeNames = {"DelaySeconds"};
1704-
getQueueAttributes["AttributeNames"] = attributeNames;
1715+
{
1716+
NJson::TJsonValue getQueueAttributes;
1717+
getQueueAttributes["QueueUrl"] = resultQueueUrl;
1718+
NJson::TJsonArray attributeNames = {"DelaySeconds"};
1719+
getQueueAttributes["AttributeNames"] = attributeNames;
17051720

1706-
res = SendHttpRequest("/Root", "AmazonSQS.GetQueueAttributes", std::move(getQueueAttributes), FormAuthorizationStr("ru-central1"));
1707-
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
1708-
NJson::TJsonValue resultJson;
1709-
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &resultJson));
1710-
UNIT_ASSERT_VALUES_EQUAL(resultJson["Attributes"]["DelaySeconds"], "1");
1721+
res = SendHttpRequest("/Root", "AmazonSQS.GetQueueAttributes", std::move(getQueueAttributes), FormAuthorizationStr("ru-central1"));
1722+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
1723+
NJson::TJsonValue resultJson;
1724+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &resultJson));
1725+
UNIT_ASSERT_VALUES_EQUAL(resultJson["Attributes"]["DelaySeconds"], "1");
1726+
}
1727+
1728+
{
1729+
NJson::TJsonValue getQueueAttributes;
1730+
getQueueAttributes["QueueUrl"] = resultQueueUrl;
1731+
NJson::TJsonArray attributeNames = {
1732+
"ApproximateNumberOfMessages",
1733+
"ApproximateNumberOfMessagesDelayed",
1734+
"ApproximateNumberOfMessagesNotVisible",
1735+
"CreatedTimestamp",
1736+
"DelaySeconds",
1737+
"MaximumMessageSize",
1738+
"MessageRetentionPeriod",
1739+
"ReceiveMessageWaitTimeSeconds",
1740+
"RedrivePolicy",
1741+
"VisibilityTimeout",
1742+
"FifoQueue",
1743+
"ContentBasedDeduplication",
1744+
"QueueArn"
1745+
};
1746+
getQueueAttributes["AttributeNames"] = attributeNames;
1747+
1748+
res = SendHttpRequest("/Root", "AmazonSQS.GetQueueAttributes", std::move(getQueueAttributes), FormAuthorizationStr("ru-central1"));
1749+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
1750+
NJson::TJsonValue resultJson;
1751+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &resultJson));
1752+
UNIT_ASSERT_VALUES_EQUAL(resultJson["Attributes"]["DelaySeconds"], "1");
1753+
}
1754+
1755+
{
1756+
NJson::TJsonValue getQueueAttributes;
1757+
getQueueAttributes["QueueUrl"] = resultQueueUrl;
1758+
NJson::TJsonArray attributeNames = {"All"};
1759+
getQueueAttributes["AttributeNames"] = attributeNames;
1760+
1761+
res = SendHttpRequest("/Root", "AmazonSQS.GetQueueAttributes", std::move(getQueueAttributes), FormAuthorizationStr("ru-central1"));
1762+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
1763+
NJson::TJsonValue resultJson;
1764+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &resultJson));
1765+
UNIT_ASSERT_VALUES_EQUAL(resultJson["Attributes"]["DelaySeconds"], "1");
1766+
}
17111767
}
17121768

17131769
Y_UNIT_TEST_F(TestListQueues, THttpProxyTestMock) {
@@ -1911,8 +1967,8 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
19111967
UNIT_ASSERT(json["Successful"].GetArray().size() == 2);
19121968
auto succesful0 = json["Successful"][0];
19131969
UNIT_ASSERT(succesful0["Id"] == "Id-0");
1914-
UNIT_ASSERT(!GetByPath<TString>(succesful0, "Md5OfMessageAttributes").empty());
1915-
UNIT_ASSERT(!GetByPath<TString>(succesful0, "Md5OfMessageBody").empty());
1970+
UNIT_ASSERT(!GetByPath<TString>(succesful0, "MD5OfMessageAttributes").empty());
1971+
UNIT_ASSERT(!GetByPath<TString>(succesful0, "MD5OfMessageBody").empty());
19161972
UNIT_ASSERT(!GetByPath<TString>(succesful0, "MessageId").empty());
19171973
}
19181974

ydb/public/api/protos/draft/ymq.proto

+9-9
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ message GetQueueAttributesResult {
128128
message GetQueueUrlRequest {
129129
Ydb.Operations.OperationParams operation_params = 1;
130130
string queue_name = 2;
131-
optional string queue_owner_aws_account_id = 3;
131+
optional string queue_owner_a_w_s_account_id = 3;
132132
}
133133

134134
message GetQueueUrlResponse {
@@ -194,8 +194,8 @@ message ReceiveMessageResponse {
194194
message Message {
195195
map<string, string> attributes = 1;
196196
string body = 2;
197-
string md5_of_body = 3;
198-
string md5_of_message_attributes = 4;
197+
string m_d_5_of_body = 3;
198+
string m_d_5_of_message_attributes = 4;
199199
map<string, MessageAttribute> message_attributes = 5;
200200
string message_id = 6;
201201
string receipt_handle = 7;
@@ -221,9 +221,9 @@ message SendMessageResponse {
221221
}
222222

223223
message SendMessageResult {
224-
string md5_of_message_attributes = 1;
225-
string md5_of_message_body= 2;
226-
string md5_of_message_system_attributes= 3;
224+
string m_d_5_of_message_attributes = 1;
225+
string m_d_5_of_message_body= 2;
226+
string m_d_5_of_message_system_attributes= 3;
227227
string message_id = 4;
228228
string sequence_number = 5;
229229
}
@@ -248,10 +248,10 @@ message SendMessageBatchRequestEntry {
248248

249249
message SendMessageBatchResultEntry {
250250
string id = 1;
251-
string md5_of_message_body = 2;
251+
string m_d_5_of_message_body = 2;
252252
string message_id = 3;
253-
string md5_of_message_attributes = 4;
254-
string md5_of_message_system_attributes = 5;
253+
string m_d_5_of_message_attributes = 4;
254+
string m_d_5_of_message_system_attributes = 5;
255255
string sequence_number = 6;
256256
}
257257

ydb/services/ymq/ymq_proxy.cpp

+40-74
Original file line numberDiff line numberDiff line change
@@ -259,8 +259,8 @@ namespace NKikimr::NYmq::V1 {
259259

260260
Ydb::Ymq::V1::SendMessageResult GetResult(const NKikimrClient::TSqsResponse& resp) override {
261261
Ydb::Ymq::V1::SendMessageResult result;
262-
result.set_md5_of_message_attributes(GetResponse(resp).GetMD5OfMessageAttributes());
263-
result.set_md5_of_message_body(GetResponse(resp).GetMD5OfMessageBody());
262+
result.set_m_d_5_of_message_attributes(GetResponse(resp).GetMD5OfMessageAttributes());
263+
result.set_m_d_5_of_message_body(GetResponse(resp).GetMD5OfMessageBody());
264264
result.set_message_id(GetResponse(resp).GetMessageId());
265265
result.set_sequence_number(std::to_string(GetResponse(resp).GetSequenceNumber()));
266266
return result;
@@ -347,8 +347,8 @@ namespace NKikimr::NYmq::V1 {
347347
}
348348

349349
dstMessage.set_body(srcMessage.GetData());
350-
dstMessage.set_md5_of_body(srcMessage.GetMD5OfMessageBody());
351-
dstMessage.set_md5_of_message_attributes(srcMessage.GetMD5OfMessageAttributes());
350+
dstMessage.set_m_d_5_of_body(srcMessage.GetMD5OfMessageBody());
351+
dstMessage.set_m_d_5_of_message_attributes(srcMessage.GetMD5OfMessageAttributes());
352352

353353
for (const auto& srcAttribute: srcMessage.GetMessageAttributes()) {
354354
Ydb::Ymq::V1::MessageAttribute dstAttribute;
@@ -448,75 +448,41 @@ namespace NKikimr::NYmq::V1 {
448448

449449
Ydb::Ymq::V1::GetQueueAttributesResult GetResult(const NKikimrClient::TSqsResponse& resp) override {
450450
Ydb::Ymq::V1::GetQueueAttributesResult result;
451-
for (const auto& attributeName : Attributes) {
452-
if (attributeName == APPROXIMATE_NUMBER_OF_MESSAGES) {
453-
AddAttribute(
454-
result,
455-
APPROXIMATE_NUMBER_OF_MESSAGES,
456-
GetResponse(resp).GetApproximateNumberOfMessages()
457-
);
458-
} else if (attributeName == APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED) {
459-
AddAttribute(
460-
result,
461-
APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED,
462-
GetResponse(resp).GetApproximateNumberOfMessagesDelayed()
463-
);
464-
} else if (attributeName == CREATED_TIMESTAMP) {
465-
AddAttribute(
466-
result,
467-
CREATED_TIMESTAMP,
468-
GetResponse(resp).GetCreatedTimestamp()
469-
);
470-
} else if (attributeName == DELAY_SECONDS) {
471-
AddAttribute(
472-
result,
473-
DELAY_SECONDS,
474-
GetResponse(resp).GetDelaySeconds()
475-
);
476-
} else if (attributeName == LAST_MODIFIED_TIMESTAMP) {
477-
AddAttribute(
478-
result,
479-
LAST_MODIFIED_TIMESTAMP,
480-
GetResponse(resp).GetLastModifiedTimestamp()
481-
);
482-
} else if (attributeName == MAXIMUM_MESSAGE_SIZE) {
483-
AddAttribute(
484-
result,
485-
MAXIMUM_MESSAGE_SIZE,
486-
GetResponse(resp).GetMaximumMessageSize()
487-
);
488-
} else if (attributeName == MESSAGE_RETENTION_PERIOD) {
489-
AddAttribute(
490-
result,
491-
MESSAGE_RETENTION_PERIOD,
492-
GetResponse(resp).GetMessageRetentionPeriod()
493-
);
494-
} else if (attributeName == QUEUE_ARN) {
495-
AddAttribute(
496-
result,
497-
QUEUE_ARN,
498-
GetResponse(resp).GetQueueArn()
499-
);
500-
} else if (attributeName == RECEIVE_MESSAGE_WAIT_TIME_SECONDS) {
501-
AddAttribute(
502-
result,
503-
RECEIVE_MESSAGE_WAIT_TIME_SECONDS,
504-
GetResponse(resp).GetReceiveMessageWaitTimeSeconds()
505-
);
506-
} else if (attributeName == VISIBILITY_TIMEOUT) {
507-
AddAttribute(
508-
result,
509-
VISIBILITY_TIMEOUT,
510-
GetResponse(resp).GetVisibilityTimeout()
511-
);
512-
} else if (attributeName == REDRIVE_POLICY) {
513-
AddAttribute(
514-
result,
515-
REDRIVE_POLICY,
516-
GetResponse(resp).GetRedrivePolicy()
517-
);
518-
}
451+
const auto& attrs = resp.GetGetQueueAttributes();
452+
if (attrs.HasApproximateNumberOfMessages()) {
453+
AddAttribute(result, APPROXIMATE_NUMBER_OF_MESSAGES, attrs.GetApproximateNumberOfMessages());
454+
}
455+
if (attrs.HasApproximateNumberOfMessagesDelayed()) {
456+
AddAttribute(result, APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED, attrs.GetApproximateNumberOfMessagesDelayed());
457+
}
458+
if (attrs.HasCreatedTimestamp()) {
459+
AddAttribute(result, CREATED_TIMESTAMP, attrs.GetCreatedTimestamp());
460+
}
461+
if (attrs.HasDelaySeconds()) {
462+
AddAttribute(result, DELAY_SECONDS, attrs.GetDelaySeconds());
463+
}
464+
if (attrs.HasLastModifiedTimestamp()) {
465+
AddAttribute(result, LAST_MODIFIED_TIMESTAMP, attrs.GetLastModifiedTimestamp());
466+
}
467+
if (attrs.HasMaximumMessageSize()) {
468+
AddAttribute(result, MAXIMUM_MESSAGE_SIZE, attrs.GetMaximumMessageSize());
519469
}
470+
if (attrs.HasMessageRetentionPeriod()) {
471+
AddAttribute(result, MESSAGE_RETENTION_PERIOD, attrs.GetMessageRetentionPeriod());
472+
}
473+
if (attrs.HasQueueArn()) {
474+
AddAttribute(result, QUEUE_ARN, attrs.GetQueueArn());
475+
}
476+
if (attrs.HasReceiveMessageWaitTimeSeconds()) {
477+
AddAttribute(result, RECEIVE_MESSAGE_WAIT_TIME_SECONDS, attrs.GetReceiveMessageWaitTimeSeconds());
478+
}
479+
if (attrs.HasVisibilityTimeout()) {
480+
AddAttribute(result, VISIBILITY_TIMEOUT, attrs.GetVisibilityTimeout());
481+
}
482+
if (attrs.HasRedrivePolicy()) {
483+
AddAttribute(result, REDRIVE_POLICY, attrs.GetRedrivePolicy());
484+
}
485+
520486
return result;
521487
}
522488

@@ -819,8 +785,8 @@ namespace NKikimr::NYmq::V1 {
819785
} else {
820786
auto currentSuccessful = result.Addsuccessful();
821787
currentSuccessful->Setid(entry.GetId());
822-
currentSuccessful->Setmd5_of_message_attributes(entry.GetMD5OfMessageAttributes());
823-
currentSuccessful->Setmd5_of_message_body(entry.GetMD5OfMessageBody());
788+
currentSuccessful->set_m_d_5_of_message_attributes(entry.GetMD5OfMessageAttributes());
789+
currentSuccessful->set_m_d_5_of_message_body(entry.GetMD5OfMessageBody());
824790
currentSuccessful->Setmessage_id(entry.GetMessageId());
825791
currentSuccessful->Setsequence_number(std::to_string(entry.GetSequenceNumber()));
826792
}

0 commit comments

Comments
 (0)