Skip to content

Commit f883501

Browse files
authored
Use runtime grpc event dispatching instead of legacy one for PQ scheme events (#1326)
1 parent 04c3f1c commit f883501

16 files changed

+269
-242
lines changed

ydb/core/grpc_services/base/base.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1437,7 +1437,12 @@ class TGrpcRequestCall
14371437
void Pass(const IFacilityProvider& facility) override {
14381438
this->Span_.End();
14391439

1440-
PassMethod(std::move(std::unique_ptr<TRequestIface>(this)), facility);
1440+
try {
1441+
PassMethod(std::move(std::unique_ptr<TRequestIface>(this)), facility);
1442+
} catch (const std::exception& ex) {
1443+
this->RaiseIssue(NYql::TIssue{TStringBuilder() << "unexpected exception: " << ex.what()});
1444+
this->ReplyWithYdbStatus(Ydb::StatusIds::INTERNAL_ERROR);
1445+
}
14411446
}
14421447

14431448
TRateLimiterMode GetRlMode() const override {

ydb/core/grpc_services/grpc_request_proxy.cpp

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -606,20 +606,8 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev) {
606606
HFunc(TEvStreamTopicDirectReadRequest, PreHandle);
607607
HFunc(TEvCommitOffsetRequest, PreHandle);
608608
HFunc(TEvPQReadInfoRequest, PreHandle);
609-
HFunc(TEvPQDropTopicRequest, PreHandle);
610-
HFunc(TEvPQCreateTopicRequest, PreHandle);
611-
HFunc(TEvPQAlterTopicRequest, PreHandle);
612-
HFunc(TEvPQAddReadRuleRequest, PreHandle);
613-
HFunc(TEvPQRemoveReadRuleRequest, PreHandle);
614-
HFunc(TEvPQDescribeTopicRequest, PreHandle);
615609
HFunc(TEvDiscoverPQClustersRequest, PreHandle);
616610
HFunc(TEvCoordinationSessionRequest, PreHandle);
617-
HFunc(TEvDropTopicRequest, PreHandle);
618-
HFunc(TEvCreateTopicRequest, PreHandle);
619-
HFunc(TEvAlterTopicRequest, PreHandle);
620-
HFunc(TEvDescribeTopicRequest, PreHandle);
621-
HFunc(TEvDescribeConsumerRequest, PreHandle);
622-
HFunc(TEvDescribePartitionRequest, PreHandle);
623611
HFunc(TEvNodeCheckRequest, PreHandle);
624612
HFunc(TEvProxyRuntimeEvent, PreHandle);
625613

ydb/core/grpc_services/grpc_request_proxy_handle_methods.h

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,10 @@ class TGRpcRequestProxyHandleMethods {
1515
static void Handle(TEvStreamTopicDirectReadRequest::TPtr& ev, const TActorContext& ctx);
1616
static void Handle(TEvCommitOffsetRequest::TPtr& ev, const TActorContext& ctx);
1717
static void Handle(TEvPQReadInfoRequest::TPtr& ev, const TActorContext& ctx);
18-
static void Handle(TEvPQDropTopicRequest::TPtr& ev, const TActorContext& ctx);
19-
static void Handle(TEvPQCreateTopicRequest::TPtr& ev, const TActorContext& ctx);
20-
static void Handle(TEvPQAlterTopicRequest::TPtr& ev, const TActorContext& ctx);
21-
static void Handle(TEvPQAddReadRuleRequest::TPtr& ev, const TActorContext& ctx);
22-
static void Handle(TEvPQRemoveReadRuleRequest::TPtr& ev, const TActorContext& ctx);
23-
static void Handle(TEvPQDescribeTopicRequest::TPtr& ev, const TActorContext& ctx);
2418
static void Handle(TEvDiscoverPQClustersRequest::TPtr& ev, const TActorContext& ctx);
2519
static void Handle(TEvLoginRequest::TPtr& ev, const TActorContext& ctx);
2620
static void Handle(TEvNodeCheckRequest::TPtr& ev, const TActorContext& ctx);
2721
static void Handle(TEvCoordinationSessionRequest::TPtr& ev, const TActorContext& ctx);
28-
static void Handle(TEvDropTopicRequest::TPtr& ev, const TActorContext& ctx);
29-
static void Handle(TEvCreateTopicRequest::TPtr& ev, const TActorContext& ctx);
30-
static void Handle(TEvAlterTopicRequest::TPtr& ev, const TActorContext& ctx);
31-
static void Handle(TEvDescribeTopicRequest::TPtr& ev, const TActorContext& ctx);
32-
static void Handle(TEvDescribeConsumerRequest::TPtr& ev, const TActorContext& ctx);
33-
static void Handle(TEvDescribePartitionRequest::TPtr& ev, const TActorContext& ctx);
3422
};
3523

3624
}

ydb/core/grpc_services/rpc_calls.h

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,8 @@
1313
#include <ydb/public/api/protos/ydb_discovery.pb.h>
1414
#include <ydb/public/api/protos/ydb_monitoring.pb.h>
1515
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
16-
#include <ydb/public/api/protos/ydb_table.pb.h>
1716
#include <ydb/public/api/protos/ydb_persqueue_cluster_discovery.pb.h>
1817
#include <ydb/public/api/protos/ydb_persqueue_v1.pb.h>
19-
#include <ydb/public/api/protos/ydb_topic.pb.h>
2018
#include <ydb/public/api/protos/ydb_federation_discovery.pb.h>
2119

2220
#include <ydb/public/api/grpc/draft/dummy.pb.h>
@@ -69,21 +67,7 @@ using TEvStreamTopicReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvSt
6967
using TEvStreamTopicDirectReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicDirectRead, Ydb::Topic::StreamDirectReadMessage::FromClient, Ydb::Topic::StreamDirectReadMessage::FromServer, TRateLimiterMode::RuManual>;
7068
using TEvCommitOffsetRequest = TGRpcRequestWrapper<TRpcServices::EvTopicCommitOffset, Ydb::Topic::CommitOffsetRequest, Ydb::Topic::CommitOffsetResponse, true>;
7169
using TEvPQReadInfoRequest = TGRpcRequestWrapper<TRpcServices::EvPQReadInfo, Ydb::PersQueue::V1::ReadInfoRequest, Ydb::PersQueue::V1::ReadInfoResponse, true>;
72-
using TEvPQDropTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQDropTopic, Ydb::PersQueue::V1::DropTopicRequest, Ydb::PersQueue::V1::DropTopicResponse, true>;
73-
using TEvPQCreateTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQCreateTopic, Ydb::PersQueue::V1::CreateTopicRequest, Ydb::PersQueue::V1::CreateTopicResponse, true>;
74-
using TEvPQAlterTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQAlterTopic, Ydb::PersQueue::V1::AlterTopicRequest, Ydb::PersQueue::V1::AlterTopicResponse, true>;
75-
using TEvPQDescribeTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQDescribeTopic, Ydb::PersQueue::V1::DescribeTopicRequest, Ydb::PersQueue::V1::DescribeTopicResponse, true>;
76-
using TEvPQAddReadRuleRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQAddReadRule, Ydb::PersQueue::V1::AddReadRuleRequest, Ydb::PersQueue::V1::AddReadRuleResponse, true>;
77-
using TEvPQRemoveReadRuleRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQRemoveReadRule, Ydb::PersQueue::V1::RemoveReadRuleRequest, Ydb::PersQueue::V1::RemoveReadRuleResponse, true>;
78-
7970
//TODO: Change this to runtime dispatching!
80-
using TEvDropTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvDropTopic, Ydb::Topic::DropTopicRequest, Ydb::Topic::DropTopicResponse, true, TRateLimiterMode::Rps>;
81-
using TEvCreateTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvCreateTopic, Ydb::Topic::CreateTopicRequest, Ydb::Topic::CreateTopicResponse, true, TRateLimiterMode::Rps>;
82-
using TEvAlterTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvAlterTopic, Ydb::Topic::AlterTopicRequest, Ydb::Topic::AlterTopicResponse, true, TRateLimiterMode::Rps>;
83-
using TEvDescribeTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvDescribeTopic, Ydb::Topic::DescribeTopicRequest, Ydb::Topic::DescribeTopicResponse, true, TRateLimiterMode::Rps>;
84-
using TEvDescribeConsumerRequest = TGRpcRequestValidationWrapper<TRpcServices::EvDescribeConsumer, Ydb::Topic::DescribeConsumerRequest, Ydb::Topic::DescribeConsumerResponse, true, TRateLimiterMode::Rps>;
85-
using TEvDescribePartitionRequest = TGRpcRequestValidationWrapper<TRpcServices::EvDescribePartition, Ydb::Topic::DescribePartitionRequest, Ydb::Topic::DescribePartitionResponse, true, TRateLimiterMode::Rps>;
86-
8771
using TEvDiscoverPQClustersRequest = TGRpcRequestWrapper<TRpcServices::EvDiscoverPQClusters, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResponse, true>;
8872
using TEvListFederationDatabasesRequest = TGRpcRequestWrapper<TRpcServices::EvListFederationDatabases, Ydb::FederationDiscovery::ListFederationDatabasesRequest, Ydb::FederationDiscovery::ListFederationDatabasesResponse, true>;
8973

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#pragma once
2+
3+
#include <ydb/public/api/protos/ydb_topic.pb.h>
4+
#include <ydb/public/api/protos/ydb_persqueue_v1.pb.h>
5+
6+
#include "rpc_calls.h"
7+
8+
namespace NKikimr::NGRpcService {
9+
10+
using TEvDropTopicRequest = TGrpcRequestOperationCall<Ydb::Topic::DropTopicRequest, Ydb::Topic::DropTopicResponse>;
11+
using TEvCreateTopicRequest = TGrpcRequestOperationCall<Ydb::Topic::CreateTopicRequest, Ydb::Topic::CreateTopicResponse>;
12+
using TEvAlterTopicRequest = TGrpcRequestOperationCall<Ydb::Topic::AlterTopicRequest, Ydb::Topic::AlterTopicResponse>;
13+
using TEvDescribeTopicRequest = TGrpcRequestOperationCall<Ydb::Topic::DescribeTopicRequest, Ydb::Topic::DescribeTopicResponse>;
14+
using TEvDescribeConsumerRequest = TGrpcRequestOperationCall<Ydb::Topic::DescribeConsumerRequest, Ydb::Topic::DescribeConsumerResponse>;
15+
using TEvDescribePartitionRequest = TGrpcRequestOperationCall<Ydb::Topic::DescribePartitionRequest, Ydb::Topic::DescribePartitionResponse>;
16+
17+
using TEvPQDropTopicRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::DropTopicRequest, Ydb::PersQueue::V1::DropTopicResponse>;
18+
using TEvPQCreateTopicRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::CreateTopicRequest, Ydb::PersQueue::V1::CreateTopicResponse>;
19+
using TEvPQAlterTopicRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::AlterTopicRequest, Ydb::PersQueue::V1::AlterTopicResponse>;
20+
using TEvPQDescribeTopicRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::DescribeTopicRequest, Ydb::PersQueue::V1::DescribeTopicResponse>;
21+
using TEvPQAddReadRuleRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::AddReadRuleRequest, Ydb::PersQueue::V1::AddReadRuleResponse>;
22+
using TEvPQRemoveReadRuleRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::RemoveReadRuleRequest, Ydb::PersQueue::V1::RemoveReadRuleResponse>;
23+
24+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
#pragma once
2+
3+
#include <memory>
4+
5+
namespace NKikimr {
6+
7+
namespace NGRpcProxy::V1 {
8+
class IClustersCfgProvider;
9+
struct TClustersCfg;
10+
}
11+
12+
namespace NGRpcService {
13+
14+
class IRequestOpCtx;
15+
class IFacilityProvider;
16+
17+
void DoDropTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
18+
void DoCreateTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f, TIntrusiveConstPtr<NGRpcProxy::V1::TClustersCfg>);
19+
void DoAlterTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
20+
void DoDescribeTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
21+
void DoDescribeConsumerRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
22+
void DoDescribePartitionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
23+
24+
void DoPQDropTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
25+
void DoPQCreateTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f, TIntrusiveConstPtr<NGRpcProxy::V1::TClustersCfg>);
26+
void DoPQAlterTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f, TIntrusiveConstPtr<NGRpcProxy::V1::TClustersCfg>);
27+
void DoPQDescribeTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
28+
void DoPQAddReadRuleRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
29+
void DoPQRemoveReadRuleRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
30+
31+
}
32+
}

ydb/core/viewer/json_local_rpc.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
#include "json_pipe_req.h"
1111

1212
#include <ydb/public/api/grpc/ydb_topic_v1.grpc.pb.h>
13-
#include <ydb/core/grpc_services/rpc_calls.h>
13+
#include <ydb/core/grpc_services/rpc_calls_topic.h>
1414
#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
1515
#include <ydb/public/sdk/cpp/client/ydb_types/status/status.h>
1616

ydb/services/persqueue_v1/actors/events.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#include "partition_id.h"
44

55
#include <ydb/core/base/events.h>
6-
#include <ydb/core/grpc_services/rpc_calls.h>
6+
#include <ydb/core/grpc_services/rpc_calls_topic.h>
77
#include <ydb/core/protos/pqconfig.pb.h>
88
#include <ydb/core/persqueue/key.h>
99
#include <ydb/core/persqueue/percentile_counter.h>
@@ -15,7 +15,6 @@
1515

1616
#include <util/generic/guid.h>
1717

18-
1918
namespace NKikimr::NGRpcProxy::V1 {
2019

2120
using namespace Ydb;

0 commit comments

Comments
 (0)