Skip to content

Commit 49f37ae

Browse files
authored
Merge 6c0b232 into 725b0df
2 parents 725b0df + 6c0b232 commit 49f37ae

13 files changed

+269
-42
lines changed

ydb/core/kqp/opt/kqp_query_plan.cpp

+31-2
Original file line numberDiff line numberDiff line change
@@ -2243,7 +2243,7 @@ TString AddSimplifiedPlan(const TString& planText, TIntrusivePtr<NOpt::TKqpOptim
22432243
return planJson.GetStringRobust();
22442244
}
22452245

2246-
TString SerializeTxPlans(const TVector<const TString>& txPlans, TIntrusivePtr<NOpt::TKqpOptimizeContext> optCtx, const TString commonPlanInfo = "") {
2246+
TString SerializeTxPlans(const TVector<const TString>& txPlans, TIntrusivePtr<NOpt::TKqpOptimizeContext> optCtx, const TString commonPlanInfo = "", const TString& queryStats = "") {
22472247
NJsonWriter::TBuf writer;
22482248
writer.SetIndentSpaces(2);
22492249

@@ -2266,6 +2266,15 @@ TString SerializeTxPlans(const TVector<const TString>& txPlans, TIntrusivePtr<NO
22662266
writer.BeginObject();
22672267
writer.WriteKey("Node Type").WriteString("Query");
22682268
writer.WriteKey("PlanNodeType").WriteString("Query");
2269+
2270+
if (queryStats) {
2271+
NJson::TJsonValue queryStatsJson;
2272+
NJson::ReadJsonTree(queryStats, &queryStatsJson, true);
2273+
2274+
writer.WriteKey("Stats");
2275+
writer.WriteJsonValue(&queryStatsJson);
2276+
}
2277+
22692278
writer.WriteKey("Plans");
22702279
writer.BeginList();
22712280

@@ -2705,7 +2714,27 @@ TString SerializeAnalyzePlan(const NKqpProto::TKqpStatsQuery& queryStats) {
27052714
txPlans.push_back(txPlan);
27062715
}
27072716
}
2708-
return SerializeTxPlans(txPlans, TIntrusivePtr<NOpt::TKqpOptimizeContext>());
2717+
2718+
NJsonWriter::TBuf writer;
2719+
writer.BeginObject();
2720+
2721+
if (queryStats.HasCompilation()) {
2722+
const auto& compilation = queryStats.GetCompilation();
2723+
2724+
writer.WriteKey("Compilation");
2725+
writer.BeginObject();
2726+
writer.WriteKey("FromCache").WriteBool(compilation.GetFromCache());
2727+
writer.WriteKey("DurationUs").WriteLongLong(compilation.GetDurationUs());
2728+
writer.WriteKey("CpuTimeUs").WriteLongLong(compilation.GetCpuTimeUs());
2729+
writer.EndObject();
2730+
}
2731+
2732+
writer.WriteKey("ProcessCpuTimeUs").WriteLongLong(queryStats.GetWorkerCpuTimeUs());
2733+
writer.WriteKey("TotalDurationUs").WriteLongLong(queryStats.GetDurationUs());
2734+
writer.WriteKey("QueuedTimeUs").WriteLongLong(queryStats.GetQueuedTimeUs());
2735+
writer.EndObject();
2736+
2737+
return SerializeTxPlans(txPlans, TIntrusivePtr<NOpt::TKqpOptimizeContext>(), "", writer.Str());
27092738
}
27102739

27112740
TString SerializeScriptPlan(const TVector<const TString>& queryPlans) {

ydb/core/kqp/session_actor/kqp_query_state.h

+1
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ class TKqpQueryState : public TNonCopyable {
114114
bool IsDocumentApiRestricted_ = false;
115115

116116
TInstant StartTime;
117+
TInstant ContinueTime;
117118
NYql::TKikimrQueryDeadlines QueryDeadlines;
118119
TKqpQueryStats QueryStats;
119120
bool KeepSession = false;

ydb/core/kqp/session_actor/kqp_query_stats.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ ui64 CalcRequestUnit(const TKqpQueryStats& stats) {
210210
NKqpProto::TKqpStatsQuery TKqpQueryStats::ToProto() const {
211211
NKqpProto::TKqpStatsQuery result;
212212
result.SetDurationUs(DurationUs);
213+
result.SetQueuedTimeUs(QueuedTimeUs);
213214

214215
if (Compilation) {
215216
result.MutableCompilation()->SetFromCache(Compilation->FromCache);

ydb/core/kqp/session_actor/kqp_query_stats.h

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ namespace NKikimr::NKqp {
88

99
struct TKqpQueryStats {
1010
ui64 DurationUs = 0;
11+
ui64 QueuedTimeUs = 0;
1112
std::optional<TKqpStatsCompile> Compilation;
1213

1314
ui64 WorkerCpuTimeUs = 0;

ydb/core/kqp/session_actor/kqp_session_actor.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
475475

476476
void Handle(NWorkload::TEvContinueRequest::TPtr& ev) {
477477
YQL_ENSURE(QueryState);
478+
QueryState->ContinueTime = TInstant::Now();
478479

479480
if (ev->Get()->Status == Ydb::StatusIds::UNSUPPORTED) {
480481
LOG_T("Failed to place request in resource pool, feature flag is disabled");
@@ -1552,6 +1553,9 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
15521553

15531554
stats->DurationUs = ((TInstant::Now() - QueryState->StartTime).MicroSeconds());
15541555
stats->WorkerCpuTimeUs = (QueryState->GetCpuTime().MicroSeconds());
1556+
if (const auto continueTime = QueryState->ContinueTime) {
1557+
stats->QueuedTimeUs = (continueTime - QueryState->StartTime).MicroSeconds();
1558+
}
15551559
if (QueryState->CompileResult) {
15561560
stats->Compilation.emplace();
15571561
stats->Compilation->FromCache = (QueryState->CompileStats.FromCache);

ydb/core/kqp/workload_service/actors/actors.h

+5-2
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,15 @@ namespace NKikimr::NKqp::NWorkload {
99
NActors::IActor* CreatePoolHandlerActor(const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters);
1010

1111
// Fetch pool and create default pool if needed
12-
NActors::IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists, bool enableOnServerless);
12+
NActors::IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists);
1313

1414
// Fetch and create pool in scheme shard
15-
NActors::IActor* CreatePoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless);
15+
NActors::IActor* CreatePoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken);
1616
NActors::IActor* CreatePoolCreatorActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr<NACLib::TUserToken> userToken, NACLibProto::TDiffACL diffAcl);
1717

18+
// Fetches database info and reports whether the database is serverless
19+
NActors::IActor* CreateDatabaseFetcherActor(const NActors::TActorId& replyActorId, const TString& database);
20+
1821
// Cpu load fetcher actor
1922
NActors::IActor* CreateCpuLoadFetcherActor(const NActors::TActorId& replyActorId);
2023

ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp

+10-2
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
5454
UpdateConfigCounters(poolConfig);
5555
}
5656

57+
// Records one request-latency sample into RequestsLatencyMs: the number of
// milliseconds elapsed between continueTime and the current time.
// Nothing is recorded when continueTime evaluates to false
// (presumably a default-constructed TInstant, i.e. the request was never continued — confirm).
void CollectRequestLatency(TInstant continueTime) {
58+
if (continueTime) {
59+
RequestsLatencyMs->Collect((TInstant::Now() - continueTime).MilliSeconds());
60+
}
61+
}
62+
5763
void UpdateConfigCounters(const NResourcePool::TPoolSettings& poolConfig) {
5864
InFlightLimit->Set(std::max(poolConfig.ConcurrentQueryLimit, 0));
5965
QueueSizeLimit->Set(std::max(poolConfig.QueueSize, 0));
@@ -106,6 +112,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
106112
const TActorId WorkerActorId;
107113
const TString SessionId;
108114
const TInstant StartTime = TInstant::Now();
115+
TInstant ContinueTime;
109116

110117
EState State = EState::Pending;
111118
bool Started = false; // after TEvContinueRequest success
@@ -267,6 +274,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
267274
if (status == Ydb::StatusIds::SUCCESS) {
268275
LocalInFlight++;
269276
request->Started = true;
277+
request->ContinueTime = TInstant::Now();
270278
Counters.LocalInFly->Inc();
271279
Counters.ContinueOk->Inc();
272280
Counters.DelayedTimeMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds());
@@ -387,7 +395,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
387395

388396
if (status == Ydb::StatusIds::SUCCESS) {
389397
Counters.CleanupOk->Inc();
390-
Counters.RequestsLatencyMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds());
398+
Counters.CollectRequestLatency(request->ContinueTime);
391399
LOG_D("Reply cleanup success to " << request->WorkerActorId << ", session id: " << request->SessionId << ", local in flight: " << LocalInFlight);
392400
} else {
393401
Counters.CleanupError->Inc();
@@ -401,7 +409,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
401409
this->Send(MakeKqpProxyID(this->SelfId().NodeId()), ev.release());
402410

403411
Counters.Cancelled->Inc();
404-
Counters.RequestsLatencyMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds());
412+
Counters.CollectRequestLatency(request->ContinueTime);
405413
LOG_I("Cancel request for worker " << request->WorkerActorId << ", session id: " << request->SessionId << ", local in flight: " << LocalInFlight);
406414
}
407415

ydb/core/kqp/workload_service/actors/scheme_actors.cpp

+102-16
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,8 @@ using namespace NActors;
2222

2323
class TPoolResolverActor : public TActorBootstrapped<TPoolResolverActor> {
2424
public:
25-
TPoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists, bool enableOnServerless)
25+
TPoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists)
2626
: Event(std::move(event))
27-
, EnableOnServerless(enableOnServerless)
2827
{
2928
if (!Event->Get()->PoolId) {
3029
Event->Get()->PoolId = NResourcePool::DEFAULT_POOL_ID;
@@ -39,7 +38,7 @@ class TPoolResolverActor : public TActorBootstrapped<TPoolResolverActor> {
3938

4039
void StartPoolFetchRequest() const {
4140
LOG_D("Start pool fetching");
42-
Register(CreatePoolFetcherActor(SelfId(), Event->Get()->Database, Event->Get()->PoolId, Event->Get()->UserToken, EnableOnServerless));
41+
Register(CreatePoolFetcherActor(SelfId(), Event->Get()->Database, Event->Get()->PoolId, Event->Get()->UserToken));
4342
}
4443

4544
void Handle(TEvPrivate::TEvFetchPoolResponse::TPtr& ev) {
@@ -116,20 +115,18 @@ class TPoolResolverActor : public TActorBootstrapped<TPoolResolverActor> {
116115

117116
private:
118117
TEvPlaceRequestIntoPool::TPtr Event;
119-
const bool EnableOnServerless;
120118
bool CanCreatePool = false;
121119
bool DefaultPoolCreated = false;
122120
};
123121

124122

125123
class TPoolFetcherActor : public TSchemeActorBase<TPoolFetcherActor> {
126124
public:
127-
TPoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless)
125+
TPoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken)
128126
: ReplyActorId(replyActorId)
129127
, Database(database)
130128
, PoolId(poolId)
131129
, UserToken(userToken)
132-
, EnableOnServerless(enableOnServerless)
133130
{}
134131

135132
void DoBootstrap() {
@@ -144,11 +141,6 @@ class TPoolFetcherActor : public TSchemeActorBase<TPoolFetcherActor> {
144141
}
145142

146143
const auto& result = results[0];
147-
if (!EnableOnServerless && result.DomainInfo && result.DomainInfo->IsServerless()) {
148-
Reply(Ydb::StatusIds::UNSUPPORTED, "Resource pools are disabled for serverless domains. Please contact your system administrator to enable it");
149-
return;
150-
}
151-
152144
switch (result.Status) {
153145
case EStatus::Unknown:
154146
case EStatus::PathNotTable:
@@ -238,7 +230,6 @@ class TPoolFetcherActor : public TSchemeActorBase<TPoolFetcherActor> {
238230
const TString Database;
239231
const TString PoolId;
240232
const TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
241-
const bool EnableOnServerless;
242233

243234
NResourcePool::TPoolSettings PoolConfig;
244235
NKikimrProto::TPathID PathId;
@@ -451,18 +442,113 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
451442
TActorId SchemePipeActorId;
452443
};
453444

445+
446+
// Actor that asks the scheme cache about `Database` and replies to `ReplyActorId`
// with a TEvPrivate::TEvFetchDatabaseResponse carrying the resulting status,
// the database name, the Serverless flag and the accumulated issues.
// The actor terminates itself (PassAway) after sending the reply.
class TDatabaseFetcherActor : public TSchemeActorBase<TDatabaseFetcherActor> {
447+
public:
448+
// replyActorId - recipient of the TEvFetchDatabaseResponse.
// database     - database path passed to the scheme cache navigate request.
TDatabaseFetcherActor(const TActorId& replyActorId, const TString& database)
449+
: ReplyActorId(replyActorId)
450+
, Database(database)
451+
{}
452+
453+
// Switches the actor into StateFunc.
// NOTE(review): presumably invoked by TSchemeActorBase during bootstrap — confirm against the base class.
void DoBootstrap() {
454+
Become(&TDatabaseFetcherActor::StateFunc);
455+
}
456+
457+
// Handles the scheme cache navigate result and maps its status to a reply:
//   - anything but exactly one result entry      -> INTERNAL_ERROR
//   - unknown path / access denied style errors  -> NOT_FOUND
//   - LookupError / TableCreationNotComplete     -> retry, UNAVAILABLE if ScheduleRetry() refuses
//   - Ok                                         -> SUCCESS with the Serverless flag filled in
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
458+
const auto& results = ev->Get()->Request->ResultSet;
459+
if (results.size() != 1) {
460+
Reply(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected scheme cache response");
461+
return;
462+
}
463+
464+
const auto& result = results[0];
465+
switch (result.Status) {
466+
case EStatus::Unknown:
467+
case EStatus::PathNotTable:
468+
case EStatus::PathNotPath:
469+
case EStatus::RedirectLookupError:
470+
case EStatus::AccessDenied:
471+
case EStatus::RootUnknown:
472+
case EStatus::PathErrorUnknown:
473+
// AccessDenied is reported together with the "not found" statuses, using one shared message.
Reply(Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Database " << Database << " not found or you don't have access permissions");
474+
return;
475+
case EStatus::LookupError:
476+
case EStatus::TableCreationNotComplete:
477+
// If ScheduleRetry() returns true no reply is sent here.
// NOTE(review): presumably the base class re-issues StartRequest() later — confirm.
if (!ScheduleRetry(TStringBuilder() << "Retry error " << result.Status)) {
478+
Reply(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Retry limit exceeded on scheme error: " << result.Status);
479+
}
480+
return;
481+
case EStatus::Ok:
482+
// A missing DomainInfo is treated as "not serverless".
Serverless = result.DomainInfo && result.DomainInfo->IsServerless();
483+
Reply(Ydb::StatusIds::SUCCESS);
484+
return;
485+
}
486+
}
487+
488+
// Event dispatch: navigate results go to Handle(), everything else to StateFuncBase().
STFUNC(StateFunc) {
489+
switch (ev->GetTypeRewrite()) {
490+
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
491+
default:
492+
StateFuncBase(ev);
493+
}
494+
}
495+
496+
protected:
497+
// Sends a single-entry navigate request for `Database` to the scheme cache,
// with the entry's operation set to OpPath and delivery tracking enabled.
void StartRequest() override {
498+
LOG_D("Start database fetching");
499+
auto event = NTableCreator::BuildSchemeCacheNavigateRequest({{}}, Database, nullptr);
500+
event->ResultSet[0].Operation = NSchemeCache::TSchemeCacheNavigate::OpPath;
501+
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(event.Release()), IEventHandle::FlagTrackDelivery);
502+
}
503+
504+
// Forwards a fatal error to the requester as a reply containing the single given issue.
void OnFatalError(Ydb::StatusIds::StatusCode status, NYql::TIssue issue) override {
505+
Reply(status, {std::move(issue)});
506+
}
507+
508+
// Prefix used by the LOG_* macros in this actor: actor id and database name.
TString LogPrefix() const override {
509+
return TStringBuilder() << "[TDatabaseFetcherActor] ActorId: " << SelfId() << ", Database: " << Database << ", ";
510+
}
511+
512+
private:
513+
// Convenience overload: wraps `message` into a single issue and replies.
void Reply(Ydb::StatusIds::StatusCode status, const TString& message) {
514+
Reply(status, {NYql::TIssue(message)});
515+
}
516+
517+
// Logs the outcome, appends `issues` to the Issues member, sends
// TEvFetchDatabaseResponse to ReplyActorId and terminates the actor.
// NOTE(review): Issues is not declared in this class — presumably inherited from TSchemeActorBase; confirm.
void Reply(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) {
518+
if (status == Ydb::StatusIds::SUCCESS) {
519+
LOG_D("Database info successfully fetched");
520+
} else {
521+
LOG_W("Failed to fetch database info, " << status << ", issues: " << issues.ToOneLineString());
522+
}
523+
524+
Issues.AddIssues(std::move(issues));
525+
Send(ReplyActorId, new TEvPrivate::TEvFetchDatabaseResponse(status, Database, Serverless, std::move(Issues)));
526+
PassAway();
527+
}
528+
529+
private:
530+
// Recipient of the final TEvFetchDatabaseResponse.
const TActorId ReplyActorId;
531+
// Database path that is looked up in the scheme cache.
const TString Database;
532+
533+
// Set only on EStatus::Ok; stays false for every error reply.
bool Serverless = false;
534+
};
535+
454536
} // anonymous namespace
455537

456-
IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists, bool enableOnServerless) {
457-
return new TPoolResolverActor(std::move(event), defaultPoolExists, enableOnServerless);
538+
// Factory for TPoolResolverActor: forwards the (moved) place-request event and
// the defaultPoolExists flag to the actor's constructor and returns the new actor.
IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists) {
539+
return new TPoolResolverActor(std::move(event), defaultPoolExists);
458540
}
459541

460-
IActor* CreatePoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless) {
461-
return new TPoolFetcherActor(replyActorId, database, poolId, userToken, enableOnServerless);
542+
// Factory for TPoolFetcherActor: passes the reply actor id, database, pool id
// and user token to the actor's constructor and returns the new actor.
IActor* CreatePoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken) {
543+
return new TPoolFetcherActor(replyActorId, database, poolId, userToken);
462544
}
463545

464546
// Factory for TPoolCreatorActor: passes the reply actor id, database, pool id,
// pool settings, user token and ACL diff to the actor's constructor and returns the new actor.
IActor* CreatePoolCreatorActor(const TActorId& replyActorId, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr<NACLib::TUserToken> userToken, NACLibProto::TDiffACL diffAcl) {
465547
return new TPoolCreatorActor(replyActorId, database, poolId, poolConfig, userToken, diffAcl);
466548
}
467549

550+
// Factory for TDatabaseFetcherActor: the returned actor looks up `database`
// and sends its TEvFetchDatabaseResponse to `replyActorId`.
// Returns a raw, heap-allocated actor.
// NOTE(review): presumably the caller hands it to Register(), as is done for CreatePoolFetcherActor — confirm.
IActor* CreateDatabaseFetcherActor(const TActorId& replyActorId, const TString& database) {
551+
return new TDatabaseFetcherActor(replyActorId, database);
552+
}
553+
468554
} // NKikimr::NKqp::NWorkload

ydb/core/kqp/workload_service/common/events.h

+15
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ struct TEvPrivate {
2222
EvRefreshPoolState = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
2323
EvResolvePoolResponse,
2424
EvFetchPoolResponse,
25+
EvFetchDatabaseResponse,
2526
EvCreatePoolResponse,
2627
EvPrepareTablesRequest,
2728
EvPlaceRequestIntoPoolResponse,
@@ -85,6 +86,20 @@ struct TEvPrivate {
8586
const NYql::TIssues Issues;
8687
};
8788

89+
// Local event carrying the result of a database lookup:
// the final status, the database name, whether the database is serverless,
// and the issues collected while fetching. All fields are immutable after construction.
struct TEvFetchDatabaseResponse : public NActors::TEventLocal<TEvFetchDatabaseResponse, EvFetchDatabaseResponse> {
90+
// `issues` is taken by value and moved into the event.
TEvFetchDatabaseResponse(Ydb::StatusIds::StatusCode status, const TString& database, bool serverless, NYql::TIssues issues)
91+
: Status(status)
92+
, Database(database)
93+
, Serverless(serverless)
94+
, Issues(std::move(issues))
95+
{}
96+
97+
// Outcome of the lookup.
const Ydb::StatusIds::StatusCode Status;
98+
// Database the lookup was performed for.
const TString Database;
99+
// Serverless flag as supplied by the sender of this event.
const bool Serverless;
100+
// Issues supplied by the sender of this event.
const NYql::TIssues Issues;
101+
};
102+
88103
struct TEvCreatePoolResponse : public NActors::TEventLocal<TEvCreatePoolResponse, EvCreatePoolResponse> {
89104
TEvCreatePoolResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
90105
: Status(status)

0 commit comments

Comments
 (0)