Skip to content

Commit 8d06594

Browse files
authored
Merge 13b048e into 58f857d
2 parents 58f857d + 13b048e commit 8d06594

File tree

11 files changed

+278
-119
lines changed

11 files changed

+278
-119
lines changed

ydb/core/kqp/workload_service/actors/scheme_actors.cpp

+89-16
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
#include "actors.h"
22

33
#include <ydb/core/base/path.h>
4+
#include <ydb/core/base/tablet_pipe.h>
45

56
#include <ydb/core/kqp/common/simple/services.h>
67
#include <ydb/core/kqp/workload_service/common/events.h>
78
#include <ydb/core/kqp/workload_service/common/helpers.h>
89

10+
#include <ydb/core/tx/schemeshard/schemeshard.h>
911
#include <ydb/core/tx/tx_proxy/proxy.h>
1012

1113
#include <ydb/library/table_creator/table_creator.h>
@@ -116,7 +118,7 @@ class TPoolResolverActor : public TActorBootstrapped<TPoolResolverActor> {
116118

117119
class TPoolFetcherActor : public TSchemeActorBase<TPoolFetcherActor> {
118120
public:
119-
TPoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless)
121+
TPoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless)
120122
: ReplyActorId(replyActorId)
121123
, Database(database)
122124
, PoolId(poolId)
@@ -255,38 +257,67 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
255257
}
256258

257259
void Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev) {
258-
const auto ssStatus = ev->Get()->Record.GetSchemeShardStatus();
259-
switch (ev->Get()->Status()) {
260+
const auto& response = ev->Get()->Record;
261+
const auto ssStatus = response.GetSchemeShardStatus();
262+
const auto status = ev->Get()->Status();
263+
switch (status) {
260264
case NTxProxy::TResultStatus::ExecComplete:
261265
case NTxProxy::TResultStatus::ExecAlready:
262266
if (ssStatus == NKikimrScheme::EStatus::StatusSuccess || ssStatus == NKikimrScheme::EStatus::StatusAlreadyExists) {
263267
Reply(Ydb::StatusIds::SUCCESS);
264268
} else {
265-
Reply(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Invalid creation status: " << static_cast<NKikimrScheme::EStatus>(ssStatus));
269+
Reply(Ydb::StatusIds::SCHEME_ERROR, ExtractIssues(response, TStringBuilder() << "Invalid creation status: " << static_cast<NKikimrScheme::EStatus>(ssStatus)));
266270
}
267271
return;
268272
case NTxProxy::TResultStatus::ExecError:
269-
if (ssStatus == NKikimrScheme::EStatus::StatusMultipleModifications || ssStatus == NKikimrScheme::EStatus::StatusInvalidParameter) {
270-
ScheduleRetry(ssStatus, "Retry execution error", true);
273+
if (ssStatus == NKikimrScheme::EStatus::StatusMultipleModifications) {
274+
SubscribeOnTransactionOrRetry(status, response);
271275
} else {
272-
Reply(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Execution error: " << static_cast<NKikimrScheme::EStatus>(ssStatus));
276+
Reply(Ydb::StatusIds::SCHEME_ERROR, ExtractIssues(response, TStringBuilder() << "Execution error: " << static_cast<NKikimrScheme::EStatus>(ssStatus)));
273277
}
274278
return;
275279
case NTxProxy::TResultStatus::ExecInProgress:
276-
ScheduleRetry(ssStatus, "Retry execution in progress error", true);
280+
SubscribeOnTransactionOrRetry(status, response);
277281
return;
278282
case NTxProxy::TResultStatus::ProxyShardNotAvailable:
279-
ScheduleRetry(ssStatus, "Retry shard unavailable error");
283+
ScheduleRetry(response, "Retry shard unavailable error");
280284
return;
281285
default:
282-
Reply(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Failed to create resource pool: " << static_cast<NKikimrScheme::EStatus>(ssStatus));
286+
Reply(Ydb::StatusIds::SCHEME_ERROR, ExtractIssues(response, TStringBuilder() << "Failed to create resource pool: " << static_cast<NKikimrScheme::EStatus>(ssStatus)));
283287
return;
284288
}
285289
}
286290

291+
// Handles the connection result of the tablet pipe client.
// When the reported status is NKikimrProto::OK nothing but a log line is produced.
// For any other status the pipe client is closed and a retry is scheduled; the
// retry message carries the textual name of the failed status.
void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) {
292+
if (ev->Get()->Status == NKikimrProto::OK) {
293+
LOG_T("Tablet to pipe successfully connected");
294+
return;
295+
}
296+
297+
// Close first so that the client id is recorded in ClosedSchemePipeActors
// before the retry is scheduled.
ClosePipeClient();
298+
ScheduleRetry(TStringBuilder() << "Tablet to pipe not connected: " << NKikimrProto::EReplyStatus_Name(ev->Get()->Status));
299+
}
300+
301+
// Handles destruction of a tablet pipe client.
// If the destroyed client id is present in ClosedSchemePipeActors (i.e. it was
// closed through ClosePipeClient()), the event is ignored. Otherwise the current
// pipe client is closed and a retry is scheduled.
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev) {
302+
const TActorId clientId = ev->Get()->ClientId;
303+
if (!ClosedSchemePipeActors.contains(clientId)) {
304+
ClosePipeClient();
305+
ScheduleRetry("Tablet to pipe destroyed");
306+
}
307+
}
308+
309+
// Handles the completion notification for the transaction subscribed to in
// SubscribeOnTransactionOrRetry(). No reply is sent from here: a retry is
// scheduled instead, with a message that includes the completed transaction id.
// NOTE(review): presumably the retry re-issues the create request to verify the
// outcome ("doublechecking") — the wakeup handler is not visible here, confirm.
void Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev) {
310+
ScheduleRetry(TStringBuilder() << "Transaction " << ev->Get()->Record.GetTxId() << " completed, doublechecking");
311+
}
312+
287313
STFUNC(StateFunc) {
288314
switch (ev->GetTypeRewrite()) {
289315
hFunc(TEvTxUserProxy::TEvProposeTransactionStatus, Handle)
316+
hFunc(TEvTabletPipe::TEvClientConnected, Handle)
317+
hFunc(TEvTabletPipe::TEvClientDestroyed, Handle)
318+
hFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult, Handle)
319+
IgnoreFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered)
320+
290321
default:
291322
StateFuncBase(ev);
292323
}
@@ -301,13 +332,12 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
301332
schemeTx.SetWorkingDir(JoinPath({Database, ".resource_pools"}));
302333
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateResourcePool);
303334
schemeTx.SetInternal(true);
304-
schemeTx.SetAllowAccessToPrivatePaths(true);
305335

306336
BuildCreatePoolRequest(*schemeTx.MutableCreateResourcePool());
307337
BuildModifyAclRequest(*schemeTx.MutableModifyACL());
308338

309339
if (UserToken) {
310-
event->Record.SetUserToken(UserToken->GetSerializedToken());
340+
event->Record.SetUserToken(UserToken->SerializeAsString());
311341
}
312342

313343
Send(MakeTxProxyID(), std::move(event));
@@ -322,10 +352,42 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
322352
}
323353

324354
private:
325-
void ScheduleRetry(ui32 status, const TString& message, bool longDelay = false) {
326-
auto ssStatus = static_cast<NKikimrScheme::EStatus>(status);
327-
if (!TBase::ScheduleRetry(TStringBuilder() << message << ", status: " << ssStatus, longDelay)) {
328-
Reply(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Retry limit exceeded on status: " << ssStatus);
355+
// Subscribes to completion of a scheme transaction, or falls back to a plain retry.
//
// status   - proxy result status of the propose response; selects which tx id is used.
// response - propose response record; supplies the tx ids and the scheme shard tablet id.
//
// The tx id is response.GetTxId() when status is ExecInProgress and
// response.GetPathCreateTxId() otherwise. A zero tx id means there is nothing to
// subscribe to, so a retry with longDelay = true is scheduled instead.
void SubscribeOnTransactionOrRetry(NTxProxy::TResultStatus::EStatus status, const NKikimrTxUserProxy::TEvProposeTransactionStatus& response) {
356+
const ui64 txId = status == NTxProxy::TResultStatus::ExecInProgress ? response.GetTxId() : response.GetPathCreateTxId();
357+
if (txId == 0) {
358+
ScheduleRetry(response, "Unable to subscribe to concurrent transaction", true);
359+
return;
360+
}
361+
362+
// NOTE(review): any previous SchemePipeActorId is overwritten here without
// ClosePipeClient(); presumably no pipe is open at this point — confirm.
SchemePipeActorId = Register(NTabletPipe::CreateClient(SelfId(), response.GetSchemeShardTabletId()));
363+
364+
// Ask for a notification about completion of txId through the new pipe client.
auto request = MakeHolder<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>();
365+
request->Record.SetTxId(txId);
366+
NTabletPipe::SendData(SelfId(), SchemePipeActorId, std::move(request));
367+
LOG_D("Subscribe on create pool tx: " << txId);
368+
}
369+
370+
// Closes the current tablet pipe client, if there is one.
// The client id is remembered in ClosedSchemePipeActors before closing, and
// SchemePipeActorId is reset afterwards, so repeated calls are no-ops.
void ClosePipeClient() {
371+
if (SchemePipeActorId) {
372+
// Remember the id: Handle(TEvClientDestroyed) checks this set to skip
// clients that were closed from here.
ClosedSchemePipeActors.insert(SchemePipeActorId);
373+
NTabletPipe::CloseClient(SelfId(), SchemePipeActorId);
374+
SchemePipeActorId = {};
375+
}
376+
}
377+
378+
// Schedules a retry for a failed propose response, or replies with UNAVAILABLE
// when TBase::ScheduleRetry() returns false.
//
// response  - propose response; its issues and scheme shard status are added to the message.
// message   - description of the reason for the retry.
// longDelay - forwarded unchanged to TBase::ScheduleRetry().
//
// The pipe client is closed before the retry is scheduled.
void ScheduleRetry(const NKikimrTxUserProxy::TEvProposeTransactionStatus& response, const TString& message, bool longDelay = false) {
379+
ClosePipeClient();
380+
381+
auto ssStatus = static_cast<NKikimrScheme::EStatus>(response.GetSchemeShardStatus());
382+
if (!TBase::ScheduleRetry(ExtractIssues(response, TStringBuilder() << message << ", status: " << ssStatus), longDelay)) {
383+
Reply(Ydb::StatusIds::UNAVAILABLE, ExtractIssues(response, TStringBuilder() << "Retry limit exceeded on status: " << ssStatus));
384+
}
385+
}
386+
387+
// Schedules a retry described only by a text message, or replies with
// UNAVAILABLE when TBase::ScheduleRetry() returns false.
//
// message   - description of the reason for the retry; also included in the reply text.
// longDelay - forwarded unchanged to TBase::ScheduleRetry().
//
// The pipe client is closed before the retry is scheduled.
void ScheduleRetry(const TString& message, bool longDelay = false) {
388+
ClosePipeClient();
389+
if (!TBase::ScheduleRetry(message, longDelay)) {
390+
Reply(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Retry limit exceeded on error: " << message);
329391
}
330392
}
331393

@@ -358,18 +420,29 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
358420
LOG_W("Failed to create pool, " << status << ", issues: " << issues.ToOneLineString());
359421
}
360422

423+
ClosePipeClient();
424+
361425
Issues.AddIssues(std::move(issues));
362426
Send(ReplyActorId, new TEvPrivate::TEvCreatePoolResponse(status, std::move(Issues)));
363427
PassAway();
364428
}
365429

430+
// Builds the issues for a propose response.
//
// response - propose response whose GetIssues() are converted with NYql::IssuesFromMessage().
// message  - text passed to GroupIssues() together with the converted issues.
//
// Returns the result of GroupIssues(issues, message).
// NOTE(review): presumably GroupIssues nests the issues under a root issue with
// the given message — its definition is not visible here, confirm.
static NYql::TIssues ExtractIssues(const NKikimrTxUserProxy::TEvProposeTransactionStatus& response, const TString& message) {
431+
NYql::TIssues issues;
432+
NYql::IssuesFromMessage(response.GetIssues(), issues);
433+
return GroupIssues(issues, message);
434+
}
435+
366436
private:
367437
const TActorId ReplyActorId;
368438
const TString Database;
369439
const TString PoolId;
370440
const TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
371441
const NACLibProto::TDiffACL DiffAcl;
372442
NResourcePool::TPoolSettings PoolConfig;
443+
444+
std::unordered_set<TActorId> ClosedSchemePipeActors;
445+
TActorId SchemePipeActorId;
373446
};
374447

375448
} // anonymous namespace

ydb/core/kqp/workload_service/common/helpers.h

+7-3
Original file line numberDiff line numberDiff line change
@@ -62,21 +62,25 @@ class TSchemeActorBase : public NActors::TActorBootstrapped<TDerived> {
6262
virtual TString LogPrefix() const = 0;
6363

6464
protected:
65-
bool ScheduleRetry(const TString& message, bool longDelay = false) {
65+
bool ScheduleRetry(NYql::TIssues issues, bool longDelay = false) {
6666
if (!RetryState) {
6767
RetryState = CreateRetryState();
6868
}
6969

7070
if (const auto delay = RetryState->GetNextRetryDelay(longDelay)) {
71-
Issues.AddIssue(message);
71+
Issues.AddIssues(issues);
7272
this->Schedule(*delay, new TEvents::TEvWakeup());
73-
LOG_W("Scheduled retry for error: " << message);
73+
LOG_W("Scheduled retry for error: " << issues.ToOneLineString());
7474
return true;
7575
}
7676

7777
return false;
7878
}
7979

80+
// Convenience overload: wraps the message into a single NYql::TIssue and
// forwards to the issues-based ScheduleRetry(); returns its result unchanged.
bool ScheduleRetry(const TString& message, bool longDelay = false) {
81+
return ScheduleRetry({NYql::TIssue(message)}, longDelay);
82+
}
83+
8084
private:
8185
static TRetryPolicy::IRetryState::TPtr CreateRetryState() {
8286
return TRetryPolicy::GetFixedIntervalPolicy(

ydb/core/kqp/workload_service/kqp_workload_service.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,10 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
491491
token->AddGroupSID(allAuthenticatedUsersSID);
492492
}
493493

494+
if (userToken && !userToken->GetSerializedToken().empty()) {
495+
token->SaveSerializationInfo();
496+
}
497+
494498
return token;
495499
}
496500

ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ TEvPrivate::TEvFetchPoolResponse::TPtr FetchPool(TIntrusivePtr<IYdbSetup> ydb, c
1616
auto runtime = ydb->GetRuntime();
1717
const auto& edgeActor = runtime->AllocateEdgeActor();
1818

19-
runtime->Register(CreatePoolFetcherActor(edgeActor, settings.DomainName_, poolId ? poolId : settings.PoolId_, MakeIntrusive<NACLib::TUserToken>(userSID, TVector<NACLib::TSID>{}), true));
19+
auto userToken = MakeIntrusive<NACLib::TUserToken>(userSID, TVector<NACLib::TSID>{});
20+
userToken->SaveSerializationInfo();
21+
runtime->Register(CreatePoolFetcherActor(edgeActor, settings.DomainName_, poolId ? poolId : settings.PoolId_, userToken, true));
2022
return runtime->GrabEdgeEvent<TEvPrivate::TEvFetchPoolResponse>(edgeActor, FUTURE_WAIT_TIMEOUT);
2123
}
2224

ydb/library/table_creator/table_creator.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,9 @@ THolder<NSchemeCache::TSchemeCacheNavigate> BuildSchemeCacheNavigateRequest(cons
392392
auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
393393
auto databasePath = SplitPath(database);
394394
request->DatabaseName = CanonizePath(databasePath);
395-
request->UserToken = userToken;
395+
if (userToken && !userToken->GetSerializedToken().empty()) {
396+
request->UserToken = userToken;
397+
}
396398

397399
for (const auto& pathComponents : pathsComponents) {
398400
auto& entry = request->ResultSet.emplace_back();

0 commit comments

Comments
 (0)