Skip to content

Commit 2f21060

Browse files
authored
WM fixed bugs and performance (#7254)
1 parent 900ca74 commit 2f21060

21 files changed

+428
-51
lines changed

ydb/core/kqp/common/events/query.h

+10
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#pragma once
2+
#include <ydb/core/resource_pools/resource_pool_settings.h>
23
#include <ydb/core/protos/kqp.pb.h>
34
#include <ydb/core/kqp/common/simple/kqp_event_ids.h>
45
#include <ydb/core/kqp/common/kqp_user_request_context.h>
@@ -342,6 +343,14 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
342343
return Record.GetRequest().GetPoolId();
343344
}
344345

346+
void SetPoolConfig(const NResourcePool::TPoolSettings& config) {
347+
PoolConfig = config;
348+
}
349+
350+
std::optional<NResourcePool::TPoolSettings> GetPoolConfig() const {
351+
return PoolConfig;
352+
}
353+
345354
mutable NKikimrKqp::TEvQueryRequest Record;
346355

347356
private:
@@ -370,6 +379,7 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
370379
TDuration CancelAfter;
371380
TIntrusivePtr<TUserRequestContext> UserRequestContext;
372381
TDuration ProgressStatsPeriod;
382+
std::optional<NResourcePool::TPoolSettings> PoolConfig;
373383
};
374384

375385
struct TEvDataQueryStreamPart: public TEventPB<TEvDataQueryStreamPart,

ydb/core/kqp/common/events/workload_service.h

+14
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,18 @@ struct TEvCleanupResponse : public NActors::TEventLocal<TEvCleanupResponse, TKqp
6666
const NYql::TIssues Issues;
6767
};
6868

69+
struct TEvUpdatePoolInfo : public NActors::TEventLocal<TEvUpdatePoolInfo, TKqpWorkloadServiceEvents::EvUpdatePoolInfo> {
70+
TEvUpdatePoolInfo(const TString& database, const TString& poolId, const std::optional<NResourcePool::TPoolSettings>& config, const std::optional<NACLib::TSecurityObject>& securityObject)
71+
: Database(database)
72+
, PoolId(poolId)
73+
, Config(config)
74+
, SecurityObject(securityObject)
75+
{}
76+
77+
const TString Database;
78+
const TString PoolId;
79+
const std::optional<NResourcePool::TPoolSettings> Config;
80+
const std::optional<NACLib::TSecurityObject> SecurityObject;
81+
};
82+
6983
} // NKikimr::NKqp::NWorkload

ydb/core/kqp/common/kqp_user_request_context.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ namespace NKikimr::NKqp {
1515
TString CurrentExecutionId;
1616
TString CustomerSuppliedId;
1717
TString PoolId;
18-
NResourcePool::TPoolSettings PoolConfig;
18+
std::optional<NResourcePool::TPoolSettings> PoolConfig;
1919

2020
TUserRequestContext() = default;
2121

ydb/core/kqp/common/simple/kqp_event_ids.h

+1
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ struct TKqpWorkloadServiceEvents {
174174
EvContinueRequest,
175175
EvCleanupRequest,
176176
EvCleanupResponse,
177+
EvUpdatePoolInfo,
177178
};
178179
};
179180

ydb/core/kqp/opt/kqp_query_plan.cpp

+31-2
Original file line numberDiff line numberDiff line change
@@ -2243,7 +2243,7 @@ TString AddSimplifiedPlan(const TString& planText, TIntrusivePtr<NOpt::TKqpOptim
22432243
return planJson.GetStringRobust();
22442244
}
22452245

2246-
TString SerializeTxPlans(const TVector<const TString>& txPlans, TIntrusivePtr<NOpt::TKqpOptimizeContext> optCtx, const TString commonPlanInfo = "") {
2246+
TString SerializeTxPlans(const TVector<const TString>& txPlans, TIntrusivePtr<NOpt::TKqpOptimizeContext> optCtx, const TString commonPlanInfo = "", const TString& queryStats = "") {
22472247
NJsonWriter::TBuf writer;
22482248
writer.SetIndentSpaces(2);
22492249

@@ -2266,6 +2266,15 @@ TString SerializeTxPlans(const TVector<const TString>& txPlans, TIntrusivePtr<NO
22662266
writer.BeginObject();
22672267
writer.WriteKey("Node Type").WriteString("Query");
22682268
writer.WriteKey("PlanNodeType").WriteString("Query");
2269+
2270+
if (queryStats) {
2271+
NJson::TJsonValue queryStatsJson;
2272+
NJson::ReadJsonTree(queryStats, &queryStatsJson, true);
2273+
2274+
writer.WriteKey("Stats");
2275+
writer.WriteJsonValue(&queryStatsJson);
2276+
}
2277+
22692278
writer.WriteKey("Plans");
22702279
writer.BeginList();
22712280

@@ -2705,7 +2714,27 @@ TString SerializeAnalyzePlan(const NKqpProto::TKqpStatsQuery& queryStats) {
27052714
txPlans.push_back(txPlan);
27062715
}
27072716
}
2708-
return SerializeTxPlans(txPlans, TIntrusivePtr<NOpt::TKqpOptimizeContext>());
2717+
2718+
NJsonWriter::TBuf writer;
2719+
writer.BeginObject();
2720+
2721+
if (queryStats.HasCompilation()) {
2722+
const auto& compilation = queryStats.GetCompilation();
2723+
2724+
writer.WriteKey("Compilation");
2725+
writer.BeginObject();
2726+
writer.WriteKey("FromCache").WriteBool(compilation.GetFromCache());
2727+
writer.WriteKey("DurationUs").WriteLongLong(compilation.GetDurationUs());
2728+
writer.WriteKey("CpuTimeUs").WriteLongLong(compilation.GetCpuTimeUs());
2729+
writer.EndObject();
2730+
}
2731+
2732+
writer.WriteKey("ProcessCpuTimeUs").WriteLongLong(queryStats.GetWorkerCpuTimeUs());
2733+
writer.WriteKey("TotalDurationUs").WriteLongLong(queryStats.GetDurationUs());
2734+
writer.WriteKey("QueuedTimeUs").WriteLongLong(queryStats.GetQueuedTimeUs());
2735+
writer.EndObject();
2736+
2737+
return SerializeTxPlans(txPlans, TIntrusivePtr<NOpt::TKqpOptimizeContext>(), "", writer.Str());
27092738
}
27102739

27112740
TString SerializeScriptPlan(const TVector<const TString>& queryPlans) {

ydb/core/kqp/proxy_service/kqp_proxy_service.cpp

+47-5
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <ydb/core/cms/console/console.h>
1212
#include <ydb/core/kqp/counters/kqp_counters.h>
1313
#include <ydb/core/kqp/common/events/script_executions.h>
14+
#include <ydb/core/kqp/common/events/workload_service.h>
1415
#include <ydb/core/kqp/common/kqp_lwtrace_probes.h>
1516
#include <ydb/core/kqp/common/kqp_timeouts.h>
1617
#include <ydb/core/kqp/compile_service/kqp_compile_service.h>
@@ -691,11 +692,8 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
691692
LocalSessions->AttachQueryText(sessionInfo, ev->Get()->GetQuery());
692693
}
693694

694-
if (!FeatureFlags.GetEnableResourcePools()) {
695-
ev->Get()->SetPoolId("");
696-
} else if (!ev->Get()->GetPoolId()) {
697-
// TODO: do not use default pool if there is no limits
698-
ev->Get()->SetPoolId(NResourcePool::DEFAULT_POOL_ID);
695+
if (!TryFillPoolInfoFromCache(ev, requestId)) {
696+
return;
699697
}
700698

701699
TActorId targetId;
@@ -1348,6 +1346,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
13481346
hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
13491347
hFunc(TEvKqp::TEvListSessionsRequest, Handle);
13501348
hFunc(TEvKqp::TEvListProxyNodesRequest, Handle);
1349+
hFunc(NWorkload::TEvUpdatePoolInfo, Handle);
13511350
default:
13521351
Y_ABORT("TKqpProxyService: unexpected event type: %" PRIx32 " event: %s",
13531352
ev->GetTypeRewrite(), ev->ToString().data());
@@ -1570,6 +1569,43 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
15701569
}
15711570
}
15721571

1572+
bool TryFillPoolInfoFromCache(TEvKqp::TEvQueryRequest::TPtr& ev, ui64 requestId) {
1573+
if (!FeatureFlags.GetEnableResourcePools()) {
1574+
ev->Get()->SetPoolId("");
1575+
return true;
1576+
}
1577+
1578+
if (!ev->Get()->GetPoolId()) {
1579+
ev->Get()->SetPoolId(NResourcePool::DEFAULT_POOL_ID);
1580+
}
1581+
1582+
const auto& poolId = ev->Get()->GetPoolId();
1583+
const auto& poolInfo = ResourcePoolsCache.GetPoolInfo(ev->Get()->GetDatabase(), poolId);
1584+
if (!poolInfo) {
1585+
return true;
1586+
}
1587+
1588+
const auto& securityObject = poolInfo->SecurityObject;
1589+
const auto& userToken = ev->Get()->GetUserToken();
1590+
if (securityObject && userToken && !userToken->GetSerializedToken().empty()) {
1591+
if (!securityObject->CheckAccess(NACLib::EAccessRights::DescribeSchema, *userToken)) {
1592+
ReplyProcessError(Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Resource pool " << poolId << " not found or you don't have access permissions", requestId);
1593+
return false;
1594+
}
1595+
if (!securityObject->CheckAccess(NACLib::EAccessRights::SelectRow, *userToken)) {
1596+
ReplyProcessError(Ydb::StatusIds::UNAUTHORIZED, TStringBuilder() << "You don't have access permissions for resource pool " << poolId, requestId);
1597+
return false;
1598+
}
1599+
}
1600+
1601+
const auto& poolConfig = poolInfo->Config;
1602+
if (!NWorkload::IsWorkloadServiceRequired(poolConfig)) {
1603+
ev->Get()->SetPoolConfig(poolConfig);
1604+
}
1605+
1606+
return true;
1607+
}
1608+
15731609
void UpdateYqlLogLevels() {
15741610
const auto& kqpYqlName = NKikimrServices::EServiceKikimr_Name(NKikimrServices::KQP_YQL);
15751611
for (auto &entry : LogConfig.GetEntry()) {
@@ -1755,6 +1791,10 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
17551791
Send(ev->Sender, result.release(), 0, ev->Cookie);
17561792
}
17571793

1794+
void Handle(NWorkload::TEvUpdatePoolInfo::TPtr& ev) {
1795+
ResourcePoolsCache.UpdatePoolInfo(ev->Get()->Database, ev->Get()->PoolId, ev->Get()->Config, ev->Get()->SecurityObject);
1796+
}
1797+
17581798
private:
17591799
NKikimrConfig::TLogConfig LogConfig;
17601800
NKikimrConfig::TTableServiceConfig TableServiceConfig;
@@ -1816,6 +1856,8 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
18161856
std::deque<TDelayedEvent> DelayedEventsQueue;
18171857
bool IsLookupByRmScheduled = false;
18181858
TActorId KqpTempTablesAgentActor;
1859+
1860+
TResourcePoolsCache ResourcePoolsCache;
18191861
};
18201862

18211863
} // namespace

ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h

+37
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22

33
#include <ydb/core/base/appdata.h>
4+
#include <ydb/core/base/path.h>
45
#include <ydb/core/kqp/common/kqp.h>
56
#include <ydb/core/kqp/counters/kqp_counters.h>
67
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
@@ -415,4 +416,40 @@ class TLocalSessionsRegistry {
415416
}
416417
};
417418

419+
class TResourcePoolsCache {
420+
struct TPoolInfo {
421+
NResourcePool::TPoolSettings Config;
422+
std::optional<NACLib::TSecurityObject> SecurityObject;
423+
};
424+
425+
public:
426+
std::optional<TPoolInfo> GetPoolInfo(const TString& database, const TString& poolId) const {
427+
auto it = PoolsCache.find(GetPoolKey(database, poolId));
428+
if (it == PoolsCache.end()) {
429+
return std::nullopt;
430+
}
431+
return it->second;
432+
}
433+
434+
void UpdatePoolInfo(const TString& database, const TString& poolId, const std::optional<NResourcePool::TPoolSettings>& config, const std::optional<NACLib::TSecurityObject>& securityObject) {
435+
const TString& poolKey = GetPoolKey(database, poolId);
436+
if (!config) {
437+
PoolsCache.erase(poolKey);
438+
return;
439+
}
440+
441+
auto& poolInfo = PoolsCache[poolKey];
442+
poolInfo.Config = *config;
443+
poolInfo.SecurityObject = securityObject;
444+
}
445+
446+
private:
447+
static TString GetPoolKey(const TString& database, const TString& poolId) {
448+
return CanonizePath(TStringBuilder() << database << "/" << poolId);
449+
}
450+
451+
private:
452+
std::unordered_map<TString, TPoolInfo> PoolsCache;
453+
};
454+
418455
} // namespace NKikimr::NKqp

ydb/core/kqp/session_actor/kqp_query_state.h

+2
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ class TKqpQueryState : public TNonCopyable {
8585
UserRequestContext = MakeIntrusive<TUserRequestContext>(RequestEv->GetTraceId(), Database, sessionId);
8686
}
8787
UserRequestContext->PoolId = RequestEv->GetPoolId();
88+
UserRequestContext->PoolConfig = RequestEv->GetPoolConfig();
8889
}
8990

9091
// the monotonously growing counter, the ordinal number of the query,
@@ -114,6 +115,7 @@ class TKqpQueryState : public TNonCopyable {
114115
bool IsDocumentApiRestricted_ = false;
115116

116117
TInstant StartTime;
118+
TInstant ContinueTime;
117119
NYql::TKikimrQueryDeadlines QueryDeadlines;
118120
TKqpQueryStats QueryStats;
119121
bool KeepSession = false;

ydb/core/kqp/session_actor/kqp_query_stats.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ ui64 CalcRequestUnit(const TKqpQueryStats& stats) {
210210
NKqpProto::TKqpStatsQuery TKqpQueryStats::ToProto() const {
211211
NKqpProto::TKqpStatsQuery result;
212212
result.SetDurationUs(DurationUs);
213+
result.SetQueuedTimeUs(QueuedTimeUs);
213214

214215
if (Compilation) {
215216
result.MutableCompilation()->SetFromCache(Compilation->FromCache);

ydb/core/kqp/session_actor/kqp_query_stats.h

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ namespace NKikimr::NKqp {
88

99
struct TKqpQueryStats {
1010
ui64 DurationUs = 0;
11+
ui64 QueuedTimeUs = 0;
1112
std::optional<TKqpStatsCompile> Compilation;
1213

1314
ui64 WorkerCpuTimeUs = 0;

ydb/core/kqp/session_actor/kqp_session_actor.cpp

+10
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
242242
}
243243

244244
void PassRequestToResourcePool() {
245+
if (QueryState->UserRequestContext->PoolConfig) {
246+
LOG_D("request placed into pool from cache: " << QueryState->UserRequestContext->PoolId);
247+
CompileQuery();
248+
return;
249+
}
250+
245251
Send(MakeKqpWorkloadServiceId(SelfId().NodeId()), new NWorkload::TEvPlaceRequestIntoPool(
246252
QueryState->Database,
247253
SessionId,
@@ -475,6 +481,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
475481

476482
void Handle(NWorkload::TEvContinueRequest::TPtr& ev) {
477483
YQL_ENSURE(QueryState);
484+
QueryState->ContinueTime = TInstant::Now();
478485

479486
if (ev->Get()->Status == Ydb::StatusIds::UNSUPPORTED) {
480487
LOG_T("Failed to place request in resource pool, feature flag is disabled");
@@ -1552,6 +1559,9 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
15521559

15531560
stats->DurationUs = ((TInstant::Now() - QueryState->StartTime).MicroSeconds());
15541561
stats->WorkerCpuTimeUs = (QueryState->GetCpuTime().MicroSeconds());
1562+
if (const auto continueTime = QueryState->ContinueTime) {
1563+
stats->QueuedTimeUs = (continueTime - QueryState->StartTime).MicroSeconds();
1564+
}
15551565
if (QueryState->CompileResult) {
15561566
stats->Compilation.emplace();
15571567
stats->Compilation->FromCache = (QueryState->CompileStats.FromCache);

ydb/core/kqp/workload_service/actors/actors.h

+5-2
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,15 @@ namespace NKikimr::NKqp::NWorkload {
99
NActors::IActor* CreatePoolHandlerActor(const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters);
1010

1111
// Fetch pool and create default pool if needed
12-
NActors::IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists, bool enableOnServerless);
12+
NActors::IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists);
1313

1414
// Fetch and create pool in scheme shard
15-
NActors::IActor* CreatePoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless);
15+
NActors::IActor* CreatePoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken);
1616
NActors::IActor* CreatePoolCreatorActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr<NACLib::TUserToken> userToken, NACLibProto::TDiffACL diffAcl);
1717

18+
// Checks that database is serverless
19+
NActors::IActor* CreateDatabaseFetcherActor(const NActors::TActorId& replyActorId, const TString& database);
20+
1821
// Cpu load fetcher actor
1922
NActors::IActor* CreateCpuLoadFetcherActor(const NActors::TActorId& replyActorId);
2023

0 commit comments

Comments
 (0)