1
1
#include " actors.h"
2
2
3
3
#include < ydb/core/base/path.h>
4
+ #include < ydb/core/base/tablet_pipe.h>
4
5
5
6
#include < ydb/core/kqp/common/simple/services.h>
6
7
#include < ydb/core/kqp/workload_service/common/events.h>
7
8
#include < ydb/core/kqp/workload_service/common/helpers.h>
8
9
10
+ #include < ydb/core/tx/schemeshard/schemeshard.h>
9
11
#include < ydb/core/tx/tx_proxy/proxy.h>
10
12
11
13
#include < ydb/library/table_creator/table_creator.h>
@@ -64,7 +66,13 @@ class TPoolResolverActor : public TActorBootstrapped<TPoolResolverActor> {
64
66
for (const TString& usedSid : AppData ()->AdministrationAllowedSIDs ) {
65
67
diffAcl.AddAccess (NACLib::EAccessType::Allow, NACLib::EAccessRights::GenericFull, usedSid);
66
68
}
67
- diffAcl.AddAccess (NACLib::EAccessType::Allow, NACLib::EAccessRights::SelectRow | NACLib::EAccessRights::DescribeSchema, AppData ()->AllAuthenticatedUsers );
69
+
70
+ auto useAccess = NACLib::EAccessRights::SelectRow | NACLib::EAccessRights::DescribeSchema;
71
+ for (const auto & userSID : AppData ()->DefaultUserSIDs ) {
72
+ diffAcl.AddAccess (NACLib::EAccessType::Allow, useAccess, userSID);
73
+ }
74
+ diffAcl.AddAccess (NACLib::EAccessType::Allow, useAccess, AppData ()->AllAuthenticatedUsers );
75
+ diffAcl.AddAccess (NACLib::EAccessType::Allow, useAccess, BUILTIN_ACL_ROOT);
68
76
69
77
auto token = MakeIntrusive<NACLib::TUserToken>(BUILTIN_ACL_METADATA, TVector<NACLib::TSID>{});
70
78
Register (CreatePoolCreatorActor (SelfId (), Event->Get ()->Database , Event->Get ()->PoolId , NResourcePool::TPoolSettings (), token, diffAcl));
@@ -116,7 +124,7 @@ class TPoolResolverActor : public TActorBootstrapped<TPoolResolverActor> {
116
124
117
125
class TPoolFetcherActor : public TSchemeActorBase <TPoolFetcherActor> {
118
126
public:
119
- TPoolFetcherActor (const NActors:: TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless)
127
+ TPoolFetcherActor (const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless)
120
128
: ReplyActorId(replyActorId)
121
129
, Database(database)
122
130
, PoolId(poolId)
@@ -255,38 +263,67 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
255
263
}
256
264
257
265
void Handle (TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev) {
258
- const auto ssStatus = ev->Get ()->Record .GetSchemeShardStatus ();
259
- switch (ev->Get ()->Status ()) {
266
+ const auto & response = ev->Get ()->Record ;
267
+ const auto ssStatus = response.GetSchemeShardStatus ();
268
+ const auto status = ev->Get ()->Status ();
269
+ switch (status) {
260
270
case NTxProxy::TResultStatus::ExecComplete:
261
271
case NTxProxy::TResultStatus::ExecAlready:
262
272
if (ssStatus == NKikimrScheme::EStatus::StatusSuccess || ssStatus == NKikimrScheme::EStatus::StatusAlreadyExists) {
263
273
Reply (Ydb::StatusIds::SUCCESS);
264
274
} else {
265
- Reply (Ydb::StatusIds::SCHEME_ERROR, TStringBuilder () << " Invalid creation status: " << static_cast <NKikimrScheme::EStatus>(ssStatus));
275
+ Reply (Ydb::StatusIds::SCHEME_ERROR, ExtractIssues (response, TStringBuilder () << " Invalid creation status: " << static_cast <NKikimrScheme::EStatus>(ssStatus) ));
266
276
}
267
277
return ;
268
278
case NTxProxy::TResultStatus::ExecError:
269
- if (ssStatus == NKikimrScheme::EStatus::StatusMultipleModifications || ssStatus == NKikimrScheme::EStatus::StatusInvalidParameter ) {
270
- ScheduleRetry (ssStatus, " Retry execution error " , true );
279
+ if (ssStatus == NKikimrScheme::EStatus::StatusMultipleModifications) {
280
+ SubscribeOnTransactionOrRetry (status, response );
271
281
} else {
272
- Reply (Ydb::StatusIds::SCHEME_ERROR, TStringBuilder () << " Execution error: " << static_cast <NKikimrScheme::EStatus>(ssStatus));
282
+ Reply (Ydb::StatusIds::SCHEME_ERROR, ExtractIssues (response, TStringBuilder () << " Execution error: " << static_cast <NKikimrScheme::EStatus>(ssStatus) ));
273
283
}
274
284
return ;
275
285
case NTxProxy::TResultStatus::ExecInProgress:
276
- ScheduleRetry (ssStatus, " Retry execution in progress error " , true );
286
+ SubscribeOnTransactionOrRetry (status, response );
277
287
return ;
278
288
case NTxProxy::TResultStatus::ProxyShardNotAvailable:
279
- ScheduleRetry (ssStatus , " Retry shard unavailable error" );
289
+ ScheduleRetry (response , " Retry shard unavailable error" );
280
290
return ;
281
291
default :
282
- Reply (Ydb::StatusIds::SCHEME_ERROR, TStringBuilder () << " Failed to create resource pool: " << static_cast <NKikimrScheme::EStatus>(ssStatus));
292
+ Reply (Ydb::StatusIds::SCHEME_ERROR, ExtractIssues (response, TStringBuilder () << " Failed to create resource pool: " << static_cast <NKikimrScheme::EStatus>(ssStatus) ));
283
293
return ;
284
294
}
285
295
}
286
296
297
+ void Handle (TEvTabletPipe::TEvClientConnected::TPtr& ev) {
298
+ if (ev->Get ()->Status == NKikimrProto::OK) {
299
+ LOG_T (" Tablet to pipe successfully connected" );
300
+ return ;
301
+ }
302
+
303
+ ClosePipeClient ();
304
+ ScheduleRetry (TStringBuilder () << " Tablet to pipe not connected: " << NKikimrProto::EReplyStatus_Name (ev->Get ()->Status ));
305
+ }
306
+
307
+ void Handle (TEvTabletPipe::TEvClientDestroyed::TPtr& ev) {
308
+ const TActorId clientId = ev->Get ()->ClientId ;
309
+ if (!ClosedSchemePipeActors.contains (clientId)) {
310
+ ClosePipeClient ();
311
+ ScheduleRetry (" Tablet to pipe destroyed" );
312
+ }
313
+ }
314
+
315
+ void Handle (NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev) {
316
+ ScheduleRetry (TStringBuilder () << " Transaction " << ev->Get ()->Record .GetTxId () << " completed, doublechecking" );
317
+ }
318
+
287
319
STFUNC (StateFunc) {
288
320
switch (ev->GetTypeRewrite ()) {
289
321
hFunc (TEvTxUserProxy::TEvProposeTransactionStatus, Handle )
322
+ hFunc (TEvTabletPipe::TEvClientConnected, Handle )
323
+ hFunc (TEvTabletPipe::TEvClientDestroyed, Handle )
324
+ hFunc (NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult, Handle )
325
+ IgnoreFunc (NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionRegistered)
326
+
290
327
default :
291
328
StateFuncBase (ev);
292
329
}
@@ -301,13 +338,12 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
301
338
schemeTx.SetWorkingDir (JoinPath ({Database, " .resource_pools" }));
302
339
schemeTx.SetOperationType (NKikimrSchemeOp::ESchemeOpCreateResourcePool);
303
340
schemeTx.SetInternal (true );
304
- schemeTx.SetAllowAccessToPrivatePaths (true );
305
341
306
342
BuildCreatePoolRequest (*schemeTx.MutableCreateResourcePool ());
307
343
BuildModifyAclRequest (*schemeTx.MutableModifyACL ());
308
344
309
345
if (UserToken) {
310
- event->Record .SetUserToken (UserToken->GetSerializedToken ());
346
+ event->Record .SetUserToken (UserToken->SerializeAsString ());
311
347
}
312
348
313
349
Send (MakeTxProxyID (), std::move (event));
@@ -322,10 +358,42 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
322
358
}
323
359
324
360
private:
325
- void ScheduleRetry (ui32 status, const TString& message, bool longDelay = false ) {
326
- auto ssStatus = static_cast <NKikimrScheme::EStatus>(status);
327
- if (!TBase::ScheduleRetry (TStringBuilder () << message << " , status: " << ssStatus, longDelay)) {
328
- Reply (Ydb::StatusIds::UNAVAILABLE, TStringBuilder () << " Retry limit exceeded on status: " << ssStatus);
361
+ void SubscribeOnTransactionOrRetry (NTxProxy::TResultStatus::EStatus status, const NKikimrTxUserProxy::TEvProposeTransactionStatus& response) {
362
+ const ui64 txId = status == NTxProxy::TResultStatus::ExecInProgress ? response.GetTxId () : response.GetPathCreateTxId ();
363
+ if (txId == 0 ) {
364
+ ScheduleRetry (response, " Unable to subscribe to concurrent transaction" , true );
365
+ return ;
366
+ }
367
+
368
+ SchemePipeActorId = Register (NTabletPipe::CreateClient (SelfId (), response.GetSchemeShardTabletId ()));
369
+
370
+ auto request = MakeHolder<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>();
371
+ request->Record .SetTxId (txId);
372
+ NTabletPipe::SendData (SelfId (), SchemePipeActorId, std::move (request));
373
+ LOG_D (" Subscribe on create pool tx: " << txId);
374
+ }
375
+
376
+ void ClosePipeClient () {
377
+ if (SchemePipeActorId) {
378
+ ClosedSchemePipeActors.insert (SchemePipeActorId);
379
+ NTabletPipe::CloseClient (SelfId (), SchemePipeActorId);
380
+ SchemePipeActorId = {};
381
+ }
382
+ }
383
+
384
+ void ScheduleRetry (const NKikimrTxUserProxy::TEvProposeTransactionStatus& response, const TString& message, bool longDelay = false ) {
385
+ ClosePipeClient ();
386
+
387
+ auto ssStatus = static_cast <NKikimrScheme::EStatus>(response.GetSchemeShardStatus ());
388
+ if (!TBase::ScheduleRetry (ExtractIssues (response, TStringBuilder () << message << " , status: " << ssStatus), longDelay)) {
389
+ Reply (Ydb::StatusIds::UNAVAILABLE, ExtractIssues (response, TStringBuilder () << " Retry limit exceeded on status: " << ssStatus));
390
+ }
391
+ }
392
+
393
+ void ScheduleRetry (const TString& message, bool longDelay = false ) {
394
+ ClosePipeClient ();
395
+ if (!TBase::ScheduleRetry (message, longDelay)) {
396
+ Reply (Ydb::StatusIds::UNAVAILABLE, TStringBuilder () << " Retry limit exceeded on error: " << message);
329
397
}
330
398
}
331
399
@@ -358,18 +426,29 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
358
426
LOG_W (" Failed to create pool, " << status << " , issues: " << issues.ToOneLineString ());
359
427
}
360
428
429
+ ClosePipeClient ();
430
+
361
431
Issues.AddIssues (std::move (issues));
362
432
Send (ReplyActorId, new TEvPrivate::TEvCreatePoolResponse (status, std::move (Issues)));
363
433
PassAway ();
364
434
}
365
435
436
+ static NYql::TIssues ExtractIssues (const NKikimrTxUserProxy::TEvProposeTransactionStatus& response, const TString& message) {
437
+ NYql::TIssues issues;
438
+ NYql::IssuesFromMessage (response.GetIssues (), issues);
439
+ return GroupIssues (issues, message);
440
+ }
441
+
366
442
private:
367
443
const TActorId ReplyActorId;
368
444
const TString Database;
369
445
const TString PoolId;
370
446
const TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
371
447
const NACLibProto::TDiffACL DiffAcl;
372
448
NResourcePool::TPoolSettings PoolConfig;
449
+
450
+ std::unordered_set<TActorId> ClosedSchemePipeActors;
451
+ TActorId SchemePipeActorId;
373
452
};
374
453
375
454
} // anonymous namespace
0 commit comments