Skip to content

Commit 8ec93b0

Browse files
authored
YQ-3684 fixed error duplicate session id (#9583)
1 parent d7de8bd commit 8ec93b0

File tree

5 files changed

+86
-11
lines changed

5 files changed

+86
-11
lines changed

ydb/core/kqp/session_actor/kqp_session_actor.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
256256
QueryState->UserToken
257257
), IEventHandle::FlagTrackDelivery);
258258

259+
QueryState->PoolHandlerActor = MakeKqpWorkloadServiceId(SelfId().NodeId());
259260
Become(&TKqpSessionActor::ExecuteState);
260261
}
261262

@@ -2421,6 +2422,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
24212422
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNoop);
24222423
hFunc(TEvents::TEvUndelivered, HandleNoop);
24232424
hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, HandleNoop);
2425+
hFunc(TEvKqpExecuter::TEvStreamData, HandleNoop);
24242426
hFunc(NWorkload::TEvContinueRequest, HandleNoop);
24252427

24262428
// always come from WorkerActor

ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp

+4-3
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
161161
}
162162

163163
SendPoolInfoUpdate(std::nullopt, std::nullopt, Subscribers);
164+
this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvStopPoolHandlerResponse(Database, PoolId));
164165

165166
Counters.OnCleanup(ResetCountersOnStrop);
166167

@@ -184,16 +185,16 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
184185
}
185186

186187
void Handle(TEvPrivate::TEvResolvePoolResponse::TPtr& ev) {
187-
this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvPlaceRequestIntoPoolResponse(Database, PoolId));
188-
189188
auto event = std::move(ev->Get()->Event);
189+
const TString& sessionId = event->Get()->SessionId;
190+
this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvPlaceRequestIntoPoolResponse(Database, PoolId, sessionId));
191+
190192
const TActorId& workerActorId = event->Sender;
191193
if (!InFlightLimit) {
192194
this->Send(workerActorId, new TEvContinueRequest(Ydb::StatusIds::PRECONDITION_FAILED, PoolId, PoolConfig, {NYql::TIssue(TStringBuilder() << "Resource pool " << PoolId << " was disabled due to zero concurrent query limit")}));
193195
return;
194196
}
195197

196-
const TString& sessionId = event->Get()->SessionId;
197198
if (LocalSessions.contains(sessionId)) {
198199
this->Send(workerActorId, new TEvContinueRequest(Ydb::StatusIds::INTERNAL_ERROR, PoolId, PoolConfig, {NYql::TIssue(TStringBuilder() << "Got duplicate session id " << sessionId << " for pool " << PoolId)}));
199200
return;

ydb/core/kqp/workload_service/common/events.h

+14-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ struct TEvPrivate {
2929
EvFinishRequestInPool,
3030
EvResignPoolHandler,
3131
EvStopPoolHandler,
32+
EvStopPoolHandlerResponse,
3233
EvCancelRequest,
3334
EvUpdatePoolSubscription,
3435

@@ -128,13 +129,15 @@ struct TEvPrivate {
128129
};
129130

130131
struct TEvPlaceRequestIntoPoolResponse : public NActors::TEventLocal<TEvPlaceRequestIntoPoolResponse, EvPlaceRequestIntoPoolResponse> {
131-
TEvPlaceRequestIntoPoolResponse(const TString& database, const TString& poolId)
132+
TEvPlaceRequestIntoPoolResponse(const TString& database, const TString& poolId, const TString& sessionId)
132133
: Database(database)
133134
, PoolId(poolId)
135+
, SessionId(sessionId)
134136
{}
135137

136138
const TString Database;
137139
const TString PoolId;
140+
const TString SessionId;
138141
};
139142

140143
struct TEvFinishRequestInPool : public NActors::TEventLocal<TEvFinishRequestInPool, EvFinishRequestInPool> {
@@ -173,6 +176,16 @@ struct TEvPrivate {
173176
const bool ResetCounters;
174177
};
175178

179+
struct TEvStopPoolHandlerResponse : public NActors::TEventLocal<TEvStopPoolHandlerResponse, EvStopPoolHandlerResponse> {
180+
TEvStopPoolHandlerResponse(const TString& database, const TString& poolId)
181+
: Database(database)
182+
, PoolId(poolId)
183+
{}
184+
185+
const TString Database;
186+
const TString PoolId;
187+
};
188+
176189
struct TEvCancelRequest : public NActors::TEventLocal<TEvCancelRequest, EvCancelRequest> {
177190
explicit TEvCancelRequest(const TString& sessionId)
178191
: SessionId(sessionId)

ydb/core/kqp/workload_service/kqp_workload_service.cpp

+40-7
Original file line numberDiff line numberDiff line change
@@ -169,14 +169,21 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
169169
void Handle(TEvCleanupRequest::TPtr& ev) {
170170
const TString& database = ev->Get()->Database;
171171
const TString& poolId = ev->Get()->PoolId;
172+
const TString& sessionId = ev->Get()->SessionId;
173+
if (GetOrCreateDatabaseState(database)->PendingSessionIds.contains(sessionId)) {
174+
LOG_D("Finished request with worker actor " << ev->Sender << ", wait for place request, Database: " << database << ", PoolId: " << poolId << ", SessionId: " << ev->Get()->SessionId);
175+
GetOrCreateDatabaseState(database)->PendingCancelRequests[sessionId].emplace_back(std::move(ev));
176+
return;
177+
}
178+
172179
auto poolState = GetPoolState(database, poolId);
173180
if (!poolState) {
174181
ReplyCleanupError(ev->Sender, Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Pool " << poolId << " not found");
175182
return;
176183
}
177184

178185
LOG_D("Finished request with worker actor " << ev->Sender << ", Database: " << database << ", PoolId: " << poolId << ", SessionId: " << ev->Get()->SessionId);
179-
Send(ev->Forward(poolState->PoolHandler));
186+
poolState->DoCleanupRequest(std::move(ev));
180187
}
181188

182189
void Handle(TEvents::TEvWakeup::TPtr& ev) {
@@ -220,6 +227,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
220227
hFunc(TEvPrivate::TEvTablesCreationFinished, Handle);
221228
hFunc(TEvPrivate::TEvCpuLoadResponse, Handle);
222229
hFunc(TEvPrivate::TEvResignPoolHandler, Handle);
230+
hFunc(TEvPrivate::TEvStopPoolHandlerResponse, Handle);
223231
)
224232

225233
private:
@@ -245,12 +253,16 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
245253
void Handle(TEvPrivate::TEvResolvePoolResponse::TPtr& ev) {
246254
const auto& event = ev->Get()->Event;
247255
const TString& database = event->Get()->Database;
256+
auto databaseState = GetOrCreateDatabaseState(database);
248257
if (ev->Get()->DefaultPoolCreated) {
249-
GetOrCreateDatabaseState(database)->HasDefaultPool = true;
258+
databaseState->HasDefaultPool = true;
250259
}
251260

252261
const TString& poolId = event->Get()->PoolId;
253262
if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
263+
databaseState->RemovePendingSession(event->Get()->SessionId, [this](TEvCleanupRequest::TPtr event) {
264+
ReplyCleanupError(event->Sender, Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Pool " << event->Get()->PoolId << " not found");
265+
});
254266
ReplyContinueError(event->Sender, ev->Get()->Status, ev->Get()->Issues);
255267
return;
256268
}
@@ -265,9 +277,19 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
265277
void Handle(TEvPrivate::TEvPlaceRequestIntoPoolResponse::TPtr& ev) {
266278
const TString& database = ev->Get()->Database;
267279
const TString& poolId = ev->Get()->PoolId;
268-
LOG_T("Request placed into pool, Database: " << database << ", PoolId: " << poolId);
280+
const TString& sessionId = ev->Get()->SessionId;
281+
LOG_T("Request placed into pool, Database: " << database << ", PoolId: " << poolId << ", SessionId: " << sessionId);
269282

270-
if (auto poolState = GetPoolState(database, poolId)) {
283+
auto poolState = GetPoolState(database, poolId);
284+
GetOrCreateDatabaseState(database)->RemovePendingSession(sessionId, [this, poolState](TEvCleanupRequest::TPtr event) {
285+
if (poolState) {
286+
poolState->DoCleanupRequest(std::move(event));
287+
} else {
288+
ReplyCleanupError(event->Sender, Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Pool " << event->Get()->PoolId << " not found");
289+
}
290+
});
291+
292+
if (poolState) {
271293
poolState->PlaceRequestRunning = false;
272294
poolState->UpdateHandler();
273295
poolState->StartPlaceRequest();
@@ -388,6 +410,17 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
388410
}
389411
}
390412

413+
void Handle(TEvPrivate::TEvStopPoolHandlerResponse::TPtr& ev) {
414+
const TString& database = ev->Get()->Database;
415+
const TString& poolId = ev->Get()->PoolId;
416+
LOG_T("Got stop pool handler response, Database: " << database << ", PoolId: " << poolId);
417+
418+
Counters.ActivePools->Dec();
419+
if (auto poolState = GetPoolState(database, poolId)) {
420+
poolState->PreviousPoolHandlers.erase(ev->Sender);
421+
}
422+
}
423+
391424
private:
392425
void InitializeWorkloadService() {
393426
if (ServiceInitialized) {
@@ -441,15 +474,14 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
441474
std::vector<TString> poolsToDelete;
442475
poolsToDelete.reserve(PoolIdToState.size());
443476
for (const auto& [poolKey, poolState] : PoolIdToState) {
444-
if (!poolState.InFlightRequests && TInstant::Now() - poolState.LastUpdateTime > IDLE_DURATION) {
477+
if (!poolState.InFlightRequests && TInstant::Now() - poolState.LastUpdateTime > IDLE_DURATION && poolState.PendingRequests.empty()) {
445478
CpuQuotaManager->CleanupHandler(poolState.PoolHandler);
446479
Send(poolState.PoolHandler, new TEvPrivate::TEvStopPoolHandler(true));
447480
poolsToDelete.emplace_back(poolKey);
448481
}
449482
}
450483
for (const auto& poolKey : poolsToDelete) {
451484
PoolIdToState.erase(poolKey);
452-
Counters.ActivePools->Dec();
453485
}
454486

455487
if (!PoolIdToState.empty()) {
@@ -512,7 +544,8 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
512544
Send(replyActorId, new TEvCleanupResponse(status, {NYql::TIssue(message)}));
513545
}
514546

515-
TDatabaseState* GetOrCreateDatabaseState(const TString& database) {
547+
TDatabaseState* GetOrCreateDatabaseState(TString database) {
548+
database = CanonizePath(database);
516549
auto databaseIt = DatabaseToState.find(database);
517550
if (databaseIt != DatabaseToState.end()) {
518551
return &databaseIt->second;

ydb/core/kqp/workload_service/kqp_workload_service_impl.h

+26
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ struct TDatabaseState {
1919
bool& EnabledResourcePoolsOnServerless;
2020

2121
std::vector<TEvPlaceRequestIntoPool::TPtr> PendingRequersts = {};
22+
std::unordered_set<TString> PendingSessionIds = {};
23+
std::unordered_map<TString, std::vector<TEvCleanupRequest::TPtr>> PendingCancelRequests = {};
2224
std::unordered_map<TString, std::unordered_set<TActorId>> PendingSubscriptions = {};
2325
bool HasDefaultPool = false;
2426
bool Serverless = false;
@@ -38,6 +40,7 @@ struct TDatabaseState {
3840

3941
void DoPlaceRequest(TEvPlaceRequestIntoPool::TPtr ev) {
4042
TString database = ev->Get()->Database;
43+
PendingSessionIds.emplace(ev->Get()->SessionId);
4144
PendingRequersts.emplace_back(std::move(ev));
4245

4346
if (!EnabledResourcePoolsOnServerless && (TInstant::Now() - LastUpdateTime) > IDLE_DURATION) {
@@ -83,6 +86,14 @@ struct TDatabaseState {
8386
StartPendingRequests();
8487
}
8588

89+
void RemovePendingSession(const TString& sessionId, std::function<void(TEvCleanupRequest::TPtr)> callback) {
90+
for (auto& event : PendingCancelRequests[sessionId]) {
91+
callback(std::move(event));
92+
}
93+
PendingCancelRequests.erase(sessionId);
94+
PendingSessionIds.erase(sessionId);
95+
}
96+
8697
private:
8798
void StartPendingRequests() {
8899
if (!EnabledResourcePoolsOnServerless && Serverless) {
@@ -98,6 +109,9 @@ struct TDatabaseState {
98109

99110
void ReplyContinueError(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) {
100111
for (const auto& ev : PendingRequersts) {
112+
RemovePendingSession(ev->Get()->SessionId, [this](TEvCleanupRequest::TPtr event) {
113+
ActorContext.Send(event->Sender, new TEvCleanupResponse(Ydb::StatusIds::NOT_FOUND, {NYql::TIssue(TStringBuilder() << "Pool " << event->Get()->PoolId << " not found")}));
114+
});
101115
ActorContext.Send(ev->Sender, new TEvContinueRequest(status, {}, {}, issues));
102116
}
103117
PendingRequersts.clear();
@@ -112,6 +126,7 @@ struct TPoolState {
112126
bool WaitingInitialization = false;
113127
bool PlaceRequestRunning = false;
114128
std::optional<TActorId> NewPoolHandler = std::nullopt;
129+
std::unordered_set<TActorId> PreviousPoolHandlers = {};
115130

116131
ui64 InFlightRequests = 0;
117132
TInstant LastUpdateTime = TInstant::Now();
@@ -122,6 +137,7 @@ struct TPoolState {
122137
}
123138

124139
ActorContext.Send(PoolHandler, new TEvPrivate::TEvStopPoolHandler(false));
140+
PreviousPoolHandlers.insert(PoolHandler);
125141
PoolHandler = *NewPoolHandler;
126142
NewPoolHandler = std::nullopt;
127143
InFlightRequests = 0;
@@ -143,6 +159,16 @@ struct TPoolState {
143159
InFlightRequests--;
144160
LastUpdateTime = TInstant::Now();
145161
}
162+
163+
void DoCleanupRequest(TEvCleanupRequest::TPtr event) {
164+
for (const auto& poolHandler : PreviousPoolHandlers) {
165+
ActorContext.Send(poolHandler, new TEvCleanupRequest(
166+
event->Get()->Database, event->Get()->SessionId, event->Get()->PoolId,
167+
event->Get()->Duration, event->Get()->CpuConsumed
168+
));
169+
}
170+
ActorContext.Send(event->Forward(PoolHandler));
171+
}
146172
};
147173

148174
struct TCpuQuotaManagerState {

0 commit comments

Comments
 (0)