diff --git a/ydb/core/grpc_services/service_ymq.h b/ydb/core/grpc_services/service_ymq.h index 809ed01096fc..4d35c1b49db8 100644 --- a/ydb/core/grpc_services/service_ymq.h +++ b/ydb/core/grpc_services/service_ymq.h @@ -21,5 +21,9 @@ void DoYmqDeleteMessageRequest(std::unique_ptr p, const IFacility void DoYmqPurgeQueueRequest(std::unique_ptr p, const IFacilityProvider& f); void DoYmqDeleteQueueRequest(std::unique_ptr p, const IFacilityProvider& f); void DoYmqChangeMessageVisibilityRequest(std::unique_ptr p, const IFacilityProvider& f); +void DoYmqSetQueueAttributesRequest(std::unique_ptr p, const IFacilityProvider& f); +void DoYmqSendMessageBatchRequest(std::unique_ptr p, const IFacilityProvider& f); +void DoYmqDeleteMessageBatchRequest(std::unique_ptr p, const IFacilityProvider& f); +void DoYmqChangeMessageVisibilityBatchRequest(std::unique_ptr p, const IFacilityProvider& f); } } diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp index 931d6a2102c6..dee6f7177be0 100644 --- a/ydb/core/http_proxy/http_req.cpp +++ b/ydb/core/http_proxy/http_req.cpp @@ -528,6 +528,14 @@ namespace NKikimr::NHttpProxy { action = NSQS::EAction::DeleteQueue; } else if (Method == "ChangeMessageVisibility") { action = NSQS::EAction::ChangeMessageVisibility; + } else if (Method == "SetQueueAttributes") { + action = NSQS::EAction::SetQueueAttributes; + } else if (Method == "SendMessageBatch") { + action = NSQS::EAction::SendMessageBatch; + }else if (Method == "DeleteMessageBatch") { + action = NSQS::EAction::DeleteMessageBatch; + } else if (Method == "ChangeMessageVisibilityBatch") { + action = NSQS::EAction::ChangeMessageVisibilityBatch; } requestHolder->SetRequestId(HttpContext.RequestId); @@ -1063,6 +1071,10 @@ namespace NKikimr::NHttpProxy { DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(PurgeQueue); DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(DeleteQueue); DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ChangeMessageVisibility); + DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(SetQueueAttributes); + DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(SendMessageBatch); + DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(DeleteMessageBatch); + DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ChangeMessageVisibilityBatch); #undef DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN } diff --git a/ydb/core/http_proxy/ut/http_proxy_ut.h b/ydb/core/http_proxy/ut/http_proxy_ut.h index 5a4f0b11ebcc..8b43a0f98969 100644 --- a/ydb/core/http_proxy/ut/http_proxy_ut.h +++ b/ydb/core/http_proxy/ut/http_proxy_ut.h @@ -1838,4 +1838,162 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) { UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json)); UNIT_ASSERT_VALUES_EQUAL(GetByPath(json, "__type"), "AWS.SimpleQueueService.NonExistentQueue"); } + + Y_UNIT_TEST_F(TestSetQueueAttributes, THttpProxyTestMock) { + auto createQueueReq = CreateSqsCreateQueueRequest(); + NJson::TJsonValue attributes; + attributes["DelaySeconds"] = "1"; + createQueueReq["Attributes"] = attributes; + auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + NJson::TJsonValue json; + UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json)); + + TString resultQueueUrl = GetByPath(json, "QueueUrl"); + + NJson::TJsonValue setQueueAttributes; + setQueueAttributes["QueueUrl"] = resultQueueUrl; + attributes = {}; + attributes["DelaySeconds"] = "2"; + setQueueAttributes["Attributes"] = attributes; + + res = SendHttpRequest("/Root", "AmazonSQS.SetQueueAttributes", std::move(setQueueAttributes), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + + NJson::TJsonValue getQueueAttributes; + getQueueAttributes["QueueUrl"] = resultQueueUrl; + NJson::TJsonArray attributeNames = {"DelaySeconds"}; + getQueueAttributes["AttributeNames"] = attributeNames; + + res = SendHttpRequest("/Root", "AmazonSQS.GetQueueAttributes", std::move(getQueueAttributes), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + NJson::TJsonValue resultJson; + UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &resultJson)); + UNIT_ASSERT_VALUES_EQUAL(resultJson["Attributes"]["DelaySeconds"], "2"); + } + + Y_UNIT_TEST_F(TestSendMessageBatch, THttpProxyTestMock) { + auto createQueueReq = CreateSqsCreateQueueRequest(); + auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + NJson::TJsonValue json; + UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json)); + TString resultQueueUrl = GetByPath(json, "QueueUrl"); + UNIT_ASSERT(resultQueueUrl.EndsWith("ExampleQueueName")); + + NJson::TJsonValue message0; + message0["Id"] = "Id-0"; + message0["MessageBody"] = "MessageBody-0"; + message0["MessageDeduplicationId"] = "MessageDeduplicationId-0"; + + NJson::TJsonValue message1; + message1["Id"] = "Id-1"; + message1["MessageBody"] = "MessageBody-1"; + message1["MessageDeduplicationId"] = "MessageDeduplicationId-1"; + + NJson::TJsonArray entries = {message0, message1}; + + NJson::TJsonValue sendMessageBatchReq; + sendMessageBatchReq["QueueUrl"] = resultQueueUrl; + sendMessageBatchReq["Entries"] = entries; + + res = SendHttpRequest("/Root", "AmazonSQS.SendMessageBatch", std::move(sendMessageBatchReq), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json)); + UNIT_ASSERT(json["Successful"].GetArray().size() == 2); + auto succesful0 = json["Successful"][0]; + UNIT_ASSERT(succesful0["Id"] == "Id-0"); + UNIT_ASSERT(!GetByPath(succesful0, "Md5OfMessageBody").empty()); + UNIT_ASSERT(!GetByPath(succesful0, "MessageId").empty()); + } + + Y_UNIT_TEST_F(TestDeleteMessageBatch, THttpProxyTestMock) { + auto createQueueReq = CreateSqsCreateQueueRequest(); + auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + NJson::TJsonValue json; + UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json)); + TString resultQueueUrl = GetByPath(json, "QueueUrl"); + UNIT_ASSERT(resultQueueUrl.EndsWith("ExampleQueueName")); + + NJson::TJsonValue message0; + message0["Id"] = "Id-0"; + message0["MessageBody"] = "MessageBody-0"; + message0["MessageDeduplicationId"] = "MessageDeduplicationId-0"; + + NJson::TJsonValue message1; + message1["Id"] = "Id-1"; + message1["MessageBody"] = "MessageBody-1"; + message1["MessageDeduplicationId"] = "MessageDeduplicationId-1"; + + NJson::TJsonArray entries = {message0, message1}; + + NJson::TJsonValue sendMessageBatchReq; + sendMessageBatchReq["QueueUrl"] = resultQueueUrl; + sendMessageBatchReq["Entries"] = entries; + + res = SendHttpRequest("/Root", "AmazonSQS.SendMessageBatch", std::move(sendMessageBatchReq), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json)); + UNIT_ASSERT(json["Successful"].GetArray().size() == 2); + + TVector messages; + for (int i = 0; i < 20; ++i) { + NJson::TJsonValue receiveMessageReq; + receiveMessageReq["QueueUrl"] = resultQueueUrl; + res = SendHttpRequest("/Root", "AmazonSQS.ReceiveMessage", std::move(receiveMessageReq), FormAuthorizationStr("ru-central1")); + if (res.Body != TString("{}")) { + NJson::ReadJsonTree(res.Body, &json); + if (json["Messages"].GetArray().size() == 2) { + messages.push_back(json["Messages"][0]); + messages.push_back(json["Messages"][1]); + break; + } + if (json["Messages"].GetArray().size() == 1) { + messages.push_back(json["Messages"][0]); + if (messages.size() == 2) { + break; + } + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2); + + auto receiptHandle0 = messages[0]["ReceiptHandle"].GetString(); + UNIT_ASSERT(!receiptHandle0.Empty()); + auto receiptHandle1 = messages[1]["ReceiptHandle"].GetString(); + UNIT_ASSERT(!receiptHandle1.Empty()); + + NJson::TJsonValue deleteMessageBatchReq; + deleteMessageBatchReq["QueueUrl"] = resultQueueUrl; + + NJson::TJsonValue entry0; + entry0["Id"] = "Id-0"; + entry0["ReceiptHandle"] = receiptHandle0; + + NJson::TJsonValue entry1; + entry1["Id"] = "Id-1"; + entry1["ReceiptHandle"] = receiptHandle1; + + NJson::TJsonArray deleteEntries = {entry0, entry1}; + deleteMessageBatchReq["Entries"] = deleteEntries; + + res = SendHttpRequest("/Root", "AmazonSQS.DeleteMessageBatch", std::move(deleteMessageBatchReq), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json)); + UNIT_ASSERT_VALUES_EQUAL(json["Successful"].GetArray().size(), 2); + UNIT_ASSERT_VALUES_EQUAL(json["Successful"][0]["Id"], "Id-0"); + UNIT_ASSERT_VALUES_EQUAL(json["Successful"][1]["Id"], "Id-1"); + + NJson::TJsonValue receiveMessageReq; + receiveMessageReq["QueueUrl"] = resultQueueUrl; + res = SendHttpRequest("/Root", "AmazonSQS.ReceiveMessage", std::move(receiveMessageReq), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json)); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200); + UNIT_ASSERT_VALUES_EQUAL(json["Messages"].GetArray().size(), 0); + + } + } // Y_UNIT_TEST_SUITE(TestHttpProxy) diff --git a/ydb/library/http_proxy/error/error.cpp b/ydb/library/http_proxy/error/error.cpp index 68ff471976be..cd0ff000a0d9 100644 --- a/ydb/library/http_proxy/error/error.cpp +++ b/ydb/library/http_proxy/error/error.cpp @@ -1,4 +1,5 @@ #include "error.h" +#include "util/generic/maybe.h" namespace NKikimr::NSQS { @@ -42,6 +43,13 @@ ui32 TErrorClass::GetId(const TString& code) { : it->second; }; +TMaybe TErrorClass::GetHttpStatus(const TString& code) { + auto idIt = NKikimr::NSQS::TErrorClass::ErrorToId.find(code); + if (idIt == NKikimr::NSQS::TErrorClass::ErrorToId.end()) { + return Nothing(); + } + return get<1>(IdToErrorAndCode.find(idIt->second)->second); +}; namespace NErrors { extern const TErrorClass ACCESS_DENIED = { diff --git a/ydb/library/http_proxy/error/error.h b/ydb/library/http_proxy/error/error.h index e2a1a1d1d0bc..d9207ccc2133 100644 --- a/ydb/library/http_proxy/error/error.h +++ b/ydb/library/http_proxy/error/error.h @@ -23,6 +23,7 @@ struct TErrorClass { static const std::tuple GetErrorAndCode(ui32 id); static ui32 GetId(const TString& code); + static TMaybe GetHttpStatus(const TString& code); private: static THashSet RegisteredCodes; diff --git a/ydb/public/api/grpc/draft/ydb_ymq_v1.proto b/ydb/public/api/grpc/draft/ydb_ymq_v1.proto index 2c7a413590a8..58f104289f33 100644 --- a/ydb/public/api/grpc/draft/ydb_ymq_v1.proto +++ b/ydb/public/api/grpc/draft/ydb_ymq_v1.proto @@ -20,4 +20,8 @@ service YmqService { rpc PurgeQueue(PurgeQueueRequest) returns (PurgeQueueResponse); rpc DeleteQueue(DeleteQueueRequest) returns (DeleteQueueResponse); rpc ChangeMessageVisibility(ChangeMessageVisibilityRequest) returns (ChangeMessageVisibilityResponse); + rpc SetQueueAttributes(SetQueueAttributesRequest) returns (SetQueueAttributesResponse); + rpc SendMessageBatch(SendMessageBatchRequest) returns (SendMessageBatchResponse); + rpc DeleteMessageBatch(DeleteMessageBatchRequest) returns (DeleteMessageBatchResponse); + rpc ChangeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest) returns (ChangeMessageVisibilityBatchResponse); } diff --git a/ydb/public/api/protos/draft/ymq.proto b/ydb/public/api/protos/draft/ymq.proto index 081a97c48b40..41997da7fc39 100644 --- a/ydb/public/api/protos/draft/ymq.proto +++ b/ydb/public/api/protos/draft/ymq.proto @@ -228,15 +228,6 @@ message SendMessageResult { string sequence_number = 5; } -message SendMessageBatchRequest { - Ydb.Operations.OperationParams operation_params = 1; - repeated SendMessageRequest entries = 2; -} - -message SendMessageBatchResponse { - Ydb.Operations.Operation operation = 1; -} - message BatchResultErrorEntry { string code = 1; string id = 2; @@ -244,15 +235,66 @@ message BatchResultErrorEntry { string message = 4; } +message SendMessageBatchRequestEntry { + string id = 1; + int32 delay_seconds = 2; + map message_attributes = 3; + string message_body = 4; + string message_deduplication_id = 5; + string message_group_id = 6; + map message_system_attributes = 7; + string queue_url = 8; +} + message SendMessageBatchResultEntry { - string md5_of_message_attributes = 1; - string md5_of_message_body= 2; - string md5_of_message_system_attributes= 3; - string message_id = 4; - string sequence_number = 5; + string id = 1; + string md5_of_message_body = 2; + string message_id = 3; + string md5_of_message_attributes = 4; + string md5_of_message_system_attributes = 5; + string sequence_number = 6; +} + +message SendMessageBatchRequest { + Ydb.Operations.OperationParams operation_params = 1; + repeated SendMessageBatchRequestEntry entries = 2; + string queue_url = 3; +} + +message SendMessageBatchResponse { + Ydb.Operations.Operation operation = 1; } message SendMessageBatchResult { repeated BatchResultErrorEntry failed = 1; repeated SendMessageBatchResultEntry successful = 2; } + +message SetQueueAttributesRequest { + Ydb.Operations.OperationParams operation_params = 1; + map attributes = 2; + string queue_url = 3; +} + +message SetQueueAttributesResponse { + Ydb.Operations.Operation operation = 1; +} + +message SetQueueAttributesResult { +} + +message ListDeadLetterSourceQueuesRequest { + Ydb.Operations.OperationParams operation_params = 1; + int32 max_results = 2; + string next_token = 3; + string queue_url = 4; +} + +message ListDeadLetterSourceQueuesResponse { + Ydb.Operations.Operation operation = 1; +} + +message ListDeadLetterSourceQueuesResult { + string next_token = 1; + repeated string queue_urls = 2; +} diff --git a/ydb/services/ymq/grpc_service.cpp b/ydb/services/ymq/grpc_service.cpp index 570d44cab1da..20d891253dae 100644 --- a/ydb/services/ymq/grpc_service.cpp +++ b/ydb/services/ymq/grpc_service.cpp @@ -42,6 +42,10 @@ void TGRpcYmqService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) ADD_REQUEST(PurgeQueue, DoYmqPurgeQueueRequest, nullptr, Off) ADD_REQUEST(DeleteQueue, DoYmqDeleteQueueRequest, nullptr, Off) ADD_REQUEST(ChangeMessageVisibility, DoYmqChangeMessageVisibilityRequest, nullptr, Off) + ADD_REQUEST(SetQueueAttributes, DoYmqSetQueueAttributesRequest, nullptr, Off) + ADD_REQUEST(SendMessageBatch, DoYmqSendMessageBatchRequest, nullptr, Off) + ADD_REQUEST(DeleteMessageBatch, DoYmqDeleteMessageBatchRequest, nullptr, Off) + ADD_REQUEST(ChangeMessageVisibilityBatch, DoYmqChangeMessageVisibilityBatchRequest, nullptr, Off) #undef ADD_REQUEST } diff --git a/ydb/services/ymq/ymq_proxy.cpp b/ydb/services/ymq/ymq_proxy.cpp index 095ec8dec5f1..0453f6f5dfb0 100644 --- a/ydb/services/ymq/ymq_proxy.cpp +++ b/ydb/services/ymq/ymq_proxy.cpp @@ -694,6 +694,218 @@ namespace NKikimr::NYmq::V1 { return result; } }; + + class TSetQueueAttributesReplyCallback : public TReplyCallback< + NKikimr::NSQS::TSetQueueAttributesResponse, + Ydb::Ymq::V1::SetQueueAttributesResult> { + public: + using TReplyCallback::TReplyCallback; + + private: + const NKikimr::NSQS::TSetQueueAttributesResponse& GetResponse(const NKikimrClient::TSqsResponse& resp) override { + return resp.GetSetQueueAttributes(); + } + + Ydb::Ymq::V1::SetQueueAttributesResult GetResult(const NKikimrClient::TSqsResponse&) override { + Ydb::Ymq::V1::SetQueueAttributesResult result; + + return result; + } + }; + + + void AddAttribute(THolder& requestHolder, const TString& name, TString value) { + auto attribute = requestHolder->MutableSetQueueAttributes()->MutableAttributes()->Add(); + attribute->SetName(name); + attribute->SetValue(value); + }; + + class TSetQueueAttributesActor : public TRpcRequestActor< + TEvYmqSetQueueAttributesRequest, + NKikimr::NSQS::TSetQueueAttributesRequest, + TSetQueueAttributesReplyCallback> { + public: + using TRpcRequestActor::TRpcRequestActor; + + private: + NKikimr::NSQS::TSetQueueAttributesRequest* GetRequest(THolder& requestHolder) override { + auto result = requestHolder->MutableSetQueueAttributes(); + for (auto& [name, value]: GetProtoRequest()->Getattributes()) { + AddAttribute(requestHolder, name, value); + } + result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second); + return result; + } + }; + + class TSendMessageBatchReplyCallback : public TReplyCallback< + NKikimr::NSQS::TSendMessageBatchResponse, + Ydb::Ymq::V1::SendMessageBatchResult> { + public: + using TReplyCallback::TReplyCallback; + + private: + const NKikimr::NSQS::TSendMessageBatchResponse& GetResponse(const NKikimrClient::TSqsResponse& resp) override { + return resp.GetSendMessageBatch(); + } + + Ydb::Ymq::V1::SendMessageBatchResult GetResult(const NKikimrClient::TSqsResponse& response) override { + Ydb::Ymq::V1::SendMessageBatchResult result; + response.GetSendMessageBatch(); + for (auto& entry : response.GetSendMessageBatch().GetEntries()) { + if (entry.GetError().HasErrorCode()) { + auto currentFailed = result.Addfailed(); + currentFailed->Setcode(entry.GetError().GetErrorCode()); + currentFailed->Setid(entry.GetId()); + currentFailed->Setmessage(entry.GetError().GetMessage()); + + ui32 httpStatus = NSQS::TErrorClass::GetHttpStatus(entry.GetError().GetErrorCode()).GetOrElse(400); + currentFailed->Setsender_fault(400 <= httpStatus && httpStatus < 500); + } else { + auto currentSuccessful = result.Addsuccessful(); + currentSuccessful->Setid(entry.GetId()); + currentSuccessful->Setmd5_of_message_body(entry.GetMD5OfMessageBody()); + currentSuccessful->Setmessage_id(entry.GetMessageId()); + currentSuccessful->Setsequence_number(std::to_string(entry.GetSequenceNumber())); + } + } + return result; + } + }; + + class TSendMessageBatchActor : public TRpcRequestActor< + TEvYmqSendMessageBatchRequest, + NKikimr::NSQS::TSendMessageBatchRequest, + TSendMessageBatchReplyCallback> { + public: + using TRpcRequestActor::TRpcRequestActor; + + private: + NKikimr::NSQS::TSendMessageBatchRequest* GetRequest(THolder& requestHolder) override { + auto result = requestHolder->MutableSendMessageBatch(); + result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second); + for (auto& requestEntry : GetProtoRequest()->Getentries()) { + auto entry = requestHolder->MutableSendMessageBatch()->MutableEntries()->Add(); + entry->SetId(requestEntry.Getid()); + for (auto& srcAttribute: requestEntry.Getmessage_attributes()) { + auto dstAttribute = entry->MutableMessageAttributes()->Add(); + dstAttribute->SetName(srcAttribute.first); + dstAttribute->SetStringValue(srcAttribute.second.Getstring_value()); + dstAttribute->SetBinaryValue(srcAttribute.second.Getbinary_value()); + dstAttribute->SetDataType(srcAttribute.second.Getdata_type()); + } + entry->SetMessageDeduplicationId(requestEntry.Getmessage_deduplication_id()); + entry->SetMessageGroupId(requestEntry.Getmessage_group_id()); + entry->SetMessageBody(requestEntry.Getmessage_body()); + } + return result; + } + }; + + class TDeleteMessageBatchReplyCallback : public TReplyCallback< + NKikimr::NSQS::TDeleteMessageBatchResponse, + Ydb::Ymq::V1::DeleteMessageBatchResult> { + public: + using TReplyCallback::TReplyCallback; + + private: + const NKikimr::NSQS::TDeleteMessageBatchResponse& GetResponse(const NKikimrClient::TSqsResponse& resp) override { + return resp.GetDeleteMessageBatch(); + } + + Ydb::Ymq::V1::DeleteMessageBatchResult GetResult(const NKikimrClient::TSqsResponse& response) override { + Ydb::Ymq::V1::DeleteMessageBatchResult result; + auto entries = response.GetDeleteMessageBatch().GetEntries(); + for (auto i = 0; i < entries.size(); i++) { + auto &entry = response.GetDeleteMessageBatch().GetEntries()[i]; + if (entry.GetError().HasErrorCode()) { + auto currentFailed = result.Addfailed(); + currentFailed->Setcode(entry.GetError().GetErrorCode()); + currentFailed->Setid(entry.GetId()); + currentFailed->Setmessage(entry.GetError().GetMessage()); + + ui32 httpStatus = NSQS::TErrorClass::GetHttpStatus(entry.GetError().GetErrorCode()).GetOrElse(400); + currentFailed->Setsender_fault(400 <= httpStatus && httpStatus < 500); + } else { + auto currentSuccessful = result.Addsuccessful(); + currentSuccessful->Setid(entry.GetId()); + } + } + return result; + } + }; + + class TDeleteMessageBatchActor : public TRpcRequestActor< + TEvYmqDeleteMessageBatchRequest, + NKikimr::NSQS::TDeleteMessageBatchRequest, + TDeleteMessageBatchReplyCallback> { + public: + using TRpcRequestActor::TRpcRequestActor; + + private: + NKikimr::NSQS::TDeleteMessageBatchRequest* GetRequest(THolder& requestHolder) override { + auto result = requestHolder->MutableDeleteMessageBatch(); + result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second); + for (auto& requestEntry : GetProtoRequest()->Getentries()) { + auto entry = requestHolder->MutableDeleteMessageBatch()->AddEntries(); + entry->SetId(requestEntry.Getid()); + entry->SetReceiptHandle(requestEntry.Getreceipt_handle()); + } + return result; + } + }; + + class TChangeMessageVisibilityBatchReplyCallback : public TReplyCallback< + NKikimr::NSQS::TChangeMessageVisibilityBatchResponse, + Ydb::Ymq::V1::ChangeMessageVisibilityBatchResult> { + public: + using TReplyCallback::TReplyCallback; + + private: + const NKikimr::NSQS::TChangeMessageVisibilityBatchResponse& GetResponse(const NKikimrClient::TSqsResponse& resp) override { + return resp.GetChangeMessageVisibilityBatch(); + } + + Ydb::Ymq::V1::ChangeMessageVisibilityBatchResult GetResult(const NKikimrClient::TSqsResponse& response) override { + Ydb::Ymq::V1::ChangeMessageVisibilityBatchResult result; + for (auto& entry : response.GetChangeMessageVisibilityBatch().GetEntries()) { + if (entry.GetError().HasErrorCode()) { + auto currentFailed = result.Addfailed(); + currentFailed->Setcode(entry.GetError().GetErrorCode()); + currentFailed->Setid(entry.GetId()); + currentFailed->Setmessage(entry.GetError().GetMessage()); + + ui32 httpStatus = NSQS::TErrorClass::GetHttpStatus(entry.GetError().GetErrorCode()).GetOrElse(400); + currentFailed->Setsender_fault(400 <= httpStatus && httpStatus < 500); + } else { + auto currentSuccessful = result.Addsuccessful(); + currentSuccessful->Setid(entry.GetId()); + } + } + return result; + } + }; + + class TChangeMessageVisibilityBatchActor : public TRpcRequestActor< + TEvYmqChangeMessageVisibilityBatchRequest, + NKikimr::NSQS::TChangeMessageVisibilityBatchRequest, + TChangeMessageVisibilityBatchReplyCallback> { + public: + using TRpcRequestActor::TRpcRequestActor; + + private: + NKikimr::NSQS::TChangeMessageVisibilityBatchRequest* GetRequest(THolder& requestHolder) override { + auto result = requestHolder->MutableChangeMessageVisibilityBatch(); + result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second); + for (auto& requestEntry : GetProtoRequest()->Getentries()) { + auto entry = requestHolder->MutableChangeMessageVisibilityBatch()->MutableEntries()->Add(); + entry->SetId(requestEntry.Getid()); + entry->SetVisibilityTimeout(requestEntry.Getvisibility_timeout()); + entry->SetReceiptHandle(requestEntry.Getreceipt_handle()); + } + return result; + } + }; } namespace NKikimr::NGRpcService { @@ -717,5 +929,9 @@ DECLARE_RPC(DeleteMessage); DECLARE_RPC(PurgeQueue); DECLARE_RPC(DeleteQueue); DECLARE_RPC(ChangeMessageVisibility); +DECLARE_RPC(SetQueueAttributes); +DECLARE_RPC(SendMessageBatch); +DECLARE_RPC(DeleteMessageBatch); +DECLARE_RPC(ChangeMessageVisibilityBatch); } diff --git a/ydb/services/ymq/ymq_proxy.h b/ydb/services/ymq/ymq_proxy.h index 18a12fdfe6e5..01636f81ce36 100644 --- a/ydb/services/ymq/ymq_proxy.h +++ b/ydb/services/ymq/ymq_proxy.h @@ -23,6 +23,10 @@ using TEvYmqDeleteMessageRequest = TGrpcRequestOperationCall; using TEvYmqDeleteQueueRequest = TGrpcRequestOperationCall; using TEvYmqChangeMessageVisibilityRequest = TGrpcRequestOperationCall; +using TEvYmqSetQueueAttributesRequest = TGrpcRequestOperationCall; +using TEvYmqSendMessageBatchRequest = TGrpcRequestOperationCall; +using TEvYmqDeleteMessageBatchRequest = TGrpcRequestOperationCall; +using TEvYmqChangeMessageVisibilityBatchRequest = TGrpcRequestOperationCall; } }