Skip to content

LOGBROKER 8891 add 4 json sqs requests #8328

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ydb/core/grpc_services/service_ymq.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,9 @@ void DoYmqDeleteMessageRequest(std::unique_ptr<IRequestOpCtx> p, const IFacility
void DoYmqPurgeQueueRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoYmqDeleteQueueRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoYmqChangeMessageVisibilityRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoYmqSetQueueAttributesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoYmqSendMessageBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoYmqDeleteMessageBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoYmqChangeMessageVisibilityBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
}
}
12 changes: 12 additions & 0 deletions ydb/core/http_proxy/http_req.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,14 @@ namespace NKikimr::NHttpProxy {
action = NSQS::EAction::DeleteQueue;
} else if (Method == "ChangeMessageVisibility") {
action = NSQS::EAction::ChangeMessageVisibility;
} else if (Method == "SetQueueAttributes") {
action = NSQS::EAction::SetQueueAttributes;
} else if (Method == "SendMessageBatch") {
action = NSQS::EAction::SendMessageBatch;
}else if (Method == "DeleteMessageBatch") {
action = NSQS::EAction::DeleteMessageBatch;
} else if (Method == "ChangeMessageVisibilityBatch") {
action = NSQS::EAction::ChangeMessageVisibilityBatch;
}

requestHolder->SetRequestId(HttpContext.RequestId);
Expand Down Expand Up @@ -1063,6 +1071,10 @@ namespace NKikimr::NHttpProxy {
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(PurgeQueue);
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(DeleteQueue);
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ChangeMessageVisibility);
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(SetQueueAttributes);
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(SendMessageBatch);
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(DeleteMessageBatch);
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ChangeMessageVisibilityBatch);
#undef DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN
}

Expand Down
158 changes: 158 additions & 0 deletions ydb/core/http_proxy/ut/http_proxy_ut.h
Original file line number Diff line number Diff line change
Expand Up @@ -1838,4 +1838,162 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
UNIT_ASSERT_VALUES_EQUAL(GetByPath<TString>(json, "__type"), "AWS.SimpleQueueService.NonExistentQueue");
}

Y_UNIT_TEST_F(TestSetQueueAttributes, THttpProxyTestMock) {
auto createQueueReq = CreateSqsCreateQueueRequest();
NJson::TJsonValue attributes;
attributes["DelaySeconds"] = "1";
createQueueReq["Attributes"] = attributes;
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
NJson::TJsonValue json;
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));

TString resultQueueUrl = GetByPath<TString>(json, "QueueUrl");

NJson::TJsonValue setQueueAttributes;
setQueueAttributes["QueueUrl"] = resultQueueUrl;
attributes = {};
attributes["DelaySeconds"] = "2";
setQueueAttributes["Attributes"] = attributes;

res = SendHttpRequest("/Root", "AmazonSQS.SetQueueAttributes", std::move(setQueueAttributes), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);

NJson::TJsonValue getQueueAttributes;
getQueueAttributes["QueueUrl"] = resultQueueUrl;
NJson::TJsonArray attributeNames = {"DelaySeconds"};
getQueueAttributes["AttributeNames"] = attributeNames;

res = SendHttpRequest("/Root", "AmazonSQS.GetQueueAttributes", std::move(getQueueAttributes), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
NJson::TJsonValue resultJson;
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &resultJson));
UNIT_ASSERT_VALUES_EQUAL(resultJson["Attributes"]["DelaySeconds"], "2");
}

Y_UNIT_TEST_F(TestSendMessageBatch, THttpProxyTestMock) {
auto createQueueReq = CreateSqsCreateQueueRequest();
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
NJson::TJsonValue json;
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
TString resultQueueUrl = GetByPath<TString>(json, "QueueUrl");
UNIT_ASSERT(resultQueueUrl.EndsWith("ExampleQueueName"));

NJson::TJsonValue message0;
message0["Id"] = "Id-0";
message0["MessageBody"] = "MessageBody-0";
message0["MessageDeduplicationId"] = "MessageDeduplicationId-0";

NJson::TJsonValue message1;
message1["Id"] = "Id-1";
message1["MessageBody"] = "MessageBody-1";
message1["MessageDeduplicationId"] = "MessageDeduplicationId-1";

NJson::TJsonArray entries = {message0, message1};

NJson::TJsonValue sendMessageBatchReq;
sendMessageBatchReq["QueueUrl"] = resultQueueUrl;
sendMessageBatchReq["Entries"] = entries;

res = SendHttpRequest("/Root", "AmazonSQS.SendMessageBatch", std::move(sendMessageBatchReq), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
UNIT_ASSERT(json["Successful"].GetArray().size() == 2);
auto succesful0 = json["Successful"][0];
UNIT_ASSERT(succesful0["Id"] == "Id-0");
UNIT_ASSERT(!GetByPath<TString>(succesful0, "Md5OfMessageBody").empty());
UNIT_ASSERT(!GetByPath<TString>(succesful0, "MessageId").empty());
}

Y_UNIT_TEST_F(TestDeleteMessageBatch, THttpProxyTestMock) {
auto createQueueReq = CreateSqsCreateQueueRequest();
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
NJson::TJsonValue json;
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
TString resultQueueUrl = GetByPath<TString>(json, "QueueUrl");
UNIT_ASSERT(resultQueueUrl.EndsWith("ExampleQueueName"));

NJson::TJsonValue message0;
message0["Id"] = "Id-0";
message0["MessageBody"] = "MessageBody-0";
message0["MessageDeduplicationId"] = "MessageDeduplicationId-0";

NJson::TJsonValue message1;
message1["Id"] = "Id-1";
message1["MessageBody"] = "MessageBody-1";
message1["MessageDeduplicationId"] = "MessageDeduplicationId-1";

NJson::TJsonArray entries = {message0, message1};

NJson::TJsonValue sendMessageBatchReq;
sendMessageBatchReq["QueueUrl"] = resultQueueUrl;
sendMessageBatchReq["Entries"] = entries;

res = SendHttpRequest("/Root", "AmazonSQS.SendMessageBatch", std::move(sendMessageBatchReq), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
UNIT_ASSERT(json["Successful"].GetArray().size() == 2);

TVector<NJson::TJsonValue> messages;
for (int i = 0; i < 20; ++i) {
NJson::TJsonValue receiveMessageReq;
receiveMessageReq["QueueUrl"] = resultQueueUrl;
res = SendHttpRequest("/Root", "AmazonSQS.ReceiveMessage", std::move(receiveMessageReq), FormAuthorizationStr("ru-central1"));
if (res.Body != TString("{}")) {
NJson::ReadJsonTree(res.Body, &json);
if (json["Messages"].GetArray().size() == 2) {
messages.push_back(json["Messages"][0]);
messages.push_back(json["Messages"][1]);
break;
}
if (json["Messages"].GetArray().size() == 1) {
messages.push_back(json["Messages"][0]);
if (messages.size() == 2) {
break;
}
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}

UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2);

auto receiptHandle0 = messages[0]["ReceiptHandle"].GetString();
UNIT_ASSERT(!receiptHandle0.Empty());
auto receiptHandle1 = messages[1]["ReceiptHandle"].GetString();
UNIT_ASSERT(!receiptHandle1.Empty());

NJson::TJsonValue deleteMessageBatchReq;
deleteMessageBatchReq["QueueUrl"] = resultQueueUrl;

NJson::TJsonValue entry0;
entry0["Id"] = "Id-0";
entry0["ReceiptHandle"] = receiptHandle0;

NJson::TJsonValue entry1;
entry1["Id"] = "Id-1";
entry1["ReceiptHandle"] = receiptHandle1;

NJson::TJsonArray deleteEntries = {entry0, entry1};
deleteMessageBatchReq["Entries"] = deleteEntries;

res = SendHttpRequest("/Root", "AmazonSQS.DeleteMessageBatch", std::move(deleteMessageBatchReq), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
UNIT_ASSERT_VALUES_EQUAL(json["Successful"].GetArray().size(), 2);
UNIT_ASSERT_VALUES_EQUAL(json["Successful"][0]["Id"], "Id-0");
UNIT_ASSERT_VALUES_EQUAL(json["Successful"][1]["Id"], "Id-1");

NJson::TJsonValue receiveMessageReq;
receiveMessageReq["QueueUrl"] = resultQueueUrl;
res = SendHttpRequest("/Root", "AmazonSQS.ReceiveMessage", std::move(receiveMessageReq), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
UNIT_ASSERT_VALUES_EQUAL(json["Messages"].GetArray().size(), 0);

}

} // Y_UNIT_TEST_SUITE(TestHttpProxy)
8 changes: 8 additions & 0 deletions ydb/library/http_proxy/error/error.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "error.h"
#include "util/generic/maybe.h"

namespace NKikimr::NSQS {

Expand Down Expand Up @@ -42,6 +43,13 @@ ui32 TErrorClass::GetId(const TString& code) {
: it->second;
};

TMaybe<ui32> TErrorClass::GetHttpStatus(const TString& code) {
auto idIt = NKikimr::NSQS::TErrorClass::ErrorToId.find(code);
if (idIt == NKikimr::NSQS::TErrorClass::ErrorToId.end()) {
return Nothing();
}
return get<1>(IdToErrorAndCode.find(idIt->second)->second);
};

namespace NErrors {
extern const TErrorClass ACCESS_DENIED = {
Expand Down
1 change: 1 addition & 0 deletions ydb/library/http_proxy/error/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ struct TErrorClass {

static const std::tuple<TString, ui32> GetErrorAndCode(ui32 id);
static ui32 GetId(const TString& code);
static TMaybe<ui32> GetHttpStatus(const TString& code);

private:
static THashSet<TString> RegisteredCodes;
Expand Down
4 changes: 4 additions & 0 deletions ydb/public/api/grpc/draft/ydb_ymq_v1.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,8 @@ service YmqService {
rpc PurgeQueue(PurgeQueueRequest) returns (PurgeQueueResponse);
rpc DeleteQueue(DeleteQueueRequest) returns (DeleteQueueResponse);
rpc ChangeMessageVisibility(ChangeMessageVisibilityRequest) returns (ChangeMessageVisibilityResponse);
rpc SetQueueAttributes(SetQueueAttributesRequest) returns (SetQueueAttributesResponse);
rpc SendMessageBatch(SendMessageBatchRequest) returns (SendMessageBatchResponse);
rpc DeleteMessageBatch(DeleteMessageBatchRequest) returns (DeleteMessageBatchResponse);
rpc ChangeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest) returns (ChangeMessageVisibilityBatchResponse);
}
70 changes: 56 additions & 14 deletions ydb/public/api/protos/draft/ymq.proto
Original file line number Diff line number Diff line change
Expand Up @@ -228,31 +228,73 @@ message SendMessageResult {
string sequence_number = 5;
}

message SendMessageBatchRequest {
Ydb.Operations.OperationParams operation_params = 1;
repeated SendMessageRequest entries = 2;
}

message SendMessageBatchResponse {
Ydb.Operations.Operation operation = 1;
}

message BatchResultErrorEntry {
string code = 1;
string id = 2;
bool sender_fault = 3;
string message = 4;
}

message SendMessageBatchRequestEntry {
string id = 1;
int32 delay_seconds = 2;
map<string, MessageAttribute> message_attributes = 3;
string message_body = 4;
string message_deduplication_id = 5;
string message_group_id = 6;
map<string, MessageAttribute> message_system_attributes = 7;
string queue_url = 8;
}

message SendMessageBatchResultEntry {
string md5_of_message_attributes = 1;
string md5_of_message_body= 2;
string md5_of_message_system_attributes= 3;
string message_id = 4;
string sequence_number = 5;
string id = 1;
string md5_of_message_body = 2;
string message_id = 3;
string md5_of_message_attributes = 4;
string md5_of_message_system_attributes = 5;
string sequence_number = 6;
}

message SendMessageBatchRequest {
Ydb.Operations.OperationParams operation_params = 1;
repeated SendMessageBatchRequestEntry entries = 2;
string queue_url = 3;
}

message SendMessageBatchResponse {
Ydb.Operations.Operation operation = 1;
}

message SendMessageBatchResult {
repeated BatchResultErrorEntry failed = 1;
repeated SendMessageBatchResultEntry successful = 2;
}

message SetQueueAttributesRequest {
Ydb.Operations.OperationParams operation_params = 1;
map<string, string> attributes = 2;
string queue_url = 3;
}

message SetQueueAttributesResponse {
Ydb.Operations.Operation operation = 1;
}

message SetQueueAttributesResult {
}

message ListDeadLetterSourceQueuesRequest {
Ydb.Operations.OperationParams operation_params = 1;
int32 max_results = 2;
string next_token = 3;
string queue_url = 4;
}

message ListDeadLetterSourceQueuesResponse {
Ydb.Operations.Operation operation = 1;
}

message ListDeadLetterSourceQueuesResult {
string next_token = 1;
repeated string queue_urls = 2;
}
4 changes: 4 additions & 0 deletions ydb/services/ymq/grpc_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ void TGRpcYmqService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger)
ADD_REQUEST(PurgeQueue, DoYmqPurgeQueueRequest, nullptr, Off)
ADD_REQUEST(DeleteQueue, DoYmqDeleteQueueRequest, nullptr, Off)
ADD_REQUEST(ChangeMessageVisibility, DoYmqChangeMessageVisibilityRequest, nullptr, Off)
ADD_REQUEST(SetQueueAttributes, DoYmqSetQueueAttributesRequest, nullptr, Off)
ADD_REQUEST(SendMessageBatch, DoYmqSendMessageBatchRequest, nullptr, Off)
ADD_REQUEST(DeleteMessageBatch, DoYmqDeleteMessageBatchRequest, nullptr, Off)
ADD_REQUEST(ChangeMessageVisibilityBatch, DoYmqChangeMessageVisibilityBatchRequest, nullptr, Off)

#undef ADD_REQUEST
}
Expand Down
Loading
Loading