Skip to content

Commit 3ab7dd7

Browse files
authored
Remove session from pool in case of BAD_SESSION... (#10437)
1 parent 168df93 commit 3ab7dd7

File tree

5 files changed

+171
-2
lines changed

5 files changed

+171
-2
lines changed

ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ NThreading::TFuture<TResponse> InjectSessionStatusInterception(
5252
// Exclude CLIENT_RESOURCE_EXHAUSTED from transport errors which can cause to session disconnect
5353
// since we have guarantee this request wasn't been started to execute.
5454

55-
if (status.IsTransportError() && status.GetStatus() != EStatus::CLIENT_RESOURCE_EXHAUSTED) {
55+
if (status.IsTransportError()
56+
&& status.GetStatus() != EStatus::CLIENT_RESOURCE_EXHAUSTED && status.GetStatus() != EStatus::CLIENT_OUT_OF_RANGE)
57+
{
5658
impl->MarkBroken();
5759
} else if (status.GetStatus() == EStatus::SESSION_BUSY) {
5860
impl->MarkBroken();
@@ -71,6 +73,7 @@ NThreading::TFuture<TResponse> InjectSessionStatusInterception(
7173
impl->ScheduleTimeToTouch(RandomizeThreshold(timeout), impl->GetState() == TKqpSessionCommon::EState::S_ACTIVE);
7274
}
7375
}
76+
7477
if (cb) {
7578
cb(value, *impl);
7679
}

ydb/public/sdk/cpp/client/ydb_query/client.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ class TTransaction;
133133
class TSession {
134134
friend class TQueryClient;
135135
friend class TTransaction;
136+
friend class TExecuteQueryIterator;
136137
public:
137138
const TString& GetId() const;
138139

ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
#define INCLUDE_YDB_INTERNAL_H
22
#include "exec_query.h"
3+
#include "client_session.h"
34

45
#include <ydb/public/sdk/cpp/client/ydb_query/client.h>
56
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h>
67
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h>
8+
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h>
79
#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h>
810
#undef INCLUDE_YDB_INTERNAL_H
911

@@ -59,7 +61,7 @@ class TExecuteQueryIterator::TReaderImpl {
5961
return Finished_;
6062
}
6163

62-
TAsyncExecuteQueryPart ReadNext(std::shared_ptr<TSelf> self) {
64+
TAsyncExecuteQueryPart DoReadNext(std::shared_ptr<TSelf> self) {
6365
auto promise = NThreading::NewPromise<TExecuteQueryPart>();
6466
// Capture self - guarantee no dtor call during the read
6567
auto readCb = [self, promise](TGRpcStatus&& grpcStatus) mutable {
@@ -100,6 +102,18 @@ class TExecuteQueryIterator::TReaderImpl {
100102
StreamProcessor_->Read(&Response_, readCb);
101103
return promise.GetFuture();
102104
}
105+
106+
TAsyncExecuteQueryPart ReadNext(std::shared_ptr<TSelf> self) {
107+
if (!Session_)
108+
return DoReadNext(std::move(self));
109+
110+
return NSessionPool::InjectSessionStatusInterception(
111+
Session_->SessionImpl_,
112+
DoReadNext(std::move(self)),
113+
false, // no need to ping stream session
114+
TDuration::Zero());
115+
}
116+
103117
private:
104118
TStreamProcessorPtr StreamProcessor_;
105119
TResponse Response_;

ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,57 @@
44

55
#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h>
66

7+
#include <ydb/public/lib/ut_helpers/ut_helpers_query.h>
8+
79
using namespace NYdb;
810
using namespace NYdb::NTable;
911

12+
namespace {
13+
14+
void CreateTestTable(NYdb::TDriver& driver) {
15+
NYdb::NTable::TTableClient client(driver);
16+
auto sessionResponse = client.GetSession().ExtractValueSync();
17+
UNIT_ASSERT(sessionResponse.IsSuccess());
18+
auto session = sessionResponse.GetSession();
19+
auto result = session.ExecuteSchemeQuery(R"___(
20+
CREATE TABLE `Root/Test` (
21+
Key Uint32,
22+
Value String,
23+
PRIMARY KEY (Key)
24+
);
25+
)___").ExtractValueSync();
26+
UNIT_ASSERT(result.IsSuccess());
27+
UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
28+
}
29+
30+
TString WarmPoolCreateSession(NYdb::NQuery::TQueryClient& client) {
31+
TString sessionId;
32+
auto sessionResponse = client.GetSession().ExtractValueSync();
33+
UNIT_ASSERT(sessionResponse.IsSuccess());
34+
auto session = sessionResponse.GetSession();
35+
sessionId = session.GetId();
36+
auto res = session.ExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
37+
38+
UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());
39+
40+
TResultSetParser resultSet(res.GetResultSetParser(0));
41+
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2);
42+
43+
return sessionId;
44+
}
45+
46+
void WaitForSessionsInPool(NYdb::NQuery::TQueryClient& client, i64 expected) {
47+
int attempt = 10;
48+
while (attempt--) {
49+
if (client.GetCurrentPoolSize() == expected)
50+
break;
51+
Sleep(TDuration::MilliSeconds(100));
52+
}
53+
UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), expected);
54+
}
55+
56+
}
57+
1058
Y_UNIT_TEST_SUITE(YdbSdkSessions) {
1159
Y_UNIT_TEST(TestSessionPool) {
1260
TKikimrWithGrpcAndRootSchema server;
@@ -128,6 +176,108 @@ Y_UNIT_TEST_SUITE(YdbSdkSessions) {
128176
driver.Stop(true);
129177
}
130178

179+
Y_UNIT_TEST(TestSdkFreeSessionAfterBadSessionQueryService) {
180+
TKikimrWithGrpcAndRootSchema server;
181+
ui16 grpc = server.GetPort();
182+
183+
TString location = TStringBuilder() << "localhost:" << grpc;
184+
auto clientConfig = NGRpcProxy::TGRpcClientConfig(location);
185+
186+
auto driver = NYdb::TDriver(
187+
TDriverConfig()
188+
.SetEndpoint(location));
189+
190+
CreateTestTable(driver);
191+
192+
NYdb::NQuery::TQueryClient client(driver);
193+
TString sessionId = WarmPoolCreateSession(client);
194+
WaitForSessionsInPool(client, 1);
195+
196+
bool allDoneOk = true;
197+
NTestHelpers::CheckDelete(clientConfig, sessionId, Ydb::StatusIds::SUCCESS, allDoneOk);
198+
UNIT_ASSERT(allDoneOk);
199+
200+
{
201+
auto sessionResponse = client.GetSession().ExtractValueSync();
202+
UNIT_ASSERT(sessionResponse.IsSuccess());
203+
auto session = sessionResponse.GetSession();
204+
UNIT_ASSERT_VALUES_EQUAL(session.GetId(), sessionId);
205+
206+
auto res = session.ExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
207+
208+
UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::BAD_SESSION, res.GetIssues().ToString());
209+
}
210+
211+
WaitForSessionsInPool(client, 0);
212+
213+
{
214+
auto sessionResponse = client.GetSession().ExtractValueSync();
215+
UNIT_ASSERT(sessionResponse.IsSuccess());
216+
auto session = sessionResponse.GetSession();
217+
UNIT_ASSERT_VALUES_UNEQUAL(session.GetId(), sessionId);
218+
auto res = session.ExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
219+
220+
UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());
221+
}
222+
223+
WaitForSessionsInPool(client, 1);
224+
225+
driver.Stop(true);
226+
}
227+
228+
Y_UNIT_TEST(TestSdkFreeSessionAfterBadSessionQueryServiceStreamCall) {
229+
TKikimrWithGrpcAndRootSchema server;
230+
ui16 grpc = server.GetPort();
231+
232+
TString location = TStringBuilder() << "localhost:" << grpc;
233+
auto clientConfig = NGRpcProxy::TGRpcClientConfig(location);
234+
235+
auto driver = NYdb::TDriver(
236+
TDriverConfig()
237+
.SetEndpoint(location));
238+
239+
CreateTestTable(driver);
240+
241+
NYdb::NQuery::TQueryClient client(driver);
242+
TString sessionId = WarmPoolCreateSession(client);
243+
WaitForSessionsInPool(client, 1);
244+
245+
bool allDoneOk = true;
246+
NTestHelpers::CheckDelete(clientConfig, sessionId, Ydb::StatusIds::SUCCESS, allDoneOk);
247+
UNIT_ASSERT(allDoneOk);
248+
249+
{
250+
auto sessionResponse = client.GetSession().ExtractValueSync();
251+
UNIT_ASSERT(sessionResponse.IsSuccess());
252+
auto session = sessionResponse.GetSession();
253+
UNIT_ASSERT_VALUES_EQUAL(session.GetId(), sessionId);
254+
255+
auto it = session.StreamExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
256+
257+
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
258+
259+
auto res = it.ReadNext().GetValueSync();
260+
UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::BAD_SESSION, res.GetIssues().ToString());
261+
}
262+
263+
WaitForSessionsInPool(client, 0);
264+
265+
{
266+
auto sessionResponse = client.GetSession().ExtractValueSync();
267+
UNIT_ASSERT(sessionResponse.IsSuccess());
268+
auto session = sessionResponse.GetSession();
269+
UNIT_ASSERT_VALUES_UNEQUAL(session.GetId(), sessionId);
270+
271+
auto res = session.ExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
272+
273+
UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());
274+
}
275+
276+
WaitForSessionsInPool(client, 1);
277+
278+
driver.Stop(true);
279+
}
280+
131281
Y_UNIT_TEST(TestActiveSessionCountAfterTransportError) {
132282
TKikimrWithGrpcAndRootSchema server;
133283
ui16 grpc = server.GetPort();

ydb/services/ydb/sdk_sessions_ut/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ PEERDIR(
2121
ydb/core/testlib/default
2222
ydb/core/testlib
2323
ydb/public/sdk/cpp/client/ydb_table
24+
ydb/public/lib/ut_helpers
2425
)
2526

2627
YQL_LAST_ABI_VERSION()

0 commit comments

Comments
 (0)