Skip to content

Commit edc1471

Browse files
authored
endpoint slicing support (#10313)
This change allows to make logical slice of endpoints. Only endpoints with same endpointId will be used in discovery result for request which served via endpoint with endpointId This allows to create configuration where nodes have multiple endpoints in different networks. Changelog category New feature
1 parent 6e6f79a commit edc1471

File tree

18 files changed

+137
-10
lines changed

18 files changed

+137
-10
lines changed

ydb/core/discovery/discovery.cpp

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,16 @@ namespace NDiscovery {
4646
return false;
4747
}
4848

49+
bool CheckEndpointId(const TString& endpointId, const NKikimrStateStorage::TEndpointBoardEntry &entry) {
50+
if (endpointId.empty() && !entry.HasEndpointId())
51+
return true;
52+
53+
if (entry.HasEndpointId() && entry.GetEndpointId() == endpointId)
54+
return true;
55+
56+
return false;
57+
}
58+
4959
bool IsSafeLocationMarker(TStringBuf location) {
5060
const ui8* isrc = reinterpret_cast<const ui8*>(location.data());
5161
for (auto idx : xrange(location.size())) {
@@ -128,6 +138,7 @@ namespace NDiscovery {
128138
const TMap<TActorId, TEvStateStorage::TBoardInfoEntry>& prevInfoEntries,
129139
TMap<TActorId, TEvStateStorage::TBoardInfoEntry> newInfoEntries,
130140
TSet<TString> services,
141+
TString endpointId,
131142
const THolder<TEvInterconnect::TEvNodeInfo>& nameserviceResponse) {
132143
TMap<TActorId, TEvStateStorage::TBoardInfoEntry> infoEntries;
133144
if (prevInfoEntries.empty()) {
@@ -170,6 +181,10 @@ namespace NDiscovery {
170181
continue;
171182
}
172183

184+
if (!CheckEndpointId(endpointId, entry)) {
185+
continue;
186+
}
187+
173188
if (entry.GetSsl()) {
174189
AddEndpoint(cachedMessageSsl, statesSsl, entry);
175190
} else {
@@ -264,7 +279,7 @@ namespace NDiscoveryPrivate {
264279

265280
currentCachedMessage = std::make_shared<NDiscovery::TCachedMessageData>(
266281
NDiscovery::CreateCachedMessage(
267-
currentCachedMessage->InfoEntries, std::move(msg->Updates), {}, NameserviceResponse)
282+
currentCachedMessage->InfoEntries, std::move(msg->Updates), {}, {}, NameserviceResponse)
268283
);
269284

270285
auto it = Requested.find(path);
@@ -278,7 +293,7 @@ namespace NDiscoveryPrivate {
278293
const auto& path = msg->Path;
279294

280295
auto newCachedData = std::make_shared<NDiscovery::TCachedMessageData>(
281-
NDiscovery::CreateCachedMessage({}, std::move(msg->InfoEntries), {}, NameserviceResponse)
296+
NDiscovery::CreateCachedMessage({}, std::move(msg->InfoEntries), {}, {}, NameserviceResponse)
282297
);
283298
newCachedData->Status = msg->Status;
284299

ydb/core/discovery/discovery.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ namespace NDiscovery {
5959
const TMap<TActorId, TEvStateStorage::TBoardInfoEntry>&,
6060
TMap<TActorId, TEvStateStorage::TBoardInfoEntry>,
6161
TSet<TString>,
62+
TString,
6263
const THolder<TEvInterconnect::TEvNodeInfo>&);
6364
}
6465

ydb/core/driver_lib/run/kikimr_services_initializers.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1692,6 +1692,9 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se
16921692
stringsFromProto(desc->AddressesV6, config.GetPublicAddressesV6());
16931693

16941694
desc->ServedServices.insert(desc->ServedServices.end(), config.GetServices().begin(), config.GetServices().end());
1695+
if (config.HasEndpointId()) {
1696+
desc->EndpointId = config.GetEndpointId();
1697+
}
16951698
endpoints.push_back(std::move(desc));
16961699
}
16971700

@@ -1706,6 +1709,9 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se
17061709
desc->TargetNameOverride = config.GetPublicTargetNameOverride();
17071710

17081711
desc->ServedServices.insert(desc->ServedServices.end(), config.GetServices().begin(), config.GetServices().end());
1712+
if (config.HasEndpointId()) {
1713+
desc->EndpointId = config.GetEndpointId();
1714+
}
17091715
endpoints.push_back(std::move(desc));
17101716
}
17111717

@@ -1721,6 +1727,9 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se
17211727
stringsFromProto(desc->AddressesV6, sx.GetPublicAddressesV6());
17221728

17231729
desc->ServedServices.insert(desc->ServedServices.end(), sx.GetServices().begin(), sx.GetServices().end());
1730+
if (sx.HasEndpointId()) {
1731+
desc->EndpointId = sx.GetEndpointId();
1732+
}
17241733
endpoints.push_back(std::move(desc));
17251734
}
17261735

@@ -1735,6 +1744,9 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se
17351744
desc->TargetNameOverride = sx.GetPublicTargetNameOverride();
17361745

17371746
desc->ServedServices.insert(desc->ServedServices.end(), sx.GetServices().begin(), sx.GetServices().end());
1747+
if (sx.HasEndpointId()) {
1748+
desc->EndpointId = sx.GetEndpointId();
1749+
}
17381750
endpoints.push_back(std::move(desc));
17391751
}
17401752
}

ydb/core/driver_lib/run/run.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -940,6 +940,10 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
940940
opts.SetKeepAliveEnable(false);
941941
}
942942

943+
if (grpcConfig.HasEndpointId()) {
944+
opts.SetEndpointId(grpcConfig.GetEndpointId());
945+
}
946+
943947
NConsole::SetGRpcLibraryFunction();
944948

945949
#define GET_PATH_TO_FILE(GRPC_CONFIG, PRIMARY_FIELD, SECONDARY_FIELD) \
@@ -981,6 +985,10 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
981985
if (ex.GetHost())
982986
xopts.SetHost(ex.GetHost());
983987

988+
if (ex.HasEndpointId()) {
989+
xopts.SetEndpointId(ex.GetEndpointId());
990+
}
991+
984992
GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(xopts) });
985993
fillFn(ex, *GRpcServers.back().second, xopts);
986994
}
@@ -989,6 +997,9 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
989997
NYdbGrpc::TServerOptions xopts = opts;
990998
xopts.SetPort(ex.GetSslPort());
991999

1000+
if (ex.HasEndpointId()) {
1001+
xopts.SetEndpointId(ex.GetEndpointId());
1002+
}
9921003

9931004
NYdbGrpc::TSslData sslData;
9941005

ydb/core/grpc_services/base/base.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1318,6 +1318,10 @@ class TGRpcRequestWrapperImpl
13181318
Ctx_->ReplyError(code, msg, details);
13191319
}
13201320

1321+
TString GetEndpointId() const {
1322+
return Ctx_->GetEndpointId();
1323+
}
1324+
13211325
private:
13221326
void Reply(NProtoBuf::Message *resp, ui32 status) override {
13231327
// End Of Request for non streaming requests

ydb/core/grpc_services/grpc_endpoint.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ struct TGrpcEndpointDescription : public TThrRefBase {
1717

1818
TVector<TString> ServedServices;
1919
TVector<TString> ServedDatabases;
20+
TString EndpointId;
2021
};
2122

2223
IActor* CreateGrpcEndpointPublishActor(TGrpcEndpointDescription *description);

ydb/core/grpc_services/grpc_endpoint_publish_actor.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ class TGRpcEndpointPublishActor : public TActorBootstrapped<TGRpcEndpointPublish
4848
if (Description->TargetNameOverride) {
4949
entry.SetTargetNameOverride(Description->TargetNameOverride);
5050
}
51+
if (Description->EndpointId) {
52+
entry.SetEndpointId(Description->EndpointId);
53+
}
5154
for (const auto &service : Description->ServedServices)
5255
entry.AddServices(service);
5356

ydb/core/grpc_services/local_grpc/local_grpc.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ class TContextBase : public NYdbGrpc::IRequestContextBase {
5454
return GRPC_COMPRESS_LEVEL_NONE;
5555
}
5656

57+
TString GetEndpointId() const override { return {}; }
58+
5759
google::protobuf::Arena* GetArena() override {
5860
return &Arena_;
5961
}

ydb/core/grpc_services/rpc_discovery.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,14 +134,16 @@ class TListEndpointsRPC : public TActorBootstrapped<TListEndpointsRPC> {
134134

135135
TString cachedMessage, cachedMessageSsl;
136136

137-
if (services.empty() && !LookupResponse->CachedMessageData->CachedMessage.empty() &&
137+
TString endpointId = Request->GetEndpointId();
138+
139+
if (endpointId.empty() && services.empty() && !LookupResponse->CachedMessageData->CachedMessage.empty() &&
138140
!LookupResponse->CachedMessageData->CachedMessageSsl.empty()) {
139141
cachedMessage = LookupResponse->CachedMessageData->CachedMessage;
140142
cachedMessageSsl = LookupResponse->CachedMessageData->CachedMessageSsl;
141143
} else {
142144
auto cachedMessageData = NDiscovery::CreateCachedMessage(
143145
{}, std::move(LookupResponse->CachedMessageData->InfoEntries),
144-
std::move(services), NameserviceResponse);
146+
std::move(services), std::move(endpointId), NameserviceResponse);
145147
cachedMessage = std::move(cachedMessageData.CachedMessage);
146148
cachedMessageSsl = std::move(cachedMessageData.CachedMessageSsl);
147149
}

ydb/core/protos/config.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -707,6 +707,7 @@ message TGRpcConfig {
707707

708708
optional uint32 GRpcProxyCount = 106 [default = 2];
709709
optional bool EnableGRpcMemoryQuota = 107 [default = false];
710+
optional string EndpointId = 108;
710711

711712
repeated TGRpcConfig ExtEndpoints = 200; // run specific services on separate endpoints
712713
}

ydb/core/protos/statestorage.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,5 +137,6 @@ message TEndpointBoardEntry {
137137
repeated string AddressesV4 = 8;
138138
repeated string AddressesV6 = 9;
139139
optional string TargetNameOverride = 10;
140+
optional string EndpointId = 11;
140141
};
141142

ydb/core/public_http/grpc_request_context_wrapper.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,6 @@ namespace NKikimr::NPublicHttp {
7878
return RequestContext.GetPeer();
7979
}
8080

81+
TString TGrpcRequestContextWrapper::GetEndpointId() const { return {}; }
82+
8183
} // namespace NKikimr::NPublicHttp

ydb/core/public_http/grpc_request_context_wrapper.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ class TGrpcRequestContextWrapper : public NYdbGrpc::IRequestContextBase {
3434
virtual TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const;
3535
virtual TVector<TStringBuf> FindClientCert() const {return {};}
3636
virtual grpc_compression_level GetCompressionLevel() const { return GRPC_COMPRESS_LEVEL_NONE; }
37+
virtual TString GetEndpointId() const;
3738

3839
virtual google::protobuf::Arena* GetArena();
3940

ydb/library/grpc/server/grpc_request.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,10 @@ class TGRpcRequestImpl
185185
return TBaseAsyncContext<TService>::GetCompressionLevel();
186186
}
187187

188+
TString GetEndpointId() const override {
189+
return Server_->GetEndpointId();
190+
}
191+
188192
//! Get pointer to the request's message.
189193
const NProtoBuf::Message* GetRequest() const override {
190194
return Request_;

ydb/library/grpc/server/grpc_request_base.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ class IRequestContextBase: public TThrRefBase {
125125

126126
//! Returns true if client was not interested in result (but we still must send response to make grpc happy)
127127
virtual bool IsClientLost() const = 0;
128+
129+
virtual TString GetEndpointId() const = 0;
128130
};
129131

130132
} // namespace NYdbGrpc

ydb/library/grpc/server/grpc_server.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ struct TServerOptions {
108108
//! Logger which will be used to write logs about requests handling (iff appropriate log level is enabled).
109109
DECLARE_FIELD(Logger, TLoggerPtr, nullptr);
110110

111+
DECLARE_FIELD(EndpointId, TString, "");
112+
111113
#undef DECLARE_FIELD
112114
};
113115

@@ -203,6 +205,8 @@ class IGRpcService: public TThrRefBase {
203205
* service to inspect server options and initialize accordingly.
204206
*/
205207
virtual void SetServerOptions(const TServerOptions& options) = 0;
208+
209+
virtual TString GetEndpointId() const = 0;
206210
};
207211

208212
class TGrpcServiceProtectiable: public IGRpcService {
@@ -282,6 +286,11 @@ class TGrpcServiceProtectiable: public IGRpcService {
282286
void SetServerOptions(const TServerOptions& options) override {
283287
SslServer_ = bool(options.SslData);
284288
NeedAuth_ = options.UseAuth;
289+
EndpointId_ = options.EndpointId;
290+
}
291+
292+
TString GetEndpointId() const override {
293+
return EndpointId_;
285294
}
286295

287296
//! Check if the server is going to shut down.
@@ -303,6 +312,7 @@ class TGrpcServiceProtectiable: public IGRpcService {
303312

304313
bool SslServer_ = false;
305314
bool NeedAuth_ = false;
315+
TString EndpointId_;
306316

307317
struct TShard {
308318
TAdaptiveLock Lock_;

ydb/tests/functional/api/test_discovery.py

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@ class TestDiscoveryExtEndpoint(object):
1919
@classmethod
2020
def setup_class(cls):
2121
conf = KikimrConfigGenerator()
22-
cls.ext_port = conf.port_allocator.get_node_port_allocator(0).ext_port
23-
conf.clone_grpc_as_ext_endpoint(cls.ext_port)
22+
cls.ext_port_1 = conf.port_allocator.get_node_port_allocator(0).ext_port
23+
cls.ext_port_2 = conf.port_allocator.get_node_port_allocator(1).ext_port
24+
conf.clone_grpc_as_ext_endpoint(cls.ext_port_1, "extserv1")
25+
conf.clone_grpc_as_ext_endpoint(cls.ext_port_2, "extserv2")
2426
cls.cluster = kikimr_cluster_factory(
2527
configurator=conf
2628
)
@@ -42,18 +44,68 @@ def teardown_class(cls):
4244
cls.cluster.stop()
4345

4446
def test_scenario(self):
45-
ext_port = TestDiscoveryExtEndpoint.ext_port
47+
ext_port_1 = TestDiscoveryExtEndpoint.ext_port_1
48+
ext_port_2 = TestDiscoveryExtEndpoint.ext_port_2
4649
driver_config = ydb.DriverConfig(
47-
"%s:%s" % (self.cluster.nodes[1].host, ext_port), self.database_name)
50+
"%s:%s" % (self.cluster.nodes[1].host, self.cluster.nodes[1].port), self.database_name)
4851
resolver = ydb.DiscoveryEndpointsResolver(driver_config)
4952
driver = ydb.Driver(driver_config)
5053
driver.wait(timeout=10)
5154

5255
endpoint_ports = [endpoint.port for endpoint in resolver.resolve().endpoints]
56+
# Discovery has been performed using default endpoint
57+
# but ext endpoint marked with label
58+
# such ext endpoint should not present in discovery
59+
assert_that(ext_port_1 not in endpoint_ports)
60+
assert_that(ext_port_2 not in endpoint_ports)
5361

5462
for slot in self.cluster.slots.values():
5563
assert_that(slot.grpc_port in endpoint_ports)
56-
assert_that(slot.grpc_port != ext_port)
64+
assert_that(slot.grpc_port != ext_port_1)
65+
assert_that(slot.grpc_port != ext_port_2)
66+
67+
driver_config = ydb.DriverConfig(
68+
"%s:%s" % (self.cluster.nodes[1].host, ext_port_1), self.database_name)
69+
resolver = ydb.DiscoveryEndpointsResolver(driver_config)
70+
driver = ydb.Driver(driver_config)
71+
driver.wait(timeout=10)
72+
73+
endpoint_ports = [endpoint.port for endpoint in resolver.resolve().endpoints]
74+
# Discovery has been performed using external endpoint with label
75+
# only endpoint with such label expected
76+
assert_that(ext_port_1 in endpoint_ports)
77+
assert_that(ext_port_2 not in endpoint_ports)
78+
79+
for slot in self.cluster.slots.values():
80+
assert_that(slot.grpc_port not in endpoint_ports)
81+
82+
# Just check again to cover discovery cache issue
83+
driver_config = ydb.DriverConfig(
84+
"%s:%s" % (self.cluster.nodes[1].host, ext_port_1), self.database_name)
85+
resolver = ydb.DiscoveryEndpointsResolver(driver_config)
86+
driver = ydb.Driver(driver_config)
87+
driver.wait(timeout=10)
88+
89+
endpoint_ports = [endpoint.port for endpoint in resolver.resolve().endpoints]
90+
assert_that(ext_port_1 in endpoint_ports)
91+
assert_that(ext_port_2 not in endpoint_ports)
92+
93+
for slot in self.cluster.slots.values():
94+
assert_that(slot.grpc_port not in endpoint_ports)
95+
96+
# Repeat using other ext endpoint
97+
driver_config = ydb.DriverConfig(
98+
"%s:%s" % (self.cluster.nodes[1].host, ext_port_2), self.database_name)
99+
resolver = ydb.DiscoveryEndpointsResolver(driver_config)
100+
driver = ydb.Driver(driver_config)
101+
driver.wait(timeout=10)
102+
103+
endpoint_ports = [endpoint.port for endpoint in resolver.resolve().endpoints]
104+
assert_that(ext_port_1 not in endpoint_ports)
105+
assert_that(ext_port_2 in endpoint_ports)
106+
107+
for slot in self.cluster.slots.values():
108+
assert_that(slot.grpc_port not in endpoint_ports)
57109

58110

59111
@six.add_metaclass(abc.ABCMeta)

ydb/tests/library/harness/kikimr_config.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -546,13 +546,16 @@ def write_proto_configs(self, configs_path):
546546
with open(os.path.join(configs_path, "config.yaml"), "w") as writer:
547547
writer.write(yaml.safe_dump(self.yaml_config))
548548

549-
def clone_grpc_as_ext_endpoint(self, port):
549+
def clone_grpc_as_ext_endpoint(self, port, endpoint_id=None):
550550
cur_grpc_config = copy.deepcopy(self.yaml_config['grpc_config'])
551551
if 'ext_endpoints' in cur_grpc_config:
552552
del cur_grpc_config['ext_endpoints']
553553

554554
cur_grpc_config['port'] = port
555555

556+
if endpoint_id is not None:
557+
cur_grpc_config['endpoint_id'] = endpoint_id
558+
556559
if 'ext_endpoints' not in self.yaml_config['grpc_config']:
557560
self.yaml_config['grpc_config']['ext_endpoints'] = []
558561

0 commit comments

Comments
 (0)