Skip to content

Commit a196e8b

Browse files
authored
Session actor perf (#724)
1 parent 74f9996 commit a196e8b

17 files changed

+458
-275
lines changed

ydb/core/kqp/common/compilation/events.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ struct TEvCompileResponse: public TEventLocal<TEvCompileResponse, TKqpEvents::Ev
9191
}
9292

9393
TKqpCompileResult::TConstPtr CompileResult;
94-
NKqpProto::TKqpStatsCompile Stats;
94+
TKqpStatsCompile Stats;
9595
std::optional<TString> ReplayMessage;
9696
std::optional<TString> ReplayMessageUserView;
9797

ydb/core/kqp/common/compilation/result.h

+6
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,10 @@ struct TKqpCompileResult {
4141

4242
std::shared_ptr<const TPreparedQueryHolder> PreparedQuery;
4343
};
44+
45+
struct TKqpStatsCompile {
46+
bool FromCache = false;
47+
ui64 DurationUs = 0;
48+
ui64 CpuTimeUs = 0;
49+
};
4450
} // namespace NKikimr::NKqp

ydb/core/kqp/common/kqp_ru_calc.cpp

+17-55
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
#include "kqp_ru_calc.h"
22

3-
#include <ydb/core/protos/kqp_stats.pb.h>
4-
5-
#include <ydb/core/kqp/executer_actor/kqp_executer_stats.h>
6-
73
#include <ydb/core/kqp/common/kqp.h>
84

95
#include <util/generic/size_literals.h>
@@ -14,8 +10,6 @@ namespace NKikimr {
1410
namespace NKqp {
1511
namespace NRuCalc {
1612

17-
namespace {
18-
1913
ui64 CalcReadIORu(const TTableStat& stat) {
2014
constexpr ui64 bytesPerUnit = 4_KB;
2115
constexpr ui64 bytesPerUnitAdjust = bytesPerUnit - 1;
@@ -24,65 +18,33 @@ ui64 CalcReadIORu(const TTableStat& stat) {
2418
return std::max(bytes, stat.Rows);
2519
}
2620

27-
class TIoReadStat: public TTableStat {
28-
public:
29-
void Add(const NYql::NDqProto::TDqTableStats& tableAggr) {
30-
Rows += tableAggr.GetReadRows();
31-
Bytes += tableAggr.GetReadBytes();
32-
}
33-
34-
ui64 CalcRu() const {
35-
return CalcReadIORu(*this);
36-
}
37-
};
21+
void TIoReadStat::Add(const NYql::NDqProto::TDqTableStats& tableAggr) {
22+
Rows += tableAggr.GetReadRows();
23+
Bytes += tableAggr.GetReadBytes();
24+
}
3825

39-
class TIoWriteStat: public TTableStat {
40-
public:
41-
void Add(const NYql::NDqProto::TDqTableStats& tableAggr) {
42-
Rows += tableAggr.GetWriteRows();
43-
Rows += tableAggr.GetEraseRows();
44-
Bytes += tableAggr.GetWriteBytes();
45-
}
26+
ui64 TIoReadStat::CalcRu() const {
27+
return CalcReadIORu(*this);
28+
}
4629

47-
ui64 CalcRu() const {
48-
constexpr ui64 bytesPerUnit = 1_KB;
49-
constexpr ui64 bytesPerUnitAdjust = bytesPerUnit - 1;
30+
void TIoWriteStat::Add(const NYql::NDqProto::TDqTableStats& tableAggr) {
31+
Rows += tableAggr.GetWriteRows();
32+
Rows += tableAggr.GetEraseRows();
33+
Bytes += tableAggr.GetWriteBytes();
34+
}
5035

51-
auto bytes = (Bytes + bytesPerUnitAdjust) / bytesPerUnit;
52-
return 2 * std::max(bytes, Rows);
53-
}
54-
};
36+
ui64 TIoWriteStat::CalcRu() const {
37+
constexpr ui64 bytesPerUnit = 1_KB;
38+
constexpr ui64 bytesPerUnitAdjust = bytesPerUnit - 1;
5539

40+
auto bytes = (Bytes + bytesPerUnitAdjust) / bytesPerUnit;
41+
return 2 * std::max(bytes, Rows);
5642
}
5743

5844
ui64 CpuTimeToUnit(TDuration cpuTime) {
5945
return std::floor(cpuTime.MicroSeconds() / 1500.0);
6046
}
6147

62-
ui64 CalcRequestUnit(const NKqpProto::TKqpStatsQuery& stats) {
63-
TDuration totalCpuTime;
64-
TIoReadStat totalReadStat;
65-
TIoWriteStat totalWriteStat;
66-
67-
for (const auto& exec : stats.GetExecutions()) {
68-
totalCpuTime += TDuration::MicroSeconds(exec.GetCpuTimeUs());
69-
70-
for (auto& table : exec.GetTables()) {
71-
totalReadStat.Add(table);
72-
}
73-
}
74-
75-
if (stats.HasCompilation()) {
76-
totalCpuTime += TDuration::MicroSeconds(stats.GetCompilation().GetCpuTimeUs());
77-
}
78-
79-
totalCpuTime += TDuration::MicroSeconds(stats.GetWorkerCpuTimeUs());
80-
81-
auto totalIoRu = totalReadStat.CalcRu() + totalWriteStat.CalcRu();
82-
83-
return std::max(std::max(CpuTimeToUnit(totalCpuTime), totalIoRu), (ui64)1);
84-
}
85-
8648
ui64 CalcRequestUnit(const TProgressStatEntry& stats) {
8749
auto ioRu = CalcReadIORu(stats.ReadIOStat);
8850

ydb/core/kqp/common/kqp_ru_calc.h

+17-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
#include <util/system/types.h>
44
#include <util/datetime/base.h>
55

6+
#include <ydb/core/protos/kqp_stats.pb.h>
7+
#include <ydb/core/kqp/executer_actor/kqp_executer_stats.h>
8+
69
namespace NKqpProto {
710
class TKqpStatsQuery;
811
}
@@ -14,8 +17,21 @@ struct TProgressStatEntry;
1417

1518
namespace NRuCalc {
1619

20+
ui64 CalcReadIORu(const TTableStat& stat);
21+
22+
class TIoReadStat: public TTableStat {
23+
public:
24+
void Add(const NYql::NDqProto::TDqTableStats& tableAggr);
25+
ui64 CalcRu() const;
26+
};
27+
28+
class TIoWriteStat: public TTableStat {
29+
public:
30+
void Add(const NYql::NDqProto::TDqTableStats& tableAggr);
31+
ui64 CalcRu() const;
32+
};
33+
1734
ui64 CpuTimeToUnit(TDuration cpuTimeUs);
18-
ui64 CalcRequestUnit(const NKqpProto::TKqpStatsQuery& stats);
1935
ui64 CalcRequestUnit(const TProgressStatEntry& stats);
2036

2137
} // namespace NRuCalc

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -317,9 +317,9 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
317317
ReplayMessage = std::nullopt;
318318
ReplayMessageUserView = std::nullopt;
319319
auto& stats = responseEv->Stats;
320-
stats.SetFromCache(false);
321-
stats.SetDurationUs((TInstant::Now() - StartTime).MicroSeconds());
322-
stats.SetCpuTimeUs(CompileCpuTime.MicroSeconds());
320+
stats.FromCache = false;
321+
stats.DurationUs = (TInstant::Now() - StartTime).MicroSeconds();
322+
stats.CpuTimeUs = CompileCpuTime.MicroSeconds();
323323
Send(Owner, responseEv.Release());
324324

325325
Counters->ReportCompileFinish(DbCounters);

ydb/core/kqp/compile_service/kqp_compile_service.cpp

+5-5
Original file line numberDiff line numberDiff line change
@@ -938,7 +938,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
938938
}
939939

940940
void Reply(const TActorId& sender, const TKqpCompileResult::TConstPtr& compileResult,
941-
const NKqpProto::TKqpStatsCompile& compileStats, const TActorContext& ctx, ui64 cookie,
941+
const TKqpStatsCompile& compileStats, const TActorContext& ctx, ui64 cookie,
942942
NLWTrace::TOrbit orbit, NWilson::TSpan span, const std::optional<TString>& replayMessage = std::nullopt)
943943
{
944944
const auto& query = compileResult->Query;
@@ -953,7 +953,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
953953
<< ", status:" << compileResult->Status);
954954

955955
auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(compileResult, std::move(orbit), replayMessage);
956-
responseEv->Stats.CopyFrom(compileStats);
956+
responseEv->Stats = compileStats;
957957

958958
if (span) {
959959
span.End();
@@ -965,8 +965,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
965965
void ReplyFromCache(const TActorId& sender, const TKqpCompileResult::TConstPtr& compileResult,
966966
const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span)
967967
{
968-
NKqpProto::TKqpStatsCompile stats;
969-
stats.SetFromCache(true);
968+
TKqpStatsCompile stats;
969+
stats.FromCache = true;
970970

971971
LWTRACK(KqpCompileServiceReplyFromCache, orbit);
972972
Reply(sender, compileResult, stats, ctx, cookie, std::move(orbit), std::move(span));
@@ -976,7 +976,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
976976
const TIssues& issues, const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span)
977977
{
978978
LWTRACK(KqpCompileServiceReplyError, orbit);
979-
Reply(sender, TKqpCompileResult::Make(uid, status, issues, ETableReadType::Other), NKqpProto::TKqpStatsCompile(), ctx, cookie, std::move(orbit), std::move(span));
979+
Reply(sender, TKqpCompileResult::Make(uid, status, issues, ETableReadType::Other), TKqpStatsCompile{}, ctx, cookie, std::move(orbit), std::move(span));
980980
}
981981

982982
void ReplyInternalError(const TActorId& sender, const TString& uid, const TString& message,

ydb/core/kqp/provider/yql_kikimr_provider.h

+35-18
Original file line numberDiff line numberDiff line change
@@ -308,41 +308,52 @@ class TKikimrTransactionContextBase : public TThrRefBase {
308308

309309
bool hasScheme = false;
310310
bool hasData = false;
311-
for (auto& pair : TableOperations) {
312-
hasScheme = hasScheme || (pair.second & KikimrSchemeOps());
313-
hasData = hasData || (pair.second & KikimrDataOps());
311+
for (auto& [_, operation] : TableOperations) {
312+
hasScheme = hasScheme || (operation & KikimrSchemeOps());
313+
hasData = hasData || (operation & KikimrDataOps());
314+
}
315+
316+
THashMap<TStringBuf, const NKqpProto::TKqpTableInfo*> tableInfoMap;
317+
tableInfoMap.reserve(tableInfos.size());
318+
if (TableByIdMap.empty()) {
319+
TableByIdMap.reserve(tableInfos.size());
320+
}
321+
if (TableOperations.empty()) {
322+
TableOperations.reserve(operations.size());
314323
}
315324

316-
THashMap<TString, NKqpProto::TKqpTableInfo> tableInfoMap;
317325
for (const auto& info : tableInfos) {
318-
tableInfoMap.insert(std::make_pair(info.GetTableName(), info));
326+
tableInfoMap.emplace(info.GetTableName(), &info);
319327

320328
TKikimrPathId pathId(info.GetTableId().GetOwnerId(), info.GetTableId().GetTableId());
321-
TableByIdMap.insert(std::make_pair(pathId, info.GetTableName()));
329+
TableByIdMap.emplace(pathId, info.GetTableName());
322330
}
323331

324332
for (const auto& op : operations) {
325-
auto table = op.GetTable();
333+
const auto& table = [&]() -> const TString& {
334+
const auto tempTable = TempTables.FindPtr(op.GetTable());
335+
if (tempTable) {
336+
return *tempTable;
337+
} else {
338+
return op.GetTable();
339+
}
340+
}();
326341

327-
auto newOp = TYdbOperation(op.GetOperation());
328-
TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
329-
330-
auto tempTable = TempTables.FindPtr(table);
331-
if (tempTable) {
332-
table = *tempTable;
333-
}
342+
const auto newOp = TYdbOperation(op.GetOperation());
334343

335344
const auto info = tableInfoMap.FindPtr(table);
336345
if (!info) {
337346
TString message = TStringBuilder()
338347
<< "Unable to find table info for table '" << table << "'";
348+
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
339349
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_SCHEME_ERROR, message));
340350
return {false, issues};
341351
}
342352

343353
if (queryType == EKikimrQueryType::Dml && (newOp & KikimrSchemeOps())) {
344354
TString message = TStringBuilder() << "Operation '" << newOp
345355
<< "' can't be performed in data query";
356+
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
346357
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
347358
return {false, issues};
348359
}
@@ -351,6 +362,7 @@ class TKikimrTransactionContextBase : public TThrRefBase {
351362
if (EffectiveIsolationLevel) {
352363
TString message = TStringBuilder() << "Scheme operations can't be performed inside transaction, "
353364
<< "operation: " << newOp;
365+
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
354366
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
355367
return {false, issues};
356368
}
@@ -359,13 +371,15 @@ class TKikimrTransactionContextBase : public TThrRefBase {
359371
if (queryType == EKikimrQueryType::Ddl && (newOp & KikimrDataOps())) {
360372
TString message = TStringBuilder() << "Operation '" << newOp
361373
<< "' can't be performed in scheme query";
374+
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
362375
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
363376
return {false, issues};
364377
}
365378

366379
if (queryType == EKikimrQueryType::Scan && (newOp & KikimrModifyOps())) {
367380
TString message = TStringBuilder() << "Operation '" << newOp
368381
<< "' can't be performed in scan query";
382+
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
369383
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
370384
return {false, issues};
371385
}
@@ -379,26 +393,28 @@ class TKikimrTransactionContextBase : public TThrRefBase {
379393
message = TStringBuilder() << message
380394
<< " Use COMMIT statement to indicate end of transaction between scheme and data operations.";
381395
}
382-
396+
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
383397
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_MIXED_SCHEME_DATA_TX, message));
384398
return {false, issues};
385399
}
386400

387401
if (Readonly && (newOp & KikimrModifyOps())) {
388402
TString message = TStringBuilder() << "Operation '" << newOp
389403
<< "' can't be performed in read only transaction";
404+
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
390405
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
391406
return {false, issues};
392407
}
393408

394409
auto& currentOps = TableOperations[table];
395-
bool currentModify = currentOps & KikimrModifyOps();
410+
const bool currentModify = currentOps & KikimrModifyOps();
396411
if (currentModify) {
397412
if (KikimrReadOps() & newOp) {
398413
if (!EnableImmediateEffects) {
399414
TString message = TStringBuilder() << "Data modifications previously made to table '" << table
400415
<< "' in current transaction won't be seen by operation: '"
401416
<< newOp << "'";
417+
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
402418
auto newIssue = AddDmlIssue(YqlIssue(pos, TIssuesIds::KIKIMR_READ_MODIFIED_TABLE, message));
403419
issues.AddIssue(newIssue);
404420
return {false, issues};
@@ -407,10 +423,11 @@ class TKikimrTransactionContextBase : public TThrRefBase {
407423
HasUncommittedChangesRead = true;
408424
}
409425

410-
if (info->GetHasIndexTables()) {
426+
if ((*info)->GetHasIndexTables()) {
411427
if (!EnableImmediateEffects) {
412428
TString message = TStringBuilder()
413429
<< "Multiple modification of table with secondary indexes is not supported yet";
430+
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
414431
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
415432
return {false, issues};
416433
}
@@ -428,9 +445,9 @@ class TKikimrTransactionContextBase : public TThrRefBase {
428445
virtual ~TKikimrTransactionContextBase() = default;
429446

430447
public:
431-
THashMap<TString, TYdbOperations> TableOperations;
432448
bool HasUncommittedChangesRead = false;
433449
const bool EnableImmediateEffects;
450+
THashMap<TString, TYdbOperations> TableOperations;
434451
THashMap<TKikimrPathId, TString> TableByIdMap;
435452
TMaybe<NKikimrKqp::EIsolationLevel> EffectiveIsolationLevel;
436453
THashMap<TString, TString> TempTables;

ydb/core/kqp/session_actor/kqp_query_state.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ bool TKqpQueryState::SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev) {
115115
YQL_ENSURE(compiledVersion == NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1,
116116
"Unexpected prepared query version: " << compiledVersion);
117117

118-
CompileStats.Swap(&ev->Stats);
118+
CompileStats = ev->Stats;
119119
PreparedQuery = CompileResult->PreparedQuery;
120120
if (ev->ReplayMessage) {
121121
ReplayMessage = *ev->ReplayMessage;

ydb/core/kqp/session_actor/kqp_query_state.h

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include "kqp_query_stats.h"
34
#include "kqp_worker_common.h"
45

56
#include <ydb/library/actors/core/actor_bootstrapped.h>
@@ -85,7 +86,7 @@ class TKqpQueryState : public TNonCopyable {
8586
ui64 ParametersSize = 0;
8687
TPreparedQueryHolder::TConstPtr PreparedQuery;
8788
TKqpCompileResult::TConstPtr CompileResult;
88-
NKqpProto::TKqpStatsCompile CompileStats;
89+
TKqpStatsCompile CompileStats;
8990
TIntrusivePtr<TKqpTransactionContext> TxCtx;
9091
TQueryData::TPtr QueryData;
9192

@@ -97,8 +98,7 @@ class TKqpQueryState : public TNonCopyable {
9798

9899
TInstant StartTime;
99100
NYql::TKikimrQueryDeadlines QueryDeadlines;
100-
101-
NKqpProto::TKqpStatsQuery Stats;
101+
TKqpQueryStats QueryStats;
102102
bool KeepSession = false;
103103
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
104104
NActors::TMonotonic StartedAt;
@@ -235,7 +235,7 @@ class TKqpQueryState : public TNonCopyable {
235235
}
236236

237237
bool NeedCheckTableVersions() const {
238-
return CompileStats.GetFromCache();
238+
return CompileStats.FromCache;
239239
}
240240

241241
TString ExtractQueryText() const {

0 commit comments

Comments
 (0)