Commit 1a58392

YQ-3459 fix resource pools permissions (ydb-platform#6989)
1 parent 60451f5 commit 1a58392

File tree

5 files changed: +111 −45 lines

ydb/core/kqp/workload_service/actors/scheme_actors.cpp
ydb/core/kqp/workload_service/common/helpers.h
ydb/core/kqp/workload_service/kqp_workload_service.cpp
ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp
ydb/library/table_creator/table_creator.cpp

ydb/core/kqp/workload_service/actors/scheme_actors.cpp

+96-17
@@ -1,11 +1,13 @@
 #include "actors.h"

 #include <ydb/core/base/path.h>
+#include <ydb/core/base/tablet_pipe.h>

 #include <ydb/core/kqp/common/simple/services.h>
 #include <ydb/core/kqp/workload_service/common/events.h>
 #include <ydb/core/kqp/workload_service/common/helpers.h>

+#include <ydb/core/tx/schemeshard/schemeshard.h>
 #include <ydb/core/tx/tx_proxy/proxy.h>

 #include <ydb/library/table_creator/table_creator.h>
@@ -64,7 +66,13 @@ class TPoolResolverActor : public TActorBootstrapped<TPoolResolverActor> {
         for (const TString& usedSid : AppData()->AdministrationAllowedSIDs) {
             diffAcl.AddAccess(NACLib::EAccessType::Allow, NACLib::EAccessRights::GenericFull, usedSid);
         }
-        diffAcl.AddAccess(NACLib::EAccessType::Allow, NACLib::EAccessRights::SelectRow | NACLib::EAccessRights::DescribeSchema, AppData()->AllAuthenticatedUsers);
+
+        auto useAccess = NACLib::EAccessRights::SelectRow | NACLib::EAccessRights::DescribeSchema;
+        for (const auto& userSID : AppData()->DefaultUserSIDs) {
+            diffAcl.AddAccess(NACLib::EAccessType::Allow, useAccess, userSID);
+        }
+        diffAcl.AddAccess(NACLib::EAccessType::Allow, useAccess, AppData()->AllAuthenticatedUsers);
+        diffAcl.AddAccess(NACLib::EAccessType::Allow, useAccess, BUILTIN_ACL_ROOT);

         auto token = MakeIntrusive<NACLib::TUserToken>(BUILTIN_ACL_METADATA, TVector<NACLib::TSID>{});
         Register(CreatePoolCreatorActor(SelfId(), Event->Get()->Database, Event->Get()->PoolId, NResourcePool::TPoolSettings(), token, diffAcl));
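In plain terms, the hunk above widens who gets "use" rights on the default pool: besides administrators, SelectRow and DescribeSchema are now granted to every SID in DefaultUserSIDs and to root@builtin (BUILTIN_ACL_ROOT), in addition to the AllAuthenticatedUsers group. A minimal sketch of that grant logic, assuming the same NACLib/AppData helpers this file already uses; the function name BuildDefaultPoolUseAcl is illustrative, not from the commit:

```cpp
// Illustrative sketch only (not the commit's code): assembling the default pool's ACL diff.
NACLib::TDiffACL BuildDefaultPoolUseAcl() {
    NACLib::TDiffACL diffAcl;

    // Cluster administrators keep full control over the pool object.
    for (const TString& adminSid : AppData()->AdministrationAllowedSIDs) {
        diffAcl.AddAccess(NACLib::EAccessType::Allow, NACLib::EAccessRights::GenericFull, adminSid);
    }

    // Regular users only need to describe and select from the pool to run queries through it.
    const auto useAccess = NACLib::EAccessRights::SelectRow | NACLib::EAccessRights::DescribeSchema;
    for (const auto& userSid : AppData()->DefaultUserSIDs) {
        diffAcl.AddAccess(NACLib::EAccessType::Allow, useAccess, userSid);
    }
    diffAcl.AddAccess(NACLib::EAccessType::Allow, useAccess, AppData()->AllAuthenticatedUsers);
    diffAcl.AddAccess(NACLib::EAccessType::Allow, useAccess, BUILTIN_ACL_ROOT); // granted explicitly, in addition to the group
    return diffAcl;
}
```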
@@ -116,7 +124,7 @@ class TPoolResolverActor : public TActorBootstrapped<TPoolResolverActor> {

 class TPoolFetcherActor : public TSchemeActorBase<TPoolFetcherActor> {
 public:
-    TPoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless)
+    TPoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless)
         : ReplyActorId(replyActorId)
         , Database(database)
         , PoolId(poolId)
@@ -255,38 +263,67 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
     }

     void Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev) {
-        const auto ssStatus = ev->Get()->Record.GetSchemeShardStatus();
-        switch (ev->Get()->Status()) {
+        const auto& response = ev->Get()->Record;
+        const auto ssStatus = response.GetSchemeShardStatus();
+        const auto status = ev->Get()->Status();
+        switch (status) {
             case NTxProxy::TResultStatus::ExecComplete:
             case NTxProxy::TResultStatus::ExecAlready:
                 if (ssStatus == NKikimrScheme::EStatus::StatusSuccess || ssStatus == NKikimrScheme::EStatus::StatusAlreadyExists) {
                     Reply(Ydb::StatusIds::SUCCESS);
                 } else {
-                    Reply(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Invalid creation status: " << static_cast<NKikimrScheme::EStatus>(ssStatus));
+                    Reply(Ydb::StatusIds::SCHEME_ERROR, ExtractIssues(response, TStringBuilder() << "Invalid creation status: " << static_cast<NKikimrScheme::EStatus>(ssStatus)));
                 }
                 return;
             case NTxProxy::TResultStatus::ExecError:
-                if (ssStatus == NKikimrScheme::EStatus::StatusMultipleModifications || ssStatus == NKikimrScheme::EStatus::StatusInvalidParameter) {
-                    ScheduleRetry(ssStatus, "Retry execution error", true);
+                if (ssStatus == NKikimrScheme::EStatus::StatusMultipleModifications) {
+                    SubscribeOnTransactionOrRetry(status, response);
                 } else {
-                    Reply(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Execution error: " << static_cast<NKikimrScheme::EStatus>(ssStatus));
+                    Reply(Ydb::StatusIds::SCHEME_ERROR, ExtractIssues(response, TStringBuilder() << "Execution error: " << static_cast<NKikimrScheme::EStatus>(ssStatus)));
                 }
                 return;
             case NTxProxy::TResultStatus::ExecInProgress:
-                ScheduleRetry(ssStatus, "Retry execution in progress error", true);
+                SubscribeOnTransactionOrRetry(status, response);
                 return;
             case NTxProxy::TResultStatus::ProxyShardNotAvailable:
-                ScheduleRetry(ssStatus, "Retry shard unavailable error");
+                ScheduleRetry(response, "Retry shard unavailable error");
                 return;
             default:
-                Reply(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Failed to create resource pool: " << static_cast<NKikimrScheme::EStatus>(ssStatus));
+                Reply(Ydb::StatusIds::SCHEME_ERROR, ExtractIssues(response, TStringBuilder() << "Failed to create resource pool: " << static_cast<NKikimrScheme::EStatus>(ssStatus)));
                 return;
         }
     }

+    void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) {
+        if (ev->Get()->Status == NKikimrProto::OK) {
+            LOG_T("Tablet to pipe successfully connected");
+            return;
+        }
+
+        ClosePipeClient();
+        ScheduleRetry(TStringBuilder() << "Tablet to pipe not connected: " << NKikimrProto::EReplyStatus_Name(ev->Get()->Status));
+    }
+
+    void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev) {
+        const TActorId clientId = ev->Get()->ClientId;
+        if (!ClosedSchemePipeActors.contains(clientId)) {
+            ClosePipeClient();
+            ScheduleRetry("Tablet to pipe destroyed");
+        }
+    }
+
+    void Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev) {
+        ScheduleRetry(TStringBuilder() << "Transaction " << ev->Get()->Record.GetTxId() << " completed, doublechecking");
+    }
+
     STFUNC(StateFunc) {
         switch (ev->GetTypeRewrite()) {
             hFunc(TEvTxUserProxy::TEvProposeTransactionStatus, Handle)
+            hFunc(TEvTabletPipe::TEvClientConnected, Handle)
+            hFunc(TEvTabletPipe::TEvClientDestroyed, Handle)
+            hFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult, Handle)
+            IgnoreFunc(NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered)
+
             default:
                 StateFuncBase(ev);
         }
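The retry behaviour above changes qualitatively: on StatusMultipleModifications or ExecInProgress the creator no longer retries blindly, it subscribes to the concurrent SchemeShard transaction and re-checks once that transaction completes. A hedged sketch of the subscription step, built from the same tablet-pipe calls that appear later in this diff; the helper name WaitForConcurrentTx is illustrative:

```cpp
// Illustrative sketch, not the commit's exact code: wait for a concurrent scheme
// transaction instead of polling. SchemePipeActorId is the member introduced below.
void WaitForConcurrentTx(ui64 schemeShardTabletId, ui64 txId) {
    // Open a pipe to the SchemeShard tablet that owns the transaction.
    SchemePipeActorId = Register(NTabletPipe::CreateClient(SelfId(), schemeShardTabletId));

    // Ask SchemeShard to send TEvNotifyTxCompletionResult when txId finishes;
    // StateFunc above routes that event into a retry/double-check.
    auto request = MakeHolder<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>();
    request->Record.SetTxId(txId);
    NTabletPipe::SendData(SelfId(), SchemePipeActorId, std::move(request));
}
```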
@@ -301,13 +338,12 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
         schemeTx.SetWorkingDir(JoinPath({Database, ".resource_pools"}));
         schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateResourcePool);
         schemeTx.SetInternal(true);
-        schemeTx.SetAllowAccessToPrivatePaths(true);

         BuildCreatePoolRequest(*schemeTx.MutableCreateResourcePool());
         BuildModifyAclRequest(*schemeTx.MutableModifyACL());

         if (UserToken) {
-            event->Record.SetUserToken(UserToken->GetSerializedToken());
+            event->Record.SetUserToken(UserToken->SerializeAsString());
         }

         Send(MakeTxProxyID(), std::move(event));
@@ -322,10 +358,42 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
     }

 private:
-    void ScheduleRetry(ui32 status, const TString& message, bool longDelay = false) {
-        auto ssStatus = static_cast<NKikimrScheme::EStatus>(status);
-        if (!TBase::ScheduleRetry(TStringBuilder() << message << ", status: " << ssStatus, longDelay)) {
-            Reply(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Retry limit exceeded on status: " << ssStatus);
+    void SubscribeOnTransactionOrRetry(NTxProxy::TResultStatus::EStatus status, const NKikimrTxUserProxy::TEvProposeTransactionStatus& response) {
+        const ui64 txId = status == NTxProxy::TResultStatus::ExecInProgress ? response.GetTxId() : response.GetPathCreateTxId();
+        if (txId == 0) {
+            ScheduleRetry(response, "Unable to subscribe to concurrent transaction", true);
+            return;
+        }
+
+        SchemePipeActorId = Register(NTabletPipe::CreateClient(SelfId(), response.GetSchemeShardTabletId()));
+
+        auto request = MakeHolder<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>();
+        request->Record.SetTxId(txId);
+        NTabletPipe::SendData(SelfId(), SchemePipeActorId, std::move(request));
+        LOG_D("Subscribe on create pool tx: " << txId);
+    }
+
+    void ClosePipeClient() {
+        if (SchemePipeActorId) {
+            ClosedSchemePipeActors.insert(SchemePipeActorId);
+            NTabletPipe::CloseClient(SelfId(), SchemePipeActorId);
+            SchemePipeActorId = {};
+        }
+    }
+
+    void ScheduleRetry(const NKikimrTxUserProxy::TEvProposeTransactionStatus& response, const TString& message, bool longDelay = false) {
+        ClosePipeClient();
+
+        auto ssStatus = static_cast<NKikimrScheme::EStatus>(response.GetSchemeShardStatus());
+        if (!TBase::ScheduleRetry(ExtractIssues(response, TStringBuilder() << message << ", status: " << ssStatus), longDelay)) {
+            Reply(Ydb::StatusIds::UNAVAILABLE, ExtractIssues(response, TStringBuilder() << "Retry limit exceeded on status: " << ssStatus));
+        }
+    }
+
+    void ScheduleRetry(const TString& message, bool longDelay = false) {
+        ClosePipeClient();
+        if (!TBase::ScheduleRetry(message, longDelay)) {
+            Reply(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Retry limit exceeded on error: " << message);
         }
     }

@@ -358,18 +426,29 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
             LOG_W("Failed to create pool, " << status << ", issues: " << issues.ToOneLineString());
         }

+        ClosePipeClient();
+
         Issues.AddIssues(std::move(issues));
         Send(ReplyActorId, new TEvPrivate::TEvCreatePoolResponse(status, std::move(Issues)));
         PassAway();
     }

+    static NYql::TIssues ExtractIssues(const NKikimrTxUserProxy::TEvProposeTransactionStatus& response, const TString& message) {
+        NYql::TIssues issues;
+        NYql::IssuesFromMessage(response.GetIssues(), issues);
+        return GroupIssues(issues, message);
+    }
+
 private:
     const TActorId ReplyActorId;
     const TString Database;
     const TString PoolId;
     const TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
     const NACLibProto::TDiffACL DiffAcl;
     NResourcePool::TPoolSettings PoolConfig;
+
+    std::unordered_set<TActorId> ClosedSchemePipeActors;
+    TActorId SchemePipeActorId;
 };

 } // anonymous namespace

ydb/core/kqp/workload_service/common/helpers.h

+7-3
@@ -62,21 +62,25 @@ class TSchemeActorBase : public NActors::TActorBootstrapped<TDerived> {
     virtual TString LogPrefix() const = 0;

 protected:
-    bool ScheduleRetry(const TString& message, bool longDelay = false) {
+    bool ScheduleRetry(NYql::TIssues issues, bool longDelay = false) {
         if (!RetryState) {
             RetryState = CreateRetryState();
         }

         if (const auto delay = RetryState->GetNextRetryDelay(longDelay)) {
-            Issues.AddIssue(message);
+            Issues.AddIssues(issues);
             this->Schedule(*delay, new TEvents::TEvWakeup());
-            LOG_W("Scheduled retry for error: " << message);
+            LOG_W("Scheduled retry for error: " << issues.ToOneLineString());
             return true;
         }

         return false;
     }

+    bool ScheduleRetry(const TString& message, bool longDelay = false) {
+        return ScheduleRetry({NYql::TIssue(message)}, longDelay);
+    }
+
 private:
     static TRetryPolicy::IRetryState::TPtr CreateRetryState() {
         return TRetryPolicy::GetFixedIntervalPolicy(
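With this change the base retry helper accumulates full NYql::TIssues instead of a single message string, and the old string form becomes a thin wrapper. A hedged usage sketch from a derived actor's point of view; "response" and ExtractIssues(...) stand in for caller-side values such as the helper added in scheme_actors.cpp:

```cpp
// Illustrative only: both overloads feed the same retry state and issue list.
if (!ScheduleRetry("Tablet to pipe destroyed")) {
    // String overload: wrapped into a single NYql::TIssue.
    // A false return means the retry budget is exhausted and the caller should reply with an error.
}

NYql::TIssues issues = ExtractIssues(response, "Retry execution error");
ScheduleRetry(std::move(issues), /*longDelay=*/ true);  // issue-list overload preserves all sub-issues
```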

ydb/core/kqp/workload_service/kqp_workload_service.cpp

-22
@@ -132,9 +132,6 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
             return;
         }

-        // Add AllAuthenticatedUsers group SID into user token
-        ev->Get()->UserToken = GetUserToken(ev->Get()->UserToken);
-
         LOG_D("Recieved new request from " << workerActorId << ", Database: " << ev->Get()->Database << ", PoolId: " << ev->Get()->PoolId << ", SessionId: " << ev->Get()->SessionId);
         bool hasDefaultPool = DatabasesWithDefaultPool.contains(CanonizePath(ev->Get()->Database));
         Register(CreatePoolResolverActor(std::move(ev), hasDefaultPool, EnabledResourcePoolsOnServerless));
@@ -475,25 +472,6 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
         Send(replyActorId, new TEvCleanupResponse(status, {NYql::TIssue(message)}));
     }

-    static TIntrusivePtr<NACLib::TUserToken> GetUserToken(TIntrusiveConstPtr<NACLib::TUserToken> userToken) {
-        auto token = MakeIntrusive<NACLib::TUserToken>(userToken ? userToken->GetUserSID() : NACLib::TSID(), TVector<NACLib::TSID>{});
-
-        bool hasAllAuthenticatedUsersSID = false;
-        const auto& allAuthenticatedUsersSID = AppData()->AllAuthenticatedUsers;
-        if (userToken) {
-            for (const auto& groupSID : userToken->GetGroupSIDs()) {
-                token->AddGroupSID(groupSID);
-                hasAllAuthenticatedUsersSID = hasAllAuthenticatedUsersSID || groupSID == allAuthenticatedUsersSID;
-            }
-        }
-
-        if (!hasAllAuthenticatedUsersSID) {
-            token->AddGroupSID(allAuthenticatedUsersSID);
-        }
-
-        return token;
-    }
-
     TPoolState* GetPoolState(const TString& database, const TString& poolId) {
         return GetPoolState(GetPoolKey(database, poolId));
     }

ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp

+5-2
@@ -16,7 +16,9 @@ TEvPrivate::TEvFetchPoolResponse::TPtr FetchPool(TIntrusivePtr<IYdbSetup> ydb, c
     auto runtime = ydb->GetRuntime();
     const auto& edgeActor = runtime->AllocateEdgeActor();

-    runtime->Register(CreatePoolFetcherActor(edgeActor, settings.DomainName_, poolId ? poolId : settings.PoolId_, MakeIntrusive<NACLib::TUserToken>(userSID, TVector<NACLib::TSID>{}), true));
+    auto userToken = MakeIntrusive<NACLib::TUserToken>(userSID, TVector<NACLib::TSID>{});
+    userToken->SaveSerializationInfo();
+    runtime->Register(CreatePoolFetcherActor(edgeActor, settings.DomainName_, poolId ? poolId : settings.PoolId_, userToken, true));
     return runtime->GrabEdgeEvent<TEvPrivate::TEvFetchPoolResponse>(edgeActor, FUTURE_WAIT_TIMEOUT);
 }

@@ -108,7 +110,8 @@ Y_UNIT_TEST_SUITE(KqpWorkloadServiceActors) {

     // Check default pool access
     TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.UserSID(userSID)));
-    TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.UserSID("")));
+    TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.UserSID(ydb->GetRuntime()->GetAppData().AllAuthenticatedUsers)));
+    TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings.UserSID(BUILTIN_ACL_ROOT)));
 }

 Y_UNIT_TEST(TestDefaultPoolAdminPermissions) {
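The test changes mirror the production ones: the fetcher now receives a token whose serialized form is populated via SaveSerializationInfo, and default-pool access is asserted for the all-authenticated-users group and for root@builtin rather than for an empty SID. A small hedged sketch of the token setup; the SID value is an arbitrary example:

```cpp
// Illustrative sketch: building a test token the way FetchPool above now does.
auto userToken = MakeIntrusive<NACLib::TUserToken>("user@builtin", TVector<NACLib::TSID>{});
userToken->SaveSerializationInfo();  // without this the serialized form stays empty, as the diff above suggests
// A populated serialized token is what allows downstream code (see table_creator.cpp below)
// to attach it to SchemeCache requests.
```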

ydb/library/table_creator/table_creator.cpp

+3-1
@@ -392,7 +392,9 @@ THolder<NSchemeCache::TSchemeCacheNavigate> BuildSchemeCacheNavigateRequest(cons
     auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
     auto databasePath = SplitPath(database);
     request->DatabaseName = CanonizePath(databasePath);
-    request->UserToken = userToken;
+    if (userToken && !userToken->GetSerializedToken().empty()) {
+        request->UserToken = userToken;
+    }

     for (const auto& pathComponents : pathsComponents) {
         auto& entry = request->ResultSet.emplace_back();

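Finally, table_creator now attaches the user token to the SchemeCache navigate request only when the token actually carries a serialized form; the likely intent is that a null or unserialized token should mean "no access checks requested" rather than a check against an effectively empty token. A hedged sketch of the guard in isolation; the helper name AttachUserToken is illustrative, not from the commit:

```cpp
// Illustrative sketch of the guard added above.
void AttachUserToken(NSchemeCache::TSchemeCacheNavigate& request,
                     const TIntrusiveConstPtr<NACLib::TUserToken>& userToken) {
    // Only forward tokens that were actually serialized (compare SaveSerializationInfo in the tests).
    if (userToken && !userToken->GetSerializedToken().empty()) {
        request.UserToken = userToken;
    }
}
```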