Skip to content

Commit 93ed5ab

Browse files
authored
Revert "Session actor perf (#724)" (#782)
This reverts commit a196e8b.
1 parent f6db8b7 commit 93ed5ab

17 files changed

+275
-458
lines changed

ydb/core/kqp/common/compilation/events.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ struct TEvCompileResponse: public TEventLocal<TEvCompileResponse, TKqpEvents::Ev
9191
}
9292

9393
TKqpCompileResult::TConstPtr CompileResult;
94-
TKqpStatsCompile Stats;
94+
NKqpProto::TKqpStatsCompile Stats;
9595
std::optional<TString> ReplayMessage;
9696
std::optional<TString> ReplayMessageUserView;
9797

ydb/core/kqp/common/compilation/result.h

-6
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,4 @@ struct TKqpCompileResult {
4141

4242
std::shared_ptr<const TPreparedQueryHolder> PreparedQuery;
4343
};
44-
45-
struct TKqpStatsCompile {
46-
bool FromCache = false;
47-
ui64 DurationUs = 0;
48-
ui64 CpuTimeUs = 0;
49-
};
5044
} // namespace NKikimr::NKqp

ydb/core/kqp/common/kqp_ru_calc.cpp

+55-17
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
#include "kqp_ru_calc.h"
22

3+
#include <ydb/core/protos/kqp_stats.pb.h>
4+
5+
#include <ydb/core/kqp/executer_actor/kqp_executer_stats.h>
6+
37
#include <ydb/core/kqp/common/kqp.h>
48

59
#include <util/generic/size_literals.h>
@@ -10,6 +14,8 @@ namespace NKikimr {
1014
namespace NKqp {
1115
namespace NRuCalc {
1216

17+
namespace {
18+
1319
ui64 CalcReadIORu(const TTableStat& stat) {
1420
constexpr ui64 bytesPerUnit = 4_KB;
1521
constexpr ui64 bytesPerUnitAdjust = bytesPerUnit - 1;
@@ -18,33 +24,65 @@ ui64 CalcReadIORu(const TTableStat& stat) {
1824
return std::max(bytes, stat.Rows);
1925
}
2026

21-
void TIoReadStat::Add(const NYql::NDqProto::TDqTableStats& tableAggr) {
22-
Rows += tableAggr.GetReadRows();
23-
Bytes += tableAggr.GetReadBytes();
24-
}
27+
class TIoReadStat: public TTableStat {
28+
public:
29+
void Add(const NYql::NDqProto::TDqTableStats& tableAggr) {
30+
Rows += tableAggr.GetReadRows();
31+
Bytes += tableAggr.GetReadBytes();
32+
}
2533

26-
ui64 TIoReadStat::CalcRu() const {
27-
return CalcReadIORu(*this);
28-
}
34+
ui64 CalcRu() const {
35+
return CalcReadIORu(*this);
36+
}
37+
};
2938

30-
void TIoWriteStat::Add(const NYql::NDqProto::TDqTableStats& tableAggr) {
31-
Rows += tableAggr.GetWriteRows();
32-
Rows += tableAggr.GetEraseRows();
33-
Bytes += tableAggr.GetWriteBytes();
34-
}
39+
class TIoWriteStat: public TTableStat {
40+
public:
41+
void Add(const NYql::NDqProto::TDqTableStats& tableAggr) {
42+
Rows += tableAggr.GetWriteRows();
43+
Rows += tableAggr.GetEraseRows();
44+
Bytes += tableAggr.GetWriteBytes();
45+
}
3546

36-
ui64 TIoWriteStat::CalcRu() const {
37-
constexpr ui64 bytesPerUnit = 1_KB;
38-
constexpr ui64 bytesPerUnitAdjust = bytesPerUnit - 1;
47+
ui64 CalcRu() const {
48+
constexpr ui64 bytesPerUnit = 1_KB;
49+
constexpr ui64 bytesPerUnitAdjust = bytesPerUnit - 1;
50+
51+
auto bytes = (Bytes + bytesPerUnitAdjust) / bytesPerUnit;
52+
return 2 * std::max(bytes, Rows);
53+
}
54+
};
3955

40-
auto bytes = (Bytes + bytesPerUnitAdjust) / bytesPerUnit;
41-
return 2 * std::max(bytes, Rows);
4256
}
4357

4458
ui64 CpuTimeToUnit(TDuration cpuTime) {
4559
return std::floor(cpuTime.MicroSeconds() / 1500.0);
4660
}
4761

62+
ui64 CalcRequestUnit(const NKqpProto::TKqpStatsQuery& stats) {
63+
TDuration totalCpuTime;
64+
TIoReadStat totalReadStat;
65+
TIoWriteStat totalWriteStat;
66+
67+
for (const auto& exec : stats.GetExecutions()) {
68+
totalCpuTime += TDuration::MicroSeconds(exec.GetCpuTimeUs());
69+
70+
for (auto& table : exec.GetTables()) {
71+
totalReadStat.Add(table);
72+
}
73+
}
74+
75+
if (stats.HasCompilation()) {
76+
totalCpuTime += TDuration::MicroSeconds(stats.GetCompilation().GetCpuTimeUs());
77+
}
78+
79+
totalCpuTime += TDuration::MicroSeconds(stats.GetWorkerCpuTimeUs());
80+
81+
auto totalIoRu = totalReadStat.CalcRu() + totalWriteStat.CalcRu();
82+
83+
return std::max(std::max(CpuTimeToUnit(totalCpuTime), totalIoRu), (ui64)1);
84+
}
85+
4886
ui64 CalcRequestUnit(const TProgressStatEntry& stats) {
4987
auto ioRu = CalcReadIORu(stats.ReadIOStat);
5088

ydb/core/kqp/common/kqp_ru_calc.h

+1-17
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,6 @@
33
#include <util/system/types.h>
44
#include <util/datetime/base.h>
55

6-
#include <ydb/core/protos/kqp_stats.pb.h>
7-
#include <ydb/core/kqp/executer_actor/kqp_executer_stats.h>
8-
96
namespace NKqpProto {
107
class TKqpStatsQuery;
118
}
@@ -17,21 +14,8 @@ struct TProgressStatEntry;
1714

1815
namespace NRuCalc {
1916

20-
ui64 CalcReadIORu(const TTableStat& stat);
21-
22-
class TIoReadStat: public TTableStat {
23-
public:
24-
void Add(const NYql::NDqProto::TDqTableStats& tableAggr);
25-
ui64 CalcRu() const;
26-
};
27-
28-
class TIoWriteStat: public TTableStat {
29-
public:
30-
void Add(const NYql::NDqProto::TDqTableStats& tableAggr);
31-
ui64 CalcRu() const;
32-
};
33-
3417
ui64 CpuTimeToUnit(TDuration cpuTimeUs);
18+
ui64 CalcRequestUnit(const NKqpProto::TKqpStatsQuery& stats);
3519
ui64 CalcRequestUnit(const TProgressStatEntry& stats);
3620

3721
} // namespace NRuCalc

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -317,9 +317,9 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
317317
ReplayMessage = std::nullopt;
318318
ReplayMessageUserView = std::nullopt;
319319
auto& stats = responseEv->Stats;
320-
stats.FromCache = false;
321-
stats.DurationUs = (TInstant::Now() - StartTime).MicroSeconds();
322-
stats.CpuTimeUs = CompileCpuTime.MicroSeconds();
320+
stats.SetFromCache(false);
321+
stats.SetDurationUs((TInstant::Now() - StartTime).MicroSeconds());
322+
stats.SetCpuTimeUs(CompileCpuTime.MicroSeconds());
323323
Send(Owner, responseEv.Release());
324324

325325
Counters->ReportCompileFinish(DbCounters);

ydb/core/kqp/compile_service/kqp_compile_service.cpp

+5-5
Original file line numberDiff line numberDiff line change
@@ -938,7 +938,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
938938
}
939939

940940
void Reply(const TActorId& sender, const TKqpCompileResult::TConstPtr& compileResult,
941-
const TKqpStatsCompile& compileStats, const TActorContext& ctx, ui64 cookie,
941+
const NKqpProto::TKqpStatsCompile& compileStats, const TActorContext& ctx, ui64 cookie,
942942
NLWTrace::TOrbit orbit, NWilson::TSpan span, const std::optional<TString>& replayMessage = std::nullopt)
943943
{
944944
const auto& query = compileResult->Query;
@@ -953,7 +953,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
953953
<< ", status:" << compileResult->Status);
954954

955955
auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(compileResult, std::move(orbit), replayMessage);
956-
responseEv->Stats = compileStats;
956+
responseEv->Stats.CopyFrom(compileStats);
957957

958958
if (span) {
959959
span.End();
@@ -965,8 +965,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
965965
void ReplyFromCache(const TActorId& sender, const TKqpCompileResult::TConstPtr& compileResult,
966966
const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span)
967967
{
968-
TKqpStatsCompile stats;
969-
stats.FromCache = true;
968+
NKqpProto::TKqpStatsCompile stats;
969+
stats.SetFromCache(true);
970970

971971
LWTRACK(KqpCompileServiceReplyFromCache, orbit);
972972
Reply(sender, compileResult, stats, ctx, cookie, std::move(orbit), std::move(span));
@@ -976,7 +976,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
976976
const TIssues& issues, const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span)
977977
{
978978
LWTRACK(KqpCompileServiceReplyError, orbit);
979-
Reply(sender, TKqpCompileResult::Make(uid, status, issues, ETableReadType::Other), TKqpStatsCompile{}, ctx, cookie, std::move(orbit), std::move(span));
979+
Reply(sender, TKqpCompileResult::Make(uid, status, issues, ETableReadType::Other), NKqpProto::TKqpStatsCompile(), ctx, cookie, std::move(orbit), std::move(span));
980980
}
981981

982982
void ReplyInternalError(const TActorId& sender, const TString& uid, const TString& message,

ydb/core/kqp/provider/yql_kikimr_provider.h

+18-35
Original file line numberDiff line numberDiff line change
@@ -308,52 +308,41 @@ class TKikimrTransactionContextBase : public TThrRefBase {
308308

309309
bool hasScheme = false;
310310
bool hasData = false;
311-
for (auto& [_, operation] : TableOperations) {
312-
hasScheme = hasScheme || (operation & KikimrSchemeOps());
313-
hasData = hasData || (operation & KikimrDataOps());
314-
}
315-
316-
THashMap<TStringBuf, const NKqpProto::TKqpTableInfo*> tableInfoMap;
317-
tableInfoMap.reserve(tableInfos.size());
318-
if (TableByIdMap.empty()) {
319-
TableByIdMap.reserve(tableInfos.size());
320-
}
321-
if (TableOperations.empty()) {
322-
TableOperations.reserve(operations.size());
311+
for (auto& pair : TableOperations) {
312+
hasScheme = hasScheme || (pair.second & KikimrSchemeOps());
313+
hasData = hasData || (pair.second & KikimrDataOps());
323314
}
324315

316+
THashMap<TString, NKqpProto::TKqpTableInfo> tableInfoMap;
325317
for (const auto& info : tableInfos) {
326-
tableInfoMap.emplace(info.GetTableName(), &info);
318+
tableInfoMap.insert(std::make_pair(info.GetTableName(), info));
327319

328320
TKikimrPathId pathId(info.GetTableId().GetOwnerId(), info.GetTableId().GetTableId());
329-
TableByIdMap.emplace(pathId, info.GetTableName());
321+
TableByIdMap.insert(std::make_pair(pathId, info.GetTableName()));
330322
}
331323

332324
for (const auto& op : operations) {
333-
const auto& table = [&]() -> const TString& {
334-
const auto tempTable = TempTables.FindPtr(op.GetTable());
335-
if (tempTable) {
336-
return *tempTable;
337-
} else {
338-
return op.GetTable();
339-
}
340-
}();
325+
auto table = op.GetTable();
341326

342-
const auto newOp = TYdbOperation(op.GetOperation());
327+
auto newOp = TYdbOperation(op.GetOperation());
328+
TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
329+
330+
auto tempTable = TempTables.FindPtr(table);
331+
if (tempTable) {
332+
table = *tempTable;
333+
}
343334

344335
const auto info = tableInfoMap.FindPtr(table);
345336
if (!info) {
346337
TString message = TStringBuilder()
347338
<< "Unable to find table info for table '" << table << "'";
348-
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
349339
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_SCHEME_ERROR, message));
350340
return {false, issues};
351341
}
352342

353343
if (queryType == EKikimrQueryType::Dml && (newOp & KikimrSchemeOps())) {
354344
TString message = TStringBuilder() << "Operation '" << newOp
355345
<< "' can't be performed in data query";
356-
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
357346
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
358347
return {false, issues};
359348
}
@@ -362,7 +351,6 @@ class TKikimrTransactionContextBase : public TThrRefBase {
362351
if (EffectiveIsolationLevel) {
363352
TString message = TStringBuilder() << "Scheme operations can't be performed inside transaction, "
364353
<< "operation: " << newOp;
365-
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
366354
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
367355
return {false, issues};
368356
}
@@ -371,15 +359,13 @@ class TKikimrTransactionContextBase : public TThrRefBase {
371359
if (queryType == EKikimrQueryType::Ddl && (newOp & KikimrDataOps())) {
372360
TString message = TStringBuilder() << "Operation '" << newOp
373361
<< "' can't be performed in scheme query";
374-
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
375362
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
376363
return {false, issues};
377364
}
378365

379366
if (queryType == EKikimrQueryType::Scan && (newOp & KikimrModifyOps())) {
380367
TString message = TStringBuilder() << "Operation '" << newOp
381368
<< "' can't be performed in scan query";
382-
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
383369
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
384370
return {false, issues};
385371
}
@@ -393,28 +379,26 @@ class TKikimrTransactionContextBase : public TThrRefBase {
393379
message = TStringBuilder() << message
394380
<< " Use COMMIT statement to indicate end of transaction between scheme and data operations.";
395381
}
396-
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
382+
397383
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_MIXED_SCHEME_DATA_TX, message));
398384
return {false, issues};
399385
}
400386

401387
if (Readonly && (newOp & KikimrModifyOps())) {
402388
TString message = TStringBuilder() << "Operation '" << newOp
403389
<< "' can't be performed in read only transaction";
404-
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
405390
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
406391
return {false, issues};
407392
}
408393

409394
auto& currentOps = TableOperations[table];
410-
const bool currentModify = currentOps & KikimrModifyOps();
395+
bool currentModify = currentOps & KikimrModifyOps();
411396
if (currentModify) {
412397
if (KikimrReadOps() & newOp) {
413398
if (!EnableImmediateEffects) {
414399
TString message = TStringBuilder() << "Data modifications previously made to table '" << table
415400
<< "' in current transaction won't be seen by operation: '"
416401
<< newOp << "'";
417-
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
418402
auto newIssue = AddDmlIssue(YqlIssue(pos, TIssuesIds::KIKIMR_READ_MODIFIED_TABLE, message));
419403
issues.AddIssue(newIssue);
420404
return {false, issues};
@@ -423,11 +407,10 @@ class TKikimrTransactionContextBase : public TThrRefBase {
423407
HasUncommittedChangesRead = true;
424408
}
425409

426-
if ((*info)->GetHasIndexTables()) {
410+
if (info->GetHasIndexTables()) {
427411
if (!EnableImmediateEffects) {
428412
TString message = TStringBuilder()
429413
<< "Multiple modification of table with secondary indexes is not supported yet";
430-
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
431414
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
432415
return {false, issues};
433416
}
@@ -445,9 +428,9 @@ class TKikimrTransactionContextBase : public TThrRefBase {
445428
virtual ~TKikimrTransactionContextBase() = default;
446429

447430
public:
431+
THashMap<TString, TYdbOperations> TableOperations;
448432
bool HasUncommittedChangesRead = false;
449433
const bool EnableImmediateEffects;
450-
THashMap<TString, TYdbOperations> TableOperations;
451434
THashMap<TKikimrPathId, TString> TableByIdMap;
452435
TMaybe<NKikimrKqp::EIsolationLevel> EffectiveIsolationLevel;
453436
THashMap<TString, TString> TempTables;

ydb/core/kqp/session_actor/kqp_query_state.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ bool TKqpQueryState::SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev) {
115115
YQL_ENSURE(compiledVersion == NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1,
116116
"Unexpected prepared query version: " << compiledVersion);
117117

118-
CompileStats = ev->Stats;
118+
CompileStats.Swap(&ev->Stats);
119119
PreparedQuery = CompileResult->PreparedQuery;
120120
if (ev->ReplayMessage) {
121121
ReplayMessage = *ev->ReplayMessage;

ydb/core/kqp/session_actor/kqp_query_state.h

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
#pragma once
22

3-
#include "kqp_query_stats.h"
43
#include "kqp_worker_common.h"
54

65
#include <ydb/library/actors/core/actor_bootstrapped.h>
@@ -86,7 +85,7 @@ class TKqpQueryState : public TNonCopyable {
8685
ui64 ParametersSize = 0;
8786
TPreparedQueryHolder::TConstPtr PreparedQuery;
8887
TKqpCompileResult::TConstPtr CompileResult;
89-
TKqpStatsCompile CompileStats;
88+
NKqpProto::TKqpStatsCompile CompileStats;
9089
TIntrusivePtr<TKqpTransactionContext> TxCtx;
9190
TQueryData::TPtr QueryData;
9291

@@ -98,7 +97,8 @@ class TKqpQueryState : public TNonCopyable {
9897

9998
TInstant StartTime;
10099
NYql::TKikimrQueryDeadlines QueryDeadlines;
101-
TKqpQueryStats QueryStats;
100+
101+
NKqpProto::TKqpStatsQuery Stats;
102102
bool KeepSession = false;
103103
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
104104
NActors::TMonotonic StartedAt;
@@ -235,7 +235,7 @@ class TKqpQueryState : public TNonCopyable {
235235
}
236236

237237
bool NeedCheckTableVersions() const {
238-
return CompileStats.FromCache;
238+
return CompileStats.GetFromCache();
239239
}
240240

241241
TString ExtractQueryText() const {

0 commit comments

Comments
 (0)