Skip to content

Commit 1e567c0

Browse files
LOGBROKER 8891 add 4 json sqs requests (#8328)
1 parent ba88e86 commit 1e567c0

File tree

10 files changed

+467
-14
lines changed

10 files changed

+467
-14
lines changed

ydb/core/grpc_services/service_ymq.h

+4
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,9 @@ void DoYmqDeleteMessageRequest(std::unique_ptr<IRequestOpCtx> p, const IFacility
2121
void DoYmqPurgeQueueRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
2222
void DoYmqDeleteQueueRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
2323
void DoYmqChangeMessageVisibilityRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
24+
void DoYmqSetQueueAttributesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
25+
void DoYmqSendMessageBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
26+
void DoYmqDeleteMessageBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
27+
void DoYmqChangeMessageVisibilityBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
2428
}
2529
}

ydb/core/http_proxy/http_req.cpp

+12
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,14 @@ namespace NKikimr::NHttpProxy {
528528
action = NSQS::EAction::DeleteQueue;
529529
} else if (Method == "ChangeMessageVisibility") {
530530
action = NSQS::EAction::ChangeMessageVisibility;
531+
} else if (Method == "SetQueueAttributes") {
532+
action = NSQS::EAction::SetQueueAttributes;
533+
} else if (Method == "SendMessageBatch") {
534+
action = NSQS::EAction::SendMessageBatch;
535+
}else if (Method == "DeleteMessageBatch") {
536+
action = NSQS::EAction::DeleteMessageBatch;
537+
} else if (Method == "ChangeMessageVisibilityBatch") {
538+
action = NSQS::EAction::ChangeMessageVisibilityBatch;
531539
}
532540

533541
requestHolder->SetRequestId(HttpContext.RequestId);
@@ -1063,6 +1071,10 @@ namespace NKikimr::NHttpProxy {
10631071
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(PurgeQueue);
10641072
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(DeleteQueue);
10651073
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ChangeMessageVisibility);
1074+
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(SetQueueAttributes);
1075+
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(SendMessageBatch);
1076+
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(DeleteMessageBatch);
1077+
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ChangeMessageVisibilityBatch);
10661078
#undef DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN
10671079
}
10681080

ydb/core/http_proxy/ut/http_proxy_ut.h

+158
Original file line numberDiff line numberDiff line change
@@ -1838,4 +1838,162 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
18381838
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
18391839
UNIT_ASSERT_VALUES_EQUAL(GetByPath<TString>(json, "__type"), "AWS.SimpleQueueService.NonExistentQueue");
18401840
}
1841+
1842+
Y_UNIT_TEST_F(TestSetQueueAttributes, THttpProxyTestMock) {
1843+
auto createQueueReq = CreateSqsCreateQueueRequest();
1844+
NJson::TJsonValue attributes;
1845+
attributes["DelaySeconds"] = "1";
1846+
createQueueReq["Attributes"] = attributes;
1847+
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1"));
1848+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
1849+
NJson::TJsonValue json;
1850+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
1851+
1852+
TString resultQueueUrl = GetByPath<TString>(json, "QueueUrl");
1853+
1854+
NJson::TJsonValue setQueueAttributes;
1855+
setQueueAttributes["QueueUrl"] = resultQueueUrl;
1856+
attributes = {};
1857+
attributes["DelaySeconds"] = "2";
1858+
setQueueAttributes["Attributes"] = attributes;
1859+
1860+
res = SendHttpRequest("/Root", "AmazonSQS.SetQueueAttributes", std::move(setQueueAttributes), FormAuthorizationStr("ru-central1"));
1861+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
1862+
1863+
NJson::TJsonValue getQueueAttributes;
1864+
getQueueAttributes["QueueUrl"] = resultQueueUrl;
1865+
NJson::TJsonArray attributeNames = {"DelaySeconds"};
1866+
getQueueAttributes["AttributeNames"] = attributeNames;
1867+
1868+
res = SendHttpRequest("/Root", "AmazonSQS.GetQueueAttributes", std::move(getQueueAttributes), FormAuthorizationStr("ru-central1"));
1869+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
1870+
NJson::TJsonValue resultJson;
1871+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &resultJson));
1872+
UNIT_ASSERT_VALUES_EQUAL(resultJson["Attributes"]["DelaySeconds"], "2");
1873+
}
1874+
1875+
Y_UNIT_TEST_F(TestSendMessageBatch, THttpProxyTestMock) {
1876+
auto createQueueReq = CreateSqsCreateQueueRequest();
1877+
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1"));
1878+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
1879+
NJson::TJsonValue json;
1880+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
1881+
TString resultQueueUrl = GetByPath<TString>(json, "QueueUrl");
1882+
UNIT_ASSERT(resultQueueUrl.EndsWith("ExampleQueueName"));
1883+
1884+
NJson::TJsonValue message0;
1885+
message0["Id"] = "Id-0";
1886+
message0["MessageBody"] = "MessageBody-0";
1887+
message0["MessageDeduplicationId"] = "MessageDeduplicationId-0";
1888+
1889+
NJson::TJsonValue message1;
1890+
message1["Id"] = "Id-1";
1891+
message1["MessageBody"] = "MessageBody-1";
1892+
message1["MessageDeduplicationId"] = "MessageDeduplicationId-1";
1893+
1894+
NJson::TJsonArray entries = {message0, message1};
1895+
1896+
NJson::TJsonValue sendMessageBatchReq;
1897+
sendMessageBatchReq["QueueUrl"] = resultQueueUrl;
1898+
sendMessageBatchReq["Entries"] = entries;
1899+
1900+
res = SendHttpRequest("/Root", "AmazonSQS.SendMessageBatch", std::move(sendMessageBatchReq), FormAuthorizationStr("ru-central1"));
1901+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
1902+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
1903+
UNIT_ASSERT(json["Successful"].GetArray().size() == 2);
1904+
auto succesful0 = json["Successful"][0];
1905+
UNIT_ASSERT(succesful0["Id"] == "Id-0");
1906+
UNIT_ASSERT(!GetByPath<TString>(succesful0, "Md5OfMessageBody").empty());
1907+
UNIT_ASSERT(!GetByPath<TString>(succesful0, "MessageId").empty());
1908+
}
1909+
1910+
Y_UNIT_TEST_F(TestDeleteMessageBatch, THttpProxyTestMock) {
1911+
auto createQueueReq = CreateSqsCreateQueueRequest();
1912+
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1"));
1913+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
1914+
NJson::TJsonValue json;
1915+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
1916+
TString resultQueueUrl = GetByPath<TString>(json, "QueueUrl");
1917+
UNIT_ASSERT(resultQueueUrl.EndsWith("ExampleQueueName"));
1918+
1919+
NJson::TJsonValue message0;
1920+
message0["Id"] = "Id-0";
1921+
message0["MessageBody"] = "MessageBody-0";
1922+
message0["MessageDeduplicationId"] = "MessageDeduplicationId-0";
1923+
1924+
NJson::TJsonValue message1;
1925+
message1["Id"] = "Id-1";
1926+
message1["MessageBody"] = "MessageBody-1";
1927+
message1["MessageDeduplicationId"] = "MessageDeduplicationId-1";
1928+
1929+
NJson::TJsonArray entries = {message0, message1};
1930+
1931+
NJson::TJsonValue sendMessageBatchReq;
1932+
sendMessageBatchReq["QueueUrl"] = resultQueueUrl;
1933+
sendMessageBatchReq["Entries"] = entries;
1934+
1935+
res = SendHttpRequest("/Root", "AmazonSQS.SendMessageBatch", std::move(sendMessageBatchReq), FormAuthorizationStr("ru-central1"));
1936+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
1937+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
1938+
UNIT_ASSERT(json["Successful"].GetArray().size() == 2);
1939+
1940+
TVector<NJson::TJsonValue> messages;
1941+
for (int i = 0; i < 20; ++i) {
1942+
NJson::TJsonValue receiveMessageReq;
1943+
receiveMessageReq["QueueUrl"] = resultQueueUrl;
1944+
res = SendHttpRequest("/Root", "AmazonSQS.ReceiveMessage", std::move(receiveMessageReq), FormAuthorizationStr("ru-central1"));
1945+
if (res.Body != TString("{}")) {
1946+
NJson::ReadJsonTree(res.Body, &json);
1947+
if (json["Messages"].GetArray().size() == 2) {
1948+
messages.push_back(json["Messages"][0]);
1949+
messages.push_back(json["Messages"][1]);
1950+
break;
1951+
}
1952+
if (json["Messages"].GetArray().size() == 1) {
1953+
messages.push_back(json["Messages"][0]);
1954+
if (messages.size() == 2) {
1955+
break;
1956+
}
1957+
}
1958+
}
1959+
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
1960+
}
1961+
1962+
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2);
1963+
1964+
auto receiptHandle0 = messages[0]["ReceiptHandle"].GetString();
1965+
UNIT_ASSERT(!receiptHandle0.Empty());
1966+
auto receiptHandle1 = messages[1]["ReceiptHandle"].GetString();
1967+
UNIT_ASSERT(!receiptHandle1.Empty());
1968+
1969+
NJson::TJsonValue deleteMessageBatchReq;
1970+
deleteMessageBatchReq["QueueUrl"] = resultQueueUrl;
1971+
1972+
NJson::TJsonValue entry0;
1973+
entry0["Id"] = "Id-0";
1974+
entry0["ReceiptHandle"] = receiptHandle0;
1975+
1976+
NJson::TJsonValue entry1;
1977+
entry1["Id"] = "Id-1";
1978+
entry1["ReceiptHandle"] = receiptHandle1;
1979+
1980+
NJson::TJsonArray deleteEntries = {entry0, entry1};
1981+
deleteMessageBatchReq["Entries"] = deleteEntries;
1982+
1983+
res = SendHttpRequest("/Root", "AmazonSQS.DeleteMessageBatch", std::move(deleteMessageBatchReq), FormAuthorizationStr("ru-central1"));
1984+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
1985+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
1986+
UNIT_ASSERT_VALUES_EQUAL(json["Successful"].GetArray().size(), 2);
1987+
UNIT_ASSERT_VALUES_EQUAL(json["Successful"][0]["Id"], "Id-0");
1988+
UNIT_ASSERT_VALUES_EQUAL(json["Successful"][1]["Id"], "Id-1");
1989+
1990+
NJson::TJsonValue receiveMessageReq;
1991+
receiveMessageReq["QueueUrl"] = resultQueueUrl;
1992+
res = SendHttpRequest("/Root", "AmazonSQS.ReceiveMessage", std::move(receiveMessageReq), FormAuthorizationStr("ru-central1"));
1993+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
1994+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
1995+
UNIT_ASSERT_VALUES_EQUAL(json["Messages"].GetArray().size(), 0);
1996+
1997+
}
1998+
18411999
} // Y_UNIT_TEST_SUITE(TestHttpProxy)

ydb/library/http_proxy/error/error.cpp

+8
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "error.h"
2+
#include "util/generic/maybe.h"
23

34
namespace NKikimr::NSQS {
45

@@ -42,6 +43,13 @@ ui32 TErrorClass::GetId(const TString& code) {
4243
: it->second;
4344
};
4445

46+
TMaybe<ui32> TErrorClass::GetHttpStatus(const TString& code) {
47+
auto idIt = NKikimr::NSQS::TErrorClass::ErrorToId.find(code);
48+
if (idIt == NKikimr::NSQS::TErrorClass::ErrorToId.end()) {
49+
return Nothing();
50+
}
51+
return get<1>(IdToErrorAndCode.find(idIt->second)->second);
52+
};
4553

4654
namespace NErrors {
4755
extern const TErrorClass ACCESS_DENIED = {

ydb/library/http_proxy/error/error.h

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ struct TErrorClass {
2323

2424
static const std::tuple<TString, ui32> GetErrorAndCode(ui32 id);
2525
static ui32 GetId(const TString& code);
26+
static TMaybe<ui32> GetHttpStatus(const TString& code);
2627

2728
private:
2829
static THashSet<TString> RegisteredCodes;

ydb/public/api/grpc/draft/ydb_ymq_v1.proto

+4
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,8 @@ service YmqService {
2020
rpc PurgeQueue(PurgeQueueRequest) returns (PurgeQueueResponse);
2121
rpc DeleteQueue(DeleteQueueRequest) returns (DeleteQueueResponse);
2222
rpc ChangeMessageVisibility(ChangeMessageVisibilityRequest) returns (ChangeMessageVisibilityResponse);
23+
rpc SetQueueAttributes(SetQueueAttributesRequest) returns (SetQueueAttributesResponse);
24+
rpc SendMessageBatch(SendMessageBatchRequest) returns (SendMessageBatchResponse);
25+
rpc DeleteMessageBatch(DeleteMessageBatchRequest) returns (DeleteMessageBatchResponse);
26+
rpc ChangeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest) returns (ChangeMessageVisibilityBatchResponse);
2327
}

ydb/public/api/protos/draft/ymq.proto

+56-14
Original file line numberDiff line numberDiff line change
@@ -228,31 +228,73 @@ message SendMessageResult {
228228
string sequence_number = 5;
229229
}
230230

231-
message SendMessageBatchRequest {
232-
Ydb.Operations.OperationParams operation_params = 1;
233-
repeated SendMessageRequest entries = 2;
234-
}
235-
236-
message SendMessageBatchResponse {
237-
Ydb.Operations.Operation operation = 1;
238-
}
239-
240231
message BatchResultErrorEntry {
241232
string code = 1;
242233
string id = 2;
243234
bool sender_fault = 3;
244235
string message = 4;
245236
}
246237

238+
message SendMessageBatchRequestEntry {
239+
string id = 1;
240+
int32 delay_seconds = 2;
241+
map<string, MessageAttribute> message_attributes = 3;
242+
string message_body = 4;
243+
string message_deduplication_id = 5;
244+
string message_group_id = 6;
245+
map<string, MessageAttribute> message_system_attributes = 7;
246+
string queue_url = 8;
247+
}
248+
247249
message SendMessageBatchResultEntry {
248-
string md5_of_message_attributes = 1;
249-
string md5_of_message_body= 2;
250-
string md5_of_message_system_attributes= 3;
251-
string message_id = 4;
252-
string sequence_number = 5;
250+
string id = 1;
251+
string md5_of_message_body = 2;
252+
string message_id = 3;
253+
string md5_of_message_attributes = 4;
254+
string md5_of_message_system_attributes = 5;
255+
string sequence_number = 6;
256+
}
257+
258+
message SendMessageBatchRequest {
259+
Ydb.Operations.OperationParams operation_params = 1;
260+
repeated SendMessageBatchRequestEntry entries = 2;
261+
string queue_url = 3;
262+
}
263+
264+
message SendMessageBatchResponse {
265+
Ydb.Operations.Operation operation = 1;
253266
}
254267

255268
message SendMessageBatchResult {
256269
repeated BatchResultErrorEntry failed = 1;
257270
repeated SendMessageBatchResultEntry successful = 2;
258271
}
272+
273+
message SetQueueAttributesRequest {
274+
Ydb.Operations.OperationParams operation_params = 1;
275+
map<string, string> attributes = 2;
276+
string queue_url = 3;
277+
}
278+
279+
message SetQueueAttributesResponse {
280+
Ydb.Operations.Operation operation = 1;
281+
}
282+
283+
message SetQueueAttributesResult {
284+
}
285+
286+
message ListDeadLetterSourceQueuesRequest {
287+
Ydb.Operations.OperationParams operation_params = 1;
288+
int32 max_results = 2;
289+
string next_token = 3;
290+
string queue_url = 4;
291+
}
292+
293+
message ListDeadLetterSourceQueuesResponse {
294+
Ydb.Operations.Operation operation = 1;
295+
}
296+
297+
message ListDeadLetterSourceQueuesResult {
298+
string next_token = 1;
299+
repeated string queue_urls = 2;
300+
}

ydb/services/ymq/grpc_service.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ void TGRpcYmqService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger)
4242
ADD_REQUEST(PurgeQueue, DoYmqPurgeQueueRequest, nullptr, Off)
4343
ADD_REQUEST(DeleteQueue, DoYmqDeleteQueueRequest, nullptr, Off)
4444
ADD_REQUEST(ChangeMessageVisibility, DoYmqChangeMessageVisibilityRequest, nullptr, Off)
45+
ADD_REQUEST(SetQueueAttributes, DoYmqSetQueueAttributesRequest, nullptr, Off)
46+
ADD_REQUEST(SendMessageBatch, DoYmqSendMessageBatchRequest, nullptr, Off)
47+
ADD_REQUEST(DeleteMessageBatch, DoYmqDeleteMessageBatchRequest, nullptr, Off)
48+
ADD_REQUEST(ChangeMessageVisibilityBatch, DoYmqChangeMessageVisibilityBatchRequest, nullptr, Off)
4549

4650
#undef ADD_REQUEST
4751
}

0 commit comments

Comments
 (0)