Skip to content

Commit 3e2cc95

Browse files
LOGBROKER 8891 list dead letter source queues (#8330)
1 parent 93d0e7f commit 3e2cc95

File tree

8 files changed

+90
-0
lines changed

8 files changed

+90
-0
lines changed

ydb/core/grpc_services/service_ymq.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,6 @@ void DoYmqSetQueueAttributesRequest(std::unique_ptr<IRequestOpCtx> p, const IFac
2525
void DoYmqSendMessageBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
2626
void DoYmqDeleteMessageBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
2727
void DoYmqChangeMessageVisibilityBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
28+
void DoYmqListDeadLetterSourceQueuesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
2829
}
2930
}

ydb/core/http_proxy/http_req.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,8 @@ namespace NKikimr::NHttpProxy {
536536
action = NSQS::EAction::DeleteMessageBatch;
537537
} else if (Method == "ChangeMessageVisibilityBatch") {
538538
action = NSQS::EAction::ChangeMessageVisibilityBatch;
539+
} else if (Method == "ListDeadLetterSourceQueues") {
540+
action = NSQS::EAction::ListDeadLetterSourceQueues;
539541
}
540542

541543
requestHolder->SetRequestId(HttpContext.RequestId);
@@ -1075,6 +1077,7 @@ namespace NKikimr::NHttpProxy {
10751077
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(SendMessageBatch);
10761078
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(DeleteMessageBatch);
10771079
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ChangeMessageVisibilityBatch);
1080+
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ListDeadLetterSourceQueues);
10781081
#undef DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN
10791082
}
10801083

ydb/core/http_proxy/ut/datastreams_fixture.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
376376

377377
appConfig.MutableSqsConfig()->SetEnableSqs(true);
378378
appConfig.MutableSqsConfig()->SetYandexCloudMode(true);
379+
appConfig.MutableSqsConfig()->SetEnableDeadLetterQueues(true);
379380

380381
auto limit = appConfig.MutablePQConfig()->AddValidRetentionLimits();
381382
limit->SetMinPeriodSeconds(0);

ydb/core/http_proxy/ut/http_proxy_ut.h

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1996,4 +1996,50 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
19961996

19971997
}
19981998

1999+
Y_UNIT_TEST_F(TestListDeadLetterSourceQueues, THttpProxyTestMock) {
2000+
auto createQueueReq = CreateSqsCreateQueueRequest();
2001+
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1"));
2002+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
2003+
NJson::TJsonValue json;
2004+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
2005+
2006+
TString resultQueueUrl = GetByPath<TString>(json, "QueueUrl");
2007+
2008+
auto createDlqReq = CreateSqsCreateQueueRequest();
2009+
createQueueReq["QueueName"] = "DlqName";
2010+
res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1"));
2011+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
2012+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
2013+
2014+
TString dlqUrl = GetByPath<TString>(json, "QueueUrl");
2015+
2016+
NJson::TJsonValue getQueueAttributes;
2017+
getQueueAttributes["QueueUrl"] = dlqUrl;
2018+
NJson::TJsonArray attributeNames = {"QueueArn"};
2019+
getQueueAttributes["AttributeNames"] = attributeNames;
2020+
res = SendHttpRequest("/Root", "AmazonSQS.GetQueueAttributes", std::move(getQueueAttributes), FormAuthorizationStr("ru-central1"));
2021+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
2022+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
2023+
2024+
TString dlqArn = GetByPath<TString>(json["Attributes"], "QueueArn");
2025+
2026+
NJson::TJsonValue setQueueAttributes;
2027+
setQueueAttributes["QueueUrl"] = resultQueueUrl;
2028+
NJson::TJsonValue attributes = {};
2029+
auto redrivePolicy = TStringBuilder()
2030+
<< "{\"deadLetterTargetArn\" : \"" << dlqArn << "\", \"maxReceiveCount\" : 100}";
2031+
attributes["RedrivePolicy"] = redrivePolicy;
2032+
setQueueAttributes["Attributes"] = attributes;
2033+
2034+
res = SendHttpRequest("/Root", "AmazonSQS.SetQueueAttributes", std::move(setQueueAttributes), FormAuthorizationStr("ru-central1"));
2035+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
2036+
2037+
NJson::TJsonValue listDeadLetterSourceQueues;
2038+
listDeadLetterSourceQueues["QueueUrl"] = dlqUrl;
2039+
res = SendHttpRequest("/Root", "AmazonSQS.ListDeadLetterSourceQueues", std::move(listDeadLetterSourceQueues), FormAuthorizationStr("ru-central1"));
2040+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
2041+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
2042+
UNIT_ASSERT_VALUES_EQUAL(json["QueueUrls"][0], resultQueueUrl);
2043+
}
2044+
19992045
} // Y_UNIT_TEST_SUITE(TestHttpProxy)

ydb/public/api/grpc/draft/ydb_ymq_v1.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,5 @@ service YmqService {
2424
rpc SendMessageBatch(SendMessageBatchRequest) returns (SendMessageBatchResponse);
2525
rpc DeleteMessageBatch(DeleteMessageBatchRequest) returns (DeleteMessageBatchResponse);
2626
rpc ChangeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest) returns (ChangeMessageVisibilityBatchResponse);
27+
rpc ListDeadLetterSourceQueues(ListDeadLetterSourceQueuesRequest) returns (ListDeadLetterSourceQueuesResponse);
2728
}

ydb/services/ymq/grpc_service.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ void TGRpcYmqService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger)
4646
ADD_REQUEST(SendMessageBatch, DoYmqSendMessageBatchRequest, nullptr, Off)
4747
ADD_REQUEST(DeleteMessageBatch, DoYmqDeleteMessageBatchRequest, nullptr, Off)
4848
ADD_REQUEST(ChangeMessageVisibilityBatch, DoYmqChangeMessageVisibilityBatchRequest, nullptr, Off)
49+
ADD_REQUEST(ListDeadLetterSourceQueues, DoYmqListDeadLetterSourceQueuesRequest, nullptr, Off)
4950

5051
#undef ADD_REQUEST
5152
}

ydb/services/ymq/ymq_proxy.cpp

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -730,9 +730,44 @@ namespace NKikimr::NYmq::V1 {
730730
private:
731731
NKikimr::NSQS::TSetQueueAttributesRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
732732
auto result = requestHolder->MutableSetQueueAttributes();
733+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
733734
for (auto& [name, value]: GetProtoRequest()->Getattributes()) {
734735
AddAttribute(requestHolder, name, value);
735736
}
737+
return result;
738+
}
739+
};
740+
741+
class TListDeadLetterSourceQueuesReplyCallback : public TReplyCallback<
742+
NKikimr::NSQS::TListDeadLetterSourceQueuesResponse,
743+
Ydb::Ymq::V1::ListDeadLetterSourceQueuesResult> {
744+
public:
745+
using TReplyCallback::TReplyCallback;
746+
747+
private:
748+
const NKikimr::NSQS::TListDeadLetterSourceQueuesResponse& GetResponse(const NKikimrClient::TSqsResponse& resp) override {
749+
return resp.GetListDeadLetterSourceQueues();
750+
}
751+
752+
Ydb::Ymq::V1::ListDeadLetterSourceQueuesResult GetResult(const NKikimrClient::TSqsResponse& response) override {
753+
Ydb::Ymq::V1::ListDeadLetterSourceQueuesResult result;
754+
for (const auto& queue : response.GetListDeadLetterSourceQueues().GetQueues()) {
755+
result.Mutablequeue_urls()->Add()->assign(queue.GetQueueUrl());
756+
}
757+
return result;
758+
}
759+
};
760+
761+
class TListDeadLetterSourceQueuesActor : public TRpcRequestActor<
762+
TEvYmqListDeadLetterSourceQueuesRequest,
763+
NKikimr::NSQS::TListDeadLetterSourceQueuesRequest,
764+
TListDeadLetterSourceQueuesReplyCallback> {
765+
public:
766+
using TRpcRequestActor::TRpcRequestActor;
767+
768+
private:
769+
NKikimr::NSQS::TListDeadLetterSourceQueuesRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
770+
auto result = requestHolder->MutableListDeadLetterSourceQueues();
736771
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
737772
return result;
738773
}
@@ -933,5 +968,6 @@ DECLARE_RPC(SetQueueAttributes);
933968
DECLARE_RPC(SendMessageBatch);
934969
DECLARE_RPC(DeleteMessageBatch);
935970
DECLARE_RPC(ChangeMessageVisibilityBatch);
971+
DECLARE_RPC(ListDeadLetterSourceQueues);
936972

937973
}

ydb/services/ymq/ymq_proxy.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ using TEvYmqSetQueueAttributesRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::
2727
using TEvYmqSendMessageBatchRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::SendMessageBatchRequest, Ydb::Ymq::V1::SendMessageBatchResponse>;
2828
using TEvYmqDeleteMessageBatchRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::DeleteMessageBatchRequest, Ydb::Ymq::V1::DeleteMessageBatchResponse>;
2929
using TEvYmqChangeMessageVisibilityBatchRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::ChangeMessageVisibilityBatchRequest, Ydb::Ymq::V1::ChangeMessageVisibilityBatchResponse>;
30+
using TEvYmqListDeadLetterSourceQueuesRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::ListDeadLetterSourceQueuesRequest, Ydb::Ymq::V1::ListDeadLetterSourceQueuesResponse>;
3031

3132
}
3233
}

0 commit comments

Comments
 (0)