Skip to content

Commit 09744cf

Browse files
authored
sys view for resource pool classifiers has been supported YQ-3792 (#14287)
1 parent 68fb393 commit 09744cf

20 files changed

+571
-27
lines changed

ydb/core/grpc_services/rpc_read_columns.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,9 @@ class TReadColumnsRPC : public TActorBootstrapped<TReadColumnsRPC> {
321321
JoinPath(ResolveNamesResult->ResultSet.front().Path),
322322
range,
323323
columns,
324-
Request->GetInternalToken());
324+
Request->GetInternalToken(),
325+
Request->GetDatabaseName().GetOrElse({}),
326+
false);
325327

326328
if (!tableScanActor) {
327329
return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR,

ydb/core/kqp/compute_actor/kqp_compute_actor.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqPr
5151
TIntrusivePtr<NActors::TProtoArenaHolder> arena,
5252
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
5353
TComputeActorSchedulingOptions, NKikimrConfig::TTableServiceConfig::EBlockTrackingMode,
54-
TIntrusiveConstPtr<NACLib::TUserToken> userToken);
54+
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
55+
const TString& database);
5556

5657
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask* task,
5758
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NYql::NDq::TComputeRuntimeSettings& settings,

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
226226
}
227227
IActor* computeActor = ::NKikimr::NKqp::CreateKqpComputeActor(args.ExecuterId, args.TxId, args.Task, AsyncIoFactory,
228228
runtimeSettings, memoryLimits, std::move(args.TraceId), std::move(args.Arena), FederatedQuerySetup, GUCSettings,
229-
std::move(args.SchedulingOptions), args.BlockTrackingMode, std::move(args.UserToken));
229+
std::move(args.SchedulingOptions), args.BlockTrackingMode, std::move(args.UserToken), args.Database);
230230
return args.ShareMailbox ? TlsActivationContext->AsActorContext().RegisterWithSameMailbox(computeActor) :
231231
TlsActivationContext->AsActorContext().Register(computeActor);
232232
}

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ struct IKqpNodeComputeActorFactory {
131131
std::shared_ptr<IKqpNodeState> State = nullptr;
132132
TComputeActorSchedulingOptions SchedulingOptions = {};
133133
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
134+
TString Database;
134135
};
135136

136137
typedef std::variant<TActorId, NKikimr::NKqp::NRm::TKqpRMAllocateResult> TActorStartResult;

ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@ TKqpComputeActor::TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqPro
1616
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
1717
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
1818
TComputeActorSchedulingOptions schedulingOptions, NKikimrConfig::TTableServiceConfig::EBlockTrackingMode mode,
19-
TIntrusiveConstPtr<NACLib::TUserToken> userToken)
19+
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
20+
const TString& database)
2021
: TBase(std::move(schedulingOptions), executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena), GUCSettings)
2122
, ComputeCtx(settings.StatsMode)
2223
, FederatedQuerySetup(federatedQuerySetup)
2324
, BlockTrackingMode(mode)
2425
, ArrayBufferMinFillPercentage(memoryLimits.ArrayBufferMinFillPercentage)
2526
, UserToken(std::move(userToken))
27+
, Database(database)
2628
{
2729
InitializeTask();
2830
if (GetTask().GetMeta().Is<NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta>()) {
@@ -89,6 +91,7 @@ void TKqpComputeActor::DoBootstrap() {
8991
TSmallVec<NMiniKQL::TKqpScanComputeContext::TColumn> columns;
9092

9193
TVector<TSerializedTableRange> ranges;
94+
bool reverse = false;
9295
if (Meta) {
9396
YQL_ENSURE(ComputeCtx.GetTableScans().empty());
9497

@@ -107,13 +110,14 @@ void TKqpComputeActor::DoBootstrap() {
107110
for (auto& range : protoRanges) {
108111
ranges.emplace_back(range);
109112
}
113+
reverse = Meta->GetReverse();
110114
}
111115

112116
if (ScanData) {
113117
ScanData->TaskId = GetTask().GetId();
114118
ScanData->TableReader = CreateKqpTableReader(*ScanData);
115119

116-
auto scanActor = NSysView::CreateSystemViewScan(SelfId(), 0, ScanData->TableId, ScanData->TablePath, ranges, columns, UserToken);
120+
auto scanActor = NSysView::CreateSystemViewScan(SelfId(), 0, ScanData->TableId, ScanData->TablePath, ranges, columns, UserToken, Database, reverse);
117121

118122
if (!scanActor) {
119123
InternalError(TIssuesIds::DEFAULT_ERROR, TStringBuilder()
@@ -289,10 +293,11 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::T
289293
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
290294
const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions cpuOptions,
291295
NKikimrConfig::TTableServiceConfig::EBlockTrackingMode mode,
292-
TIntrusiveConstPtr<NACLib::TUserToken> userToken)
296+
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
297+
const TString& database)
293298
{
294299
return new TKqpComputeActor(executerId, txId, task, std::move(asyncIoFactory),
295-
settings, memoryLimits, std::move(traceId), std::move(arena), federatedQuerySetup, GUCSettings, std::move(cpuOptions), mode, std::move(userToken));
300+
settings, memoryLimits, std::move(traceId), std::move(arena), federatedQuerySetup, GUCSettings, std::move(cpuOptions), mode, std::move(userToken), database);
296301
}
297302

298303
} // namespace NKqp

ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ class TKqpComputeActor : public TSchedulableComputeActorBase<TKqpComputeActor> {
3030
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
3131
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
3232
TComputeActorSchedulingOptions, NKikimrConfig::TTableServiceConfig::EBlockTrackingMode mode,
33-
TIntrusiveConstPtr<NACLib::TUserToken> userToken);
33+
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
34+
const TString& database);
3435

3536
void DoBootstrap();
3637

@@ -66,6 +67,7 @@ class TKqpComputeActor : public TSchedulableComputeActorBase<TKqpComputeActor> {
6667
const NKikimrConfig::TTableServiceConfig::EBlockTrackingMode BlockTrackingMode;
6768
const TMaybe<ui8> ArrayBufferMinFillPercentage;
6869
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
70+
const TString Database;
6971
};
7072

7173
} // namespace NKqp

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -938,9 +938,10 @@ class TKqpExecuterBase : public TActor<TDerived> {
938938
.Columns = BuildKqpColumns(op, tableInfo),
939939
};
940940

941+
auto readSettings = ExtractReadSettings(op, stageInfo, HolderFactory(), TypeEnv());
941942
task.Meta.Reads.ConstructInPlace();
942943
task.Meta.Reads->emplace_back(std::move(readInfo));
943-
task.Meta.ReadInfo.Reverse = op.GetReadRange().GetReverse();
944+
task.Meta.ReadInfo.Reverse = readSettings.Reverse;
944945
task.Meta.Type = TTaskMeta::TTaskType::Compute;
945946

946947
FillSecureParamsFromStage(task.Meta.SecureParams, stage);

ydb/core/kqp/executer_actor/kqp_planner.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,8 @@ TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize)
501501
.ShareMailbox = (computeTasksSize <= 1),
502502
.RlPath = Nothing(),
503503
.BlockTrackingMode = BlockTrackingMode,
504-
.UserToken = UserToken
504+
.UserToken = UserToken,
505+
.Database = Database
505506
});
506507

507508
if (const auto* rmResult = std::get_if<NRm::TKqpRMAllocateResult>(&startResult)) {

ydb/core/kqp/node_service/kqp_node_service.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,8 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
286286
createArgs.UserToken.Reset(MakeIntrusive<NACLib::TUserToken>(msg.GetUserToken()));
287287
}
288288

289+
createArgs.Database = msg.GetDatabase();
290+
289291
auto result = CaFactory_->CreateKqpComputeActor(std::move(createArgs));
290292

291293
if (const auto* rmResult = std::get_if<NRm::TKqpRMAllocateResult>(&result)) {

0 commit comments

Comments
 (0)