Skip to content

Commit 8c7f1a0

Browse files
committed
Fixed pipe client closing
1 parent 059ae3b commit 8c7f1a0

File tree

1 file changed

+20
-25
lines changed

1 file changed

+20
-25
lines changed

ydb/core/kqp/workload_service/actors/scheme_actors.cpp

+20-25
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ class TPoolResolverActor : public TActorBootstrapped<TPoolResolverActor> {
118118

119119
class TPoolFetcherActor : public TSchemeActorBase<TPoolFetcherActor> {
120120
public:
121-
TPoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless)
121+
TPoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless)
122122
: ReplyActorId(replyActorId)
123123
, Database(database)
124124
, PoolId(poolId)
@@ -294,19 +294,16 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
294294
return;
295295
}
296296

297-
PipeClientClosedByUs = true;
298-
SchemePipeActorId = {};
299-
NTabletPipe::CloseClient(SelfId(), SchemePipeActorId);
300-
297+
ClosePipeClient();
301298
ScheduleRetry(TStringBuilder() << "Tablet to pipe not connected: " << NKikimrProto::EReplyStatus_Name(ev->Get()->Status));
302299
}
303300

304-
void HandleClientDestroyed() {
305-
SchemePipeActorId = {};
306-
if (!PipeClientClosedByUs) {
301+
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev) {
302+
const TActorId clientId = ev->Get()->ClientId;
303+
if (!ClosedSchemePipeActors.contains(clientId)) {
304+
ClosePipeClient();
307305
ScheduleRetry("Tablet to pipe destroyed");
308306
}
309-
PipeClientClosedByUs = false;
310307
}
311308

312309
void HandleNotifyTxCompletionResult() {
@@ -317,7 +314,7 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
317314
switch (ev->GetTypeRewrite()) {
318315
hFunc(TEvTxUserProxy::TEvProposeTransactionStatus, Handle)
319316
hFunc(TEvTabletPipe::TEvClientConnected, Handle)
320-
sFunc(TEvTabletPipe::TEvClientDestroyed, HandleClientDestroyed)
317+
hFunc(TEvTabletPipe::TEvClientDestroyed, Handle)
321318
sFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult, HandleNotifyTxCompletionResult)
322319
IgnoreFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered)
323320

@@ -358,11 +355,10 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
358355
void SubscribeOnTransactionOrRetry(NTxProxy::TResultStatus::EStatus status, const NKikimrTxUserProxy::TEvProposeTransactionStatus& response) {
359356
const ui64 txId = status == NTxProxy::TResultStatus::ExecInProgress ? response.GetTxId() : response.GetPathCreateTxId();
360357
if (txId == 0) {
361-
ScheduleRetry(response, "Unable to subscribe to concurrent transaction");
358+
ScheduleRetry(response, "Unable to subscribe to concurrent transaction", true);
362359
return;
363360
}
364361

365-
PipeClientClosedByUs = false;
366362
SchemePipeActorId = Register(NTabletPipe::CreateClient(SelfId(), response.GetSchemeShardTabletId()));
367363

368364
auto request = MakeHolder<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>();
@@ -371,11 +367,16 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
371367
LOG_D("Subscribe on create pool tx: " << txId);
372368
}
373369

374-
void ScheduleRetry(const NKikimrTxUserProxy::TEvProposeTransactionStatus& response, const TString& message, bool longDelay = false) {
375-
if (SchemePipeActorId){
376-
PipeClientClosedByUs = true;
370+
void ClosePipeClient() {
371+
if (SchemePipeActorId) {
372+
ClosedSchemePipeActors.insert(SchemePipeActorId);
373+
SchemePipeActorId = {};
377374
NTabletPipe::CloseClient(SelfId(), SchemePipeActorId);
378375
}
376+
}
377+
378+
void ScheduleRetry(const NKikimrTxUserProxy::TEvProposeTransactionStatus& response, const TString& message, bool longDelay = false) {
379+
ClosePipeClient();
379380

380381
auto ssStatus = static_cast<NKikimrScheme::EStatus>(response.GetSchemeShardStatus());
381382
if (!TBase::ScheduleRetry(ExtractIssues(response, TStringBuilder() << message << ", status: " << ssStatus), longDelay)) {
@@ -384,11 +385,7 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
384385
}
385386

386387
void ScheduleRetry(const TString& message, bool longDelay = false) {
387-
if (SchemePipeActorId){
388-
PipeClientClosedByUs = true;
389-
NTabletPipe::CloseClient(SelfId(), SchemePipeActorId);
390-
}
391-
388+
ClosePipeClient();
392389
if (!TBase::ScheduleRetry(message, longDelay)) {
393390
Reply(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Retry limit exceeded on error: " << message);
394391
}
@@ -423,9 +420,7 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
423420
LOG_W("Failed to create pool, " << status << ", issues: " << issues.ToOneLineString());
424421
}
425422

426-
if (SchemePipeActorId) {
427-
NTabletPipe::CloseClient(SelfId(), SchemePipeActorId);
428-
}
423+
ClosePipeClient();
429424

430425
Issues.AddIssues(std::move(issues));
431426
Send(ReplyActorId, new TEvPrivate::TEvCreatePoolResponse(status, std::move(Issues)));
@@ -446,8 +441,8 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
446441
const NACLibProto::TDiffACL DiffAcl;
447442
NResourcePool::TPoolSettings PoolConfig;
448443

449-
NActors::TActorId SchemePipeActorId;
450-
bool PipeClientClosedByUs = false;
444+
std::unordered_set<TActorId> ClosedSchemePipeActors;
445+
TActorId SchemePipeActorId;
451446
};
452447

453448
} // anonymous namespace

0 commit comments

Comments
 (0)