Skip to content

Commit ee039ff

Browse files
dcherednik authored and Gazizonoki committed
Moved commit "[C++ SDK] Remove session from session pool if stream session was closed by server side." from ydb repo
1 parent e86c4bc commit ee039ff

File tree

7 files changed

+175
-10
lines changed

7 files changed

+175
-10
lines changed

src/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ TKqpSessionCommon::TKqpSessionCommon(
3838
, State_(S_STANDALONE)
3939
, TimeToTouch_(TInstant::Now())
4040
, TimeInPast_(TInstant::Now())
41+
, CloseHandler_(nullptr)
4142
, NeedUpdateActiveCounter_(false)
4243
{}
4344

@@ -114,7 +115,7 @@ void TKqpSessionCommon::ScheduleTimeToTouch(TDuration interval,
114115
if (updateTimeInPast) {
115116
TimeInPast_ = now;
116117
}
117-
TimeToTouch_ = now + interval;
118+
TimeToTouch_.store(now + interval, std::memory_order_relaxed);
118119
}
119120

120121
void TKqpSessionCommon::ScheduleTimeToTouchFast(TDuration interval,
@@ -124,11 +125,11 @@ void TKqpSessionCommon::ScheduleTimeToTouchFast(TDuration interval,
124125
if (updateTimeInPast) {
125126
TimeInPast_ = now;
126127
}
127-
TimeToTouch_ = now + interval;
128+
TimeToTouch_.store(now + interval, std::memory_order_relaxed);
128129
}
129130

130131
TInstant TKqpSessionCommon::GetTimeToTouchFast() const {
131-
return TimeToTouch_;
132+
return TimeToTouch_.load(std::memory_order_relaxed);
132133
}
133134

134135
TInstant TKqpSessionCommon::GetTimeInPastFast() const {
@@ -144,6 +145,24 @@ TDuration TKqpSessionCommon::GetTimeInterval() const {
144145
return TimeInterval_;
145146
}
146147

148+
// Replaces the handler that CloseFromServer() notifies.
// Passing nullptr clears it, after which CloseFromServer() notifies nobody.
// The raw pointer is written atomically.
void TKqpSessionCommon::UpdateServerCloseHandler(IServerCloseHandler* handler) {
    // Sequentially consistent store: the default ordering of std::atomic::store, spelled out.
    CloseHandler_.store(handler, std::memory_order_seq_cst);
}
151+
152+
void TKqpSessionCommon::CloseFromServer(std::weak_ptr<ISessionClient> client) noexcept {
153+
auto strong = client.lock();
154+
if (!strong) {
155+
// Session closed on the server after stopping client - do nothing
156+
// moreover pool maybe destoyed now
157+
return;
158+
}
159+
160+
IServerCloseHandler* h = CloseHandler_.load();
161+
if (h) {
162+
h->OnCloseSession(this, strong);
163+
}
164+
}
165+
147166
////////////////////////////////////////////////////////////////////////////////
148167

149168
std::function<void(TKqpSessionCommon*)> TKqpSessionCommon::GetSmartDeleter(

src/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,15 @@ namespace NYdb::inline V3 {
1313
////////////////////////////////////////////////////////////////////////////////
1414
ui64 GetNodeIdFromSession(const std::string& sessionId);
1515

16+
class TKqpSessionCommon;
17+
18+
class IServerCloseHandler {
19+
public:
20+
virtual ~IServerCloseHandler() = default;
21+
// called when session should be closed by server signal
22+
virtual void OnCloseSession(const TKqpSessionCommon*, std::shared_ptr<ISessionClient>) = 0;
23+
};
24+
1625
class TKqpSessionCommon : public TEndpointObj {
1726
public:
1827
TKqpSessionCommon(const std::string& sessionId, const std::string& endpoint,
@@ -54,6 +63,12 @@ class TKqpSessionCommon : public TEndpointObj {
5463
static std::function<void(TKqpSessionCommon*)>
5564
GetSmartDeleter(std::shared_ptr<ISessionClient> client);
5665

66+
// Should be called under session pool lock
67+
void UpdateServerCloseHandler(IServerCloseHandler*);
68+
69+
// Called asynchronously from grpc thread.
70+
void CloseFromServer(std::weak_ptr<ISessionClient> client) noexcept;
71+
5772
protected:
5873
TAdaptiveLock Lock_;
5974

@@ -63,10 +78,14 @@ class TKqpSessionCommon : public TEndpointObj {
6378
const bool IsOwnedBySessionPool_;
6479

6580
EState State_;
66-
TInstant TimeToTouch_;
81+
// This time is used during async close session handling which does not lock the session
82+
// so we need to be able to read this value atomically
83+
std::atomic<TInstant> TimeToTouch_;
6784
TInstant TimeInPast_;
6885
// Is used to implement progressive timeout for settler keep alive call
6986
TDuration TimeInterval_;
87+
88+
std::atomic<IServerCloseHandler*> CloseHandler_;
7089
// Indicate session was in active state, but state was changed
7190
// (need to decrement active session counter)
7291
// TODO: suboptimal because need lock for atomic change from interceptor

src/client/impl/ydb_internal/session_pool/session_pool.cpp

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ void TSessionPool::GetSession(std::unique_ptr<IGetSessionCtx> ctx)
141141
}
142142
if (!Sessions_.empty()) {
143143
auto it = std::prev(Sessions_.end());
144+
it->second->UpdateServerCloseHandler(nullptr);
144145
sessionImpl = std::move(it->second);
145146
Sessions_.erase(it);
146147
}
@@ -206,6 +207,7 @@ bool TSessionPool::ReturnSession(TKqpSessionCommon* impl, bool active) {
206207
if (!active)
207208
IncrementActiveCounterUnsafe();
208209
} else {
210+
impl->UpdateServerCloseHandler(this);
209211
Sessions_.emplace(std::make_pair(
210212
impl->GetTimeToTouchFast(),
211213
impl));
@@ -242,6 +244,7 @@ void TSessionPool::Drain(std::function<bool(std::unique_ptr<TKqpSessionCommon>&&
242244
std::lock_guard guard(Mtx_);
243245
Closed_ = close;
244246
for (auto it = Sessions_.begin(); it != Sessions_.end();) {
247+
it->second->UpdateServerCloseHandler(nullptr);
245248
const bool cont = cb(std::move(it->second));
246249
it = Sessions_.erase(it);
247250
if (!cont)
@@ -283,9 +286,11 @@ TPeriodicCb TSessionPool::CreatePeriodicTask(std::weak_ptr<ISessionClient> weakC
283286
break;
284287

285288
if (deletePredicate(it->second.get(), sessions.size())) {
289+
it->second->UpdateServerCloseHandler(nullptr);
286290
sessionsToDelete.emplace_back(std::move(it->second));
287291
sessions.erase(it++);
288292
} else if (cmd) {
293+
it->second->UpdateServerCloseHandler(nullptr);
289294
sessionsToTouch.emplace_back(std::move(it->second));
290295
sessions.erase(it++);
291296
} else {
@@ -338,6 +343,32 @@ i64 TSessionPool::GetCurrentPoolSize() const {
338343
return Sessions_.size();
339344
}
340345

346+
void TSessionPool::OnCloseSession(const TKqpSessionCommon* s, std::shared_ptr<ISessionClient> client) {
347+
std::unique_ptr<TKqpSessionCommon> session;
348+
{
349+
std::lock_guard guard(Mtx_);
350+
const auto timeToTouch = s->GetTimeToTouchFast();
351+
const auto id = s->GetId();
352+
auto it = Sessions_.find(timeToTouch);
353+
// Sessions_ is multimap of sessions sorted by scheduled time to run periodic task
354+
// Scan sessions with same scheduled time to find needed one. In most cases only one session here
355+
while (it != Sessions_.end() && it->first == timeToTouch) {
356+
if (id != it->second->GetId()) {
357+
it++;
358+
continue;
359+
}
360+
session = std::move(it->second);
361+
Sessions_.erase(it);
362+
break;
363+
}
364+
}
365+
366+
if (session) {
367+
Y_ABORT_UNLESS(session->GetState() == TKqpSessionCommon::S_IDLE);
368+
CloseAndDeleteSession(std::move(session), client);
369+
}
370+
}
371+
341372
void TSessionPool::SetStatCollector(NSdkStats::TStatCollector::TSessionPoolStatCollector statCollector) {
342373
ActiveSessionsCounter_.Set(statCollector.ActiveSessions);
343374
InPoolSessionsCounter_.Set(statCollector.InPoolSessions);

src/client/impl/ydb_internal/session_pool/session_pool.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ NThreading::TFuture<TResponse> InjectSessionStatusInterception(
8282
return promise.GetFuture();
8383
}
8484

85-
class TSessionPool {
85+
class TSessionPool : public IServerCloseHandler {
8686
private:
8787
class TWaitersQueue {
8888
public:
@@ -125,6 +125,8 @@ class TSessionPool {
125125
void Drain(std::function<bool(std::unique_ptr<TKqpSessionCommon>&&)> cb, bool close);
126126
void SetStatCollector(NSdkStats::TStatCollector::TSessionPoolStatCollector collector);
127127

128+
void OnCloseSession(const TKqpSessionCommon*, std::shared_ptr<ISessionClient> client) override;
129+
128130
private:
129131
void UpdateStats();
130132
static void ReplySessionToUser(TKqpSessionCommon* session, std::unique_ptr<IGetSessionCtx> ctx);

src/client/query/client.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ class TQueryClient::TImpl: public TClientImplCommon<TQueryClient::TImpl>, public
344344
const auto sessionId = resp->session_id();
345345
request.set_session_id(sessionId);
346346

347-
auto args = std::make_shared<TSession::TImpl::TAttachSessionArgs>(promise, sessionId, endpoint, client);
347+
auto args = std::make_shared<TSession::TImpl::TAttachSessionArgs>(promise, sessionId, endpoint, client, client);
348348

349349
// Do not pass client timeout here. Session must be alive
350350
TRpcRequestSettings rpcSettings;

src/client/query/impl/client_session.cpp

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,106 @@
55
#undef INCLUDE_YDB_INTERNAL_H
66

77
#include <src/library/issue/yql_issue_message.h>
8+
#include <thread>
89

910
namespace NYdb::inline V3::NQuery {
1011

11-
TSession::TImpl::TImpl(TStreamProcessorPtr ptr, const std::string& sessionId, const std::string& endpoint)
12+
// Custom lock primitive to protect session from destroying
13+
// during async read execution.
14+
// The problem is TSession::TImpl holds grpc stream processor by IntrusivePtr
15+
// and this processor alredy refcounted by internal code.
16+
// That mean during TSession::TImpl dtor no gurantee to grpc procerrot will be destroyed.
17+
// StreamProcessor_->Cancel() doesn't help it just start async cancelation but we have no way
18+
// to wait cancelation has done.
19+
// So we need some way to protect access to row session impl pointer
20+
// from async reader (processor callback). We can't use shared/weak ptr here because TSessionImpl
21+
// stores as uniq ptr inside session pool and as shared ptr in the TSession
22+
// when user got session (see GetSmartDeleter related code).
23+
24+
// Why just not std::mutex? - Requirement do not destroy a mutex while it is locked
25+
// makes it difficult to use here. Moreover we need to allow recursive lock.
26+
27+
// Why recursive lock? - In happy path we destroy session from CloseFromServer call,
28+
// so the session dtor called from thread which already got the lock.
29+
30+
// TODO: Proably we can add sync version of Cancel method in to grpc stream procesor to make sure
31+
// no more callback will be called.
32+
33+
class TSafeTSessionImplHolder {
34+
TSession::TImpl* Ptr;
35+
std::atomic_uint32_t Semaphore;
36+
std::atomic<std::thread::id> OwnerThread;
37+
public:
38+
TSafeTSessionImplHolder(TSession::TImpl* p)
39+
: Ptr(p)
40+
, Semaphore(0)
41+
{}
42+
43+
TSession::TImpl* TrySharedOwning() noexcept {
44+
auto old = Semaphore.fetch_add(1);
45+
if (old == 0) {
46+
OwnerThread.store(std::this_thread::get_id());
47+
return Ptr;
48+
} else {
49+
return nullptr;
50+
}
51+
}
52+
53+
void Release() noexcept {
54+
OwnerThread.store(std::thread::id());
55+
Semaphore.store(0);
56+
}
57+
58+
void WaitAndLock() noexcept {
59+
if (OwnerThread.load() == std::this_thread::get_id()) {
60+
return;
61+
}
62+
63+
uint32_t cur = 0;
64+
uint32_t newVal = 1;
65+
while (!Semaphore.compare_exchange_weak(cur, newVal,
66+
std::memory_order_release, std::memory_order_relaxed)) {
67+
std::this_thread::yield();
68+
cur = 0;
69+
}
70+
}
71+
};
72+
73+
// Issues one asynchronous read on the session stream.
//
// ptr    - stream processor to read from.
// client - weak reference to the session client, forwarded to CloseFromServer().
// holder - guard giving the callback safe access to the session impl.
//
// Callback behaviour by grpc status code:
//   OK           - schedule the next read;
//   OUT_OF_RANGE - if the session impl can be locked, call CloseFromServer() on it
//                  and release the lock;
//   anything else - do nothing (no further reads are scheduled).
void TSession::TImpl::StartAsyncRead(TStreamProcessorPtr ptr, std::weak_ptr<ISessionClient> client,
    std::shared_ptr<TSafeTSessionImplHolder> holder)
{
    // The response buffer is captured by the callback so it stays alive during the read.
    auto state = std::make_shared<Ydb::Query::SessionState>();

    auto onRead = [state, ptr, client, holder](NYdbGrpc::TGrpcStatus grpcStatus) mutable {
        const auto code = grpcStatus.GRpcStatusCode;

        if (code == grpc::StatusCode::OK) {
            StartAsyncRead(ptr, client, holder);
            return;
        }

        if (code != grpc::StatusCode::OUT_OF_RANGE) {
            return;
        }

        if (auto* impl = holder->TrySharedOwning()) {
            impl->CloseFromServer(client);
            holder->Release();
        }
    };

    ptr->Read(state.get(), std::move(onRead));
}
93+
94+
// Builds a session impl on top of an attached stream.
//
// ptr       - stream processor kept for the lifetime of the session.
// sessionId - forwarded to TKqpSessionCommon.
// endpoint  - forwarded to TKqpSessionCommon.
// client    - weak reference to the session client, handed to the async reader.
//
// The session is marked active and the first asynchronous read is started
// before the constructor returns.
TSession::TImpl::TImpl(TStreamProcessorPtr ptr, const std::string& sessionId, const std::string& endpoint, std::weak_ptr<ISessionClient> client)
    : TKqpSessionCommon(sessionId, endpoint, true)
    , StreamProcessor_(std::move(ptr))
    , SessionHolder(std::make_shared<TSafeTSessionImplHolder>(this))
{
    MarkActive();
    SetNeedUpdateActiveCounter(true);
    // From here on the read callback may reach this object through SessionHolder.
    StartAsyncRead(StreamProcessor_, std::move(client), SessionHolder);
}
18103

19104
// Requests cancellation of the stream, then takes the holder lock so that no read
// callback can obtain this object any more. The order matters: cancel first, wait second.
// WaitAndLock() returns immediately when the destructor runs on the thread that
// already owns the lock.
TSession::TImpl::~TImpl() {
    StreamProcessor_->Cancel();
    SessionHolder->WaitAndLock();
}
23109

24110
void TSession::TImpl::MakeImplAsync(TStreamProcessorPtr ptr,
@@ -53,7 +139,7 @@ void TSession::TImpl::NewSmartShared(TStreamProcessorPtr ptr,
53139
std::move(st),
54140
TSession(
55141
args->Client,
56-
new TSession::TImpl(ptr, args->SessionId, args->Endpoint)
142+
new TSession::TImpl(ptr, args->SessionId, args->Endpoint, args->SessionClient)
57143
)
58144
)
59145
);

src/client/query/impl/client_session.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,36 +7,44 @@
77

88
namespace NYdb::inline V3::NQuery {
99

10+
class TSafeTSessionImplHolder;
11+
1012
class TSession::TImpl : public TKqpSessionCommon {
1113
public:
1214
struct TAttachSessionArgs {
1315
TAttachSessionArgs(NThreading::TPromise<TCreateSessionResult> promise,
1416
std::string sessionId,
1517
std::string endpoint,
16-
std::shared_ptr<TQueryClient::TImpl> client)
18+
std::shared_ptr<TQueryClient::TImpl> client,
19+
std::weak_ptr<ISessionClient> sessionClient)
1720
: Promise(promise)
1821
, SessionId(sessionId)
1922
, Endpoint(endpoint)
2023
, Client(client)
24+
, SessionClient(sessionClient)
2125
{ }
2226
NThreading::TPromise<TCreateSessionResult> Promise;
2327
std::string SessionId;
2428
std::string Endpoint;
2529
std::shared_ptr<TQueryClient::TImpl> Client;
30+
std::weak_ptr<ISessionClient> SessionClient;
2631
};
2732

2833
using TResponse = Ydb::Query::SessionState;
2934
using TStreamProcessorPtr = NYdbGrpc::IStreamRequestReadProcessor<TResponse>::TPtr;
30-
TImpl(TStreamProcessorPtr ptr, const std::string& id, const std::string& endpoint);
35+
TImpl(TStreamProcessorPtr ptr, const std::string& id, const std::string& endpoint, std::weak_ptr<ISessionClient> client);
3136
~TImpl();
3237

3338
static void MakeImplAsync(TStreamProcessorPtr processor, std::shared_ptr<TAttachSessionArgs> args);
3439

3540
private:
3641
static void NewSmartShared(TStreamProcessorPtr ptr, std::shared_ptr<TAttachSessionArgs> args, NYdb::TStatus status);
3742

43+
static void StartAsyncRead(TStreamProcessorPtr ptr, std::weak_ptr<ISessionClient> client, std::shared_ptr<TSafeTSessionImplHolder> session);
44+
3845
private:
3946
TStreamProcessorPtr StreamProcessor_;
47+
std::shared_ptr<TSafeTSessionImplHolder> SessionHolder;
4048
};
4149

4250
}

0 commit comments

Comments
 (0)