Skip to content

Session actor perf #724

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/kqp/common/compilation/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ struct TEvCompileResponse: public TEventLocal<TEvCompileResponse, TKqpEvents::Ev
}

TKqpCompileResult::TConstPtr CompileResult;
NKqpProto::TKqpStatsCompile Stats;
TKqpStatsCompile Stats;
std::optional<TString> ReplayMessage;
std::optional<TString> ReplayMessageUserView;

Expand Down
6 changes: 6 additions & 0 deletions ydb/core/kqp/common/compilation/result.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,10 @@ struct TKqpCompileResult {

std::shared_ptr<const TPreparedQueryHolder> PreparedQuery;
};

struct TKqpStatsCompile {
bool FromCache = false;
ui64 DurationUs = 0;
ui64 CpuTimeUs = 0;
};
} // namespace NKikimr::NKqp
72 changes: 17 additions & 55 deletions ydb/core/kqp/common/kqp_ru_calc.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
#include "kqp_ru_calc.h"

#include <ydb/core/protos/kqp_stats.pb.h>

#include <ydb/core/kqp/executer_actor/kqp_executer_stats.h>

#include <ydb/core/kqp/common/kqp.h>

#include <util/generic/size_literals.h>
Expand All @@ -14,8 +10,6 @@ namespace NKikimr {
namespace NKqp {
namespace NRuCalc {

namespace {

ui64 CalcReadIORu(const TTableStat& stat) {
constexpr ui64 bytesPerUnit = 4_KB;
constexpr ui64 bytesPerUnitAdjust = bytesPerUnit - 1;
Expand All @@ -24,65 +18,33 @@ ui64 CalcReadIORu(const TTableStat& stat) {
return std::max(bytes, stat.Rows);
}

class TIoReadStat: public TTableStat {
public:
void Add(const NYql::NDqProto::TDqTableStats& tableAggr) {
Rows += tableAggr.GetReadRows();
Bytes += tableAggr.GetReadBytes();
}

ui64 CalcRu() const {
return CalcReadIORu(*this);
}
};
void TIoReadStat::Add(const NYql::NDqProto::TDqTableStats& tableAggr) {
Rows += tableAggr.GetReadRows();
Bytes += tableAggr.GetReadBytes();
}

class TIoWriteStat: public TTableStat {
public:
void Add(const NYql::NDqProto::TDqTableStats& tableAggr) {
Rows += tableAggr.GetWriteRows();
Rows += tableAggr.GetEraseRows();
Bytes += tableAggr.GetWriteBytes();
}
ui64 TIoReadStat::CalcRu() const {
return CalcReadIORu(*this);
}

ui64 CalcRu() const {
constexpr ui64 bytesPerUnit = 1_KB;
constexpr ui64 bytesPerUnitAdjust = bytesPerUnit - 1;
void TIoWriteStat::Add(const NYql::NDqProto::TDqTableStats& tableAggr) {
Rows += tableAggr.GetWriteRows();
Rows += tableAggr.GetEraseRows();
Bytes += tableAggr.GetWriteBytes();
}

auto bytes = (Bytes + bytesPerUnitAdjust) / bytesPerUnit;
return 2 * std::max(bytes, Rows);
}
};
ui64 TIoWriteStat::CalcRu() const {
constexpr ui64 bytesPerUnit = 1_KB;
constexpr ui64 bytesPerUnitAdjust = bytesPerUnit - 1;

auto bytes = (Bytes + bytesPerUnitAdjust) / bytesPerUnit;
return 2 * std::max(bytes, Rows);
}

ui64 CpuTimeToUnit(TDuration cpuTime) {
return std::floor(cpuTime.MicroSeconds() / 1500.0);
}

ui64 CalcRequestUnit(const NKqpProto::TKqpStatsQuery& stats) {
TDuration totalCpuTime;
TIoReadStat totalReadStat;
TIoWriteStat totalWriteStat;

for (const auto& exec : stats.GetExecutions()) {
totalCpuTime += TDuration::MicroSeconds(exec.GetCpuTimeUs());

for (auto& table : exec.GetTables()) {
totalReadStat.Add(table);
}
}

if (stats.HasCompilation()) {
totalCpuTime += TDuration::MicroSeconds(stats.GetCompilation().GetCpuTimeUs());
}

totalCpuTime += TDuration::MicroSeconds(stats.GetWorkerCpuTimeUs());

auto totalIoRu = totalReadStat.CalcRu() + totalWriteStat.CalcRu();

return std::max(std::max(CpuTimeToUnit(totalCpuTime), totalIoRu), (ui64)1);
}

ui64 CalcRequestUnit(const TProgressStatEntry& stats) {
auto ioRu = CalcReadIORu(stats.ReadIOStat);

Expand Down
18 changes: 17 additions & 1 deletion ydb/core/kqp/common/kqp_ru_calc.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
#include <util/system/types.h>
#include <util/datetime/base.h>

#include <ydb/core/protos/kqp_stats.pb.h>
#include <ydb/core/kqp/executer_actor/kqp_executer_stats.h>

namespace NKqpProto {
class TKqpStatsQuery;
}
Expand All @@ -14,8 +17,21 @@ struct TProgressStatEntry;

namespace NRuCalc {

ui64 CalcReadIORu(const TTableStat& stat);

class TIoReadStat: public TTableStat {
public:
void Add(const NYql::NDqProto::TDqTableStats& tableAggr);
ui64 CalcRu() const;
};

class TIoWriteStat: public TTableStat {
public:
void Add(const NYql::NDqProto::TDqTableStats& tableAggr);
ui64 CalcRu() const;
};

ui64 CpuTimeToUnit(TDuration cpuTimeUs);
ui64 CalcRequestUnit(const NKqpProto::TKqpStatsQuery& stats);
ui64 CalcRequestUnit(const TProgressStatEntry& stats);

} // namespace NRuCalc
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,9 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
ReplayMessage = std::nullopt;
ReplayMessageUserView = std::nullopt;
auto& stats = responseEv->Stats;
stats.SetFromCache(false);
stats.SetDurationUs((TInstant::Now() - StartTime).MicroSeconds());
stats.SetCpuTimeUs(CompileCpuTime.MicroSeconds());
stats.FromCache = false;
stats.DurationUs = (TInstant::Now() - StartTime).MicroSeconds();
stats.CpuTimeUs = CompileCpuTime.MicroSeconds();
Send(Owner, responseEv.Release());

Counters->ReportCompileFinish(DbCounters);
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
}

void Reply(const TActorId& sender, const TKqpCompileResult::TConstPtr& compileResult,
const NKqpProto::TKqpStatsCompile& compileStats, const TActorContext& ctx, ui64 cookie,
const TKqpStatsCompile& compileStats, const TActorContext& ctx, ui64 cookie,
NLWTrace::TOrbit orbit, NWilson::TSpan span, const std::optional<TString>& replayMessage = std::nullopt)
{
const auto& query = compileResult->Query;
Expand All @@ -953,7 +953,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
<< ", status:" << compileResult->Status);

auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(compileResult, std::move(orbit), replayMessage);
responseEv->Stats.CopyFrom(compileStats);
responseEv->Stats = compileStats;

if (span) {
span.End();
Expand All @@ -965,8 +965,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
void ReplyFromCache(const TActorId& sender, const TKqpCompileResult::TConstPtr& compileResult,
const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span)
{
NKqpProto::TKqpStatsCompile stats;
stats.SetFromCache(true);
TKqpStatsCompile stats;
stats.FromCache = true;

LWTRACK(KqpCompileServiceReplyFromCache, orbit);
Reply(sender, compileResult, stats, ctx, cookie, std::move(orbit), std::move(span));
Expand All @@ -976,7 +976,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
const TIssues& issues, const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span)
{
LWTRACK(KqpCompileServiceReplyError, orbit);
Reply(sender, TKqpCompileResult::Make(uid, status, issues, ETableReadType::Other), NKqpProto::TKqpStatsCompile(), ctx, cookie, std::move(orbit), std::move(span));
Reply(sender, TKqpCompileResult::Make(uid, status, issues, ETableReadType::Other), TKqpStatsCompile{}, ctx, cookie, std::move(orbit), std::move(span));
}

void ReplyInternalError(const TActorId& sender, const TString& uid, const TString& message,
Expand Down
53 changes: 35 additions & 18 deletions ydb/core/kqp/provider/yql_kikimr_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,41 +308,52 @@ class TKikimrTransactionContextBase : public TThrRefBase {

bool hasScheme = false;
bool hasData = false;
for (auto& pair : TableOperations) {
hasScheme = hasScheme || (pair.second & KikimrSchemeOps());
hasData = hasData || (pair.second & KikimrDataOps());
for (auto& [_, operation] : TableOperations) {
hasScheme = hasScheme || (operation & KikimrSchemeOps());
hasData = hasData || (operation & KikimrDataOps());
}

THashMap<TStringBuf, const NKqpProto::TKqpTableInfo*> tableInfoMap;
tableInfoMap.reserve(tableInfos.size());
if (TableByIdMap.empty()) {
TableByIdMap.reserve(tableInfos.size());
}
if (TableOperations.empty()) {
TableOperations.reserve(operations.size());
}

THashMap<TString, NKqpProto::TKqpTableInfo> tableInfoMap;
for (const auto& info : tableInfos) {
tableInfoMap.insert(std::make_pair(info.GetTableName(), info));
tableInfoMap.emplace(info.GetTableName(), &info);

TKikimrPathId pathId(info.GetTableId().GetOwnerId(), info.GetTableId().GetTableId());
TableByIdMap.insert(std::make_pair(pathId, info.GetTableName()));
TableByIdMap.emplace(pathId, info.GetTableName());
}

for (const auto& op : operations) {
auto table = op.GetTable();
const auto& table = [&]() -> const TString& {
const auto tempTable = TempTables.FindPtr(op.GetTable());
if (tempTable) {
return *tempTable;
} else {
return op.GetTable();
}
}();

auto newOp = TYdbOperation(op.GetOperation());
TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());

auto tempTable = TempTables.FindPtr(table);
if (tempTable) {
table = *tempTable;
}
const auto newOp = TYdbOperation(op.GetOperation());

const auto info = tableInfoMap.FindPtr(table);
if (!info) {
TString message = TStringBuilder()
<< "Unable to find table info for table '" << table << "'";
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_SCHEME_ERROR, message));
return {false, issues};
}

if (queryType == EKikimrQueryType::Dml && (newOp & KikimrSchemeOps())) {
TString message = TStringBuilder() << "Operation '" << newOp
<< "' can't be performed in data query";
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
return {false, issues};
}
Expand All @@ -351,6 +362,7 @@ class TKikimrTransactionContextBase : public TThrRefBase {
if (EffectiveIsolationLevel) {
TString message = TStringBuilder() << "Scheme operations can't be performed inside transaction, "
<< "operation: " << newOp;
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
return {false, issues};
}
Expand All @@ -359,13 +371,15 @@ class TKikimrTransactionContextBase : public TThrRefBase {
if (queryType == EKikimrQueryType::Ddl && (newOp & KikimrDataOps())) {
TString message = TStringBuilder() << "Operation '" << newOp
<< "' can't be performed in scheme query";
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
return {false, issues};
}

if (queryType == EKikimrQueryType::Scan && (newOp & KikimrModifyOps())) {
TString message = TStringBuilder() << "Operation '" << newOp
<< "' can't be performed in scan query";
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
return {false, issues};
}
Expand All @@ -379,26 +393,28 @@ class TKikimrTransactionContextBase : public TThrRefBase {
message = TStringBuilder() << message
<< " Use COMMIT statement to indicate end of transaction between scheme and data operations.";
}

const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_MIXED_SCHEME_DATA_TX, message));
return {false, issues};
}

if (Readonly && (newOp & KikimrModifyOps())) {
TString message = TStringBuilder() << "Operation '" << newOp
<< "' can't be performed in read only transaction";
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
return {false, issues};
}

auto& currentOps = TableOperations[table];
bool currentModify = currentOps & KikimrModifyOps();
const bool currentModify = currentOps & KikimrModifyOps();
if (currentModify) {
if (KikimrReadOps() & newOp) {
if (!EnableImmediateEffects) {
TString message = TStringBuilder() << "Data modifications previously made to table '" << table
<< "' in current transaction won't be seen by operation: '"
<< newOp << "'";
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
auto newIssue = AddDmlIssue(YqlIssue(pos, TIssuesIds::KIKIMR_READ_MODIFIED_TABLE, message));
issues.AddIssue(newIssue);
return {false, issues};
Expand All @@ -407,10 +423,11 @@ class TKikimrTransactionContextBase : public TThrRefBase {
HasUncommittedChangesRead = true;
}

if (info->GetHasIndexTables()) {
if ((*info)->GetHasIndexTables()) {
if (!EnableImmediateEffects) {
TString message = TStringBuilder()
<< "Multiple modification of table with secondary indexes is not supported yet";
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
return {false, issues};
}
Expand All @@ -428,9 +445,9 @@ class TKikimrTransactionContextBase : public TThrRefBase {
virtual ~TKikimrTransactionContextBase() = default;

public:
THashMap<TString, TYdbOperations> TableOperations;
bool HasUncommittedChangesRead = false;
const bool EnableImmediateEffects;
THashMap<TString, TYdbOperations> TableOperations;
THashMap<TKikimrPathId, TString> TableByIdMap;
TMaybe<NKikimrKqp::EIsolationLevel> EffectiveIsolationLevel;
THashMap<TString, TString> TempTables;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/session_actor/kqp_query_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ bool TKqpQueryState::SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev) {
YQL_ENSURE(compiledVersion == NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1,
"Unexpected prepared query version: " << compiledVersion);

CompileStats.Swap(&ev->Stats);
CompileStats = ev->Stats;
PreparedQuery = CompileResult->PreparedQuery;
if (ev->ReplayMessage) {
ReplayMessage = *ev->ReplayMessage;
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "kqp_query_stats.h"
#include "kqp_worker_common.h"

#include <ydb/library/actors/core/actor_bootstrapped.h>
Expand Down Expand Up @@ -85,7 +86,7 @@ class TKqpQueryState : public TNonCopyable {
ui64 ParametersSize = 0;
TPreparedQueryHolder::TConstPtr PreparedQuery;
TKqpCompileResult::TConstPtr CompileResult;
NKqpProto::TKqpStatsCompile CompileStats;
TKqpStatsCompile CompileStats;
TIntrusivePtr<TKqpTransactionContext> TxCtx;
TQueryData::TPtr QueryData;

Expand All @@ -97,8 +98,7 @@ class TKqpQueryState : public TNonCopyable {

TInstant StartTime;
NYql::TKikimrQueryDeadlines QueryDeadlines;

NKqpProto::TKqpStatsQuery Stats;
TKqpQueryStats QueryStats;
bool KeepSession = false;
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
NActors::TMonotonic StartedAt;
Expand Down Expand Up @@ -235,7 +235,7 @@ class TKqpQueryState : public TNonCopyable {
}

bool NeedCheckTableVersions() const {
return CompileStats.GetFromCache();
return CompileStats.FromCache;
}

TString ExtractQueryText() const {
Expand Down
Loading