Skip to content

Commit a4fbf9d

Browse files
committed
Refactor grpc socket mutator
1 parent 49dd99a commit a4fbf9d

File tree

3 files changed

+114
-84
lines changed

3 files changed

+114
-84
lines changed
Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
_ydb_sdk_add_library(grpc-client)
22

3-
target_link_libraries(grpc-client PUBLIC
4-
yutil
5-
gRPC::grpc++
3+
target_link_libraries(grpc-client
4+
PUBLIC
5+
yutil
6+
gRPC::grpc++
67
)
78

89
target_sources(grpc-client PRIVATE
910
grpc_client_low.cpp
1011
)
1112

13+
target_compile_definitions(grpc-client PRIVATE YDB_DISABLE_GRPC_SOCKET_MUTATOR)
14+
1215
_ydb_sdk_install_targets(TARGETS grpc-client)

src/library/grpc/client/grpc_client_low.cpp

Lines changed: 97 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@
1414
#include <netinet/tcp.h>
1515
#endif
1616

17+
#if !defined(YDB_DISABLE_GRPC_SOCKET_MUTATOR)
18+
#include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h>
19+
#endif
20+
21+
#include <format>
22+
1723
namespace NYdbGrpc {
1824

1925
void EnableGRpcTracing() {
@@ -30,77 +36,79 @@ void EnableGRpcTracing() {
3036
gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
3137
}
3238

33-
// class TGRpcKeepAliveSocketMutator : public grpc_socket_mutator {
34-
// public:
35-
// TGRpcKeepAliveSocketMutator(int idle, int count, int interval)
36-
// : Idle_(idle)
37-
// , Count_(count)
38-
// , Interval_(interval)
39-
// {
40-
// grpc_socket_mutator_init(this, &VTable);
41-
// }
42-
// private:
43-
// static TGRpcKeepAliveSocketMutator* Cast(grpc_socket_mutator* mutator) {
44-
// return static_cast<TGRpcKeepAliveSocketMutator*>(mutator);
45-
// }
46-
47-
// template<typename TVal>
48-
// bool SetOption(int fd, int level, int optname, const TVal& value) {
49-
// return setsockopt(fd, level, optname, reinterpret_cast<const char*>(&value), sizeof(value)) == 0;
50-
// }
51-
// bool SetOption(int fd) {
52-
// if (!SetOption(fd, SOL_SOCKET, SO_KEEPALIVE, 1)) {
53-
// std::cerr << std::format("Failed to set SO_KEEPALIVE option: {}", strerror(errno)) << std::endl;
54-
// return false;
55-
// }
56-
// #ifdef _linux_
57-
// if (Idle_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPIDLE, Idle_)) {
58-
// std::cerr << std::format("Failed to set TCP_KEEPIDLE option: {}", strerror(errno)) << std::endl;
59-
// return false;
60-
// }
61-
// if (Count_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPCNT, Count_)) {
62-
// std::cerr << std::format("Failed to set TCP_KEEPCNT option: {}", strerror(errno)) << std::endl;
63-
// return false;
64-
// }
65-
// if (Interval_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPINTVL, Interval_)) {
66-
// std::cerr << std::format("Failed to set TCP_KEEPINTVL option: {}", strerror(errno)) << std::endl;
67-
// return false;
68-
// }
69-
// #endif
70-
// return true;
71-
// }
72-
// static bool Mutate(int fd, grpc_socket_mutator* mutator) {
73-
// auto self = Cast(mutator);
74-
// return self->SetOption(fd);
75-
// }
76-
// static int Compare(grpc_socket_mutator* a, grpc_socket_mutator* b) {
77-
// const auto* selfA = Cast(a);
78-
// const auto* selfB = Cast(b);
79-
// auto tupleA = std::make_tuple(selfA->Idle_, selfA->Count_, selfA->Interval_);
80-
// auto tupleB = std::make_tuple(selfB->Idle_, selfB->Count_, selfB->Interval_);
81-
// return tupleA < tupleB ? -1 : tupleA > tupleB ? 1 : 0;
82-
// }
83-
// static void Destroy(grpc_socket_mutator* mutator) {
84-
// delete Cast(mutator);
85-
// }
86-
// static bool Mutate2(const grpc_mutate_socket_info* info, grpc_socket_mutator* mutator) {
87-
// auto self = Cast(mutator);
88-
// return self->SetOption(info->fd);
89-
// }
90-
91-
// static grpc_socket_mutator_vtable VTable;
92-
// const int Idle_;
93-
// const int Count_;
94-
// const int Interval_;
95-
// };
96-
97-
// grpc_socket_mutator_vtable TGRpcKeepAliveSocketMutator::VTable =
98-
// {
99-
// &TGRpcKeepAliveSocketMutator::Mutate,
100-
// &TGRpcKeepAliveSocketMutator::Compare,
101-
// &TGRpcKeepAliveSocketMutator::Destroy,
102-
// &TGRpcKeepAliveSocketMutator::Mutate2
103-
// };
39+
#if !defined(YDB_DISABLE_GRPC_SOCKET_MUTATOR)
40+
class TGRpcKeepAliveSocketMutator : public grpc_socket_mutator {
41+
public:
42+
TGRpcKeepAliveSocketMutator(int idle, int count, int interval)
43+
: Idle_(idle)
44+
, Count_(count)
45+
, Interval_(interval)
46+
{
47+
grpc_socket_mutator_init(this, &VTable);
48+
}
49+
private:
50+
static TGRpcKeepAliveSocketMutator* Cast(grpc_socket_mutator* mutator) {
51+
return static_cast<TGRpcKeepAliveSocketMutator*>(mutator);
52+
}
53+
54+
template<typename TVal>
55+
bool SetOption(int fd, int level, int optname, const TVal& value) {
56+
return setsockopt(fd, level, optname, reinterpret_cast<const char*>(&value), sizeof(value)) == 0;
57+
}
58+
bool SetOption(int fd) {
59+
if (!SetOption(fd, SOL_SOCKET, SO_KEEPALIVE, 1)) {
60+
std::cerr << std::format("Failed to set SO_KEEPALIVE option: {}", strerror(errno)) << std::endl;
61+
return false;
62+
}
63+
#ifdef _linux_
64+
if (Idle_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPIDLE, Idle_)) {
65+
std::cerr << std::format("Failed to set TCP_KEEPIDLE option: {}", strerror(errno)) << std::endl;
66+
return false;
67+
}
68+
if (Count_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPCNT, Count_)) {
69+
std::cerr << std::format("Failed to set TCP_KEEPCNT option: {}", strerror(errno)) << std::endl;
70+
return false;
71+
}
72+
if (Interval_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPINTVL, Interval_)) {
73+
std::cerr << std::format("Failed to set TCP_KEEPINTVL option: {}", strerror(errno)) << std::endl;
74+
return false;
75+
}
76+
#endif
77+
return true;
78+
}
79+
static bool Mutate(int fd, grpc_socket_mutator* mutator) {
80+
auto self = Cast(mutator);
81+
return self->SetOption(fd);
82+
}
83+
static int Compare(grpc_socket_mutator* a, grpc_socket_mutator* b) {
84+
const auto* selfA = Cast(a);
85+
const auto* selfB = Cast(b);
86+
auto tupleA = std::make_tuple(selfA->Idle_, selfA->Count_, selfA->Interval_);
87+
auto tupleB = std::make_tuple(selfB->Idle_, selfB->Count_, selfB->Interval_);
88+
return tupleA < tupleB ? -1 : tupleA > tupleB ? 1 : 0;
89+
}
90+
static void Destroy(grpc_socket_mutator* mutator) {
91+
delete Cast(mutator);
92+
}
93+
static bool Mutate2(const grpc_mutate_socket_info* info, grpc_socket_mutator* mutator) {
94+
auto self = Cast(mutator);
95+
return self->SetOption(info->fd);
96+
}
97+
98+
static grpc_socket_mutator_vtable VTable;
99+
const int Idle_;
100+
const int Count_;
101+
const int Interval_;
102+
};
103+
104+
grpc_socket_mutator_vtable TGRpcKeepAliveSocketMutator::VTable =
105+
{
106+
&TGRpcKeepAliveSocketMutator::Mutate,
107+
&TGRpcKeepAliveSocketMutator::Compare,
108+
&TGRpcKeepAliveSocketMutator::Destroy,
109+
&TGRpcKeepAliveSocketMutator::Mutate2
110+
};
111+
#endif
104112

105113
TChannelPool::TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime)
106114
: TcpKeepAliveSettings_(tcpKeepAliveSettings)
@@ -140,16 +148,9 @@ void TChannelPool::GetStubsHolderLocked(
140148
}
141149
}
142150
}
143-
// TGRpcKeepAliveSocketMutator* mutator = nullptr;
144-
// // will be destroyed inside grpc
145-
// if (TcpKeepAliveSettings_.Enabled) {
146-
// mutator = new TGRpcKeepAliveSocketMutator(
147-
// TcpKeepAliveSettings_.Idle,
148-
// TcpKeepAliveSettings_.Count,
149-
// TcpKeepAliveSettings_.Interval
150-
// );
151-
// }
152-
cb(Pool_.emplace(channelId, CreateChannelInterface(config, nullptr)).first->second);
151+
auto mutator = NImpl::CreateGRpcKeepAliveSocketMutator(TcpKeepAliveSettings_);
152+
// will be destroyed inside grpc
153+
cb(Pool_.emplace(channelId, CreateChannelInterface(config, mutator)).first->second);
153154
LastUsedQueue_.emplace(Pool_.at(channelId).GetLastUseTime(), channelId);
154155
}
155156
}
@@ -587,4 +588,19 @@ void TGRpcClientLow::ForgetContext(TContextImpl* context) {
587588
}
588589
}
589590

591+
grpc_socket_mutator* NImpl::CreateGRpcKeepAliveSocketMutator(const TTcpKeepAliveSettings& TcpKeepAliveSettings_) {
592+
#if !defined(YDB_DISABLE_GRPC_SOCKET_MUTATOR)
593+
TGRpcKeepAliveSocketMutator* mutator = nullptr;
594+
if (TcpKeepAliveSettings_.Enabled) {
595+
mutator = new TGRpcKeepAliveSocketMutator(
596+
TcpKeepAliveSettings_.Idle,
597+
TcpKeepAliveSettings_.Count,
598+
TcpKeepAliveSettings_.Interval
599+
);
600+
}
601+
return mutator;
602+
#endif
603+
return nullptr;
604+
}
605+
590606
} // namespace NGRpc

src/library/grpc/client/grpc_client_low.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,10 @@ class IStreamRequestReadWriteProcessor : public IStreamRequestReadProcessor<TRes
452452

453453
class TGRpcKeepAliveSocketMutator;
454454

455+
namespace NImpl {
456+
grpc_socket_mutator* CreateGRpcKeepAliveSocketMutator(const TTcpKeepAliveSettings& TcpKeepAliveSettings_);
457+
} // NImpl
458+
455459
// Class to hold stubs allocated on channel.
456460
// It is poor documented part of grpc. See KIKIMR-6109 and comment to this commit
457461

@@ -1387,6 +1391,13 @@ class TGRpcClientLow
13871391
return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(CreateChannelInterface(config), this));
13881392
}
13891393

1394+
template<typename TGRpcService>
1395+
std::unique_ptr<TServiceConnection<TGRpcService>> CreateGRpcServiceConnection(const TGRpcClientConfig& config, const TTcpKeepAliveSettings& keepAlive) {
1396+
auto mutator = NImpl::CreateGRpcKeepAliveSocketMutator(keepAlive);
1397+
// will be destroyed inside grpc
1398+
return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(CreateChannelInterface(config, mutator), this));
1399+
}
1400+
13901401
template<typename TGRpcService>
13911402
std::unique_ptr<TServiceConnection<TGRpcService>> CreateGRpcServiceConnection(TStubsHolder& holder) {
13921403
return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(holder, this));

0 commit comments

Comments
 (0)