diff --git a/ydb/tests/tools/kqprun/configuration/app_config.conf b/ydb/tests/tools/kqprun/configuration/app_config.conf index 058bf7cecdb1..545583e0cd09 100644 --- a/ydb/tests/tools/kqprun/configuration/app_config.conf +++ b/ydb/tests/tools/kqprun/configuration/app_config.conf @@ -73,6 +73,10 @@ QueryServiceConfig { MinDesiredDirectoriesOfFilesPerQuery: 1000 RegexpCacheSize: 100 + DefaultSettings { + Name: "AtomicUploadCommit" + Value: "true" + } DefaultSettings { Name: "UseBlocksSource" Value: "true" diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index d70de0338bf7..799af7c8985d 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -32,7 +32,7 @@ struct TExecutionOptions { EClearExecutionCase ClearExecution = EClearExecutionCase::Disabled; NKikimrKqp::EQueryAction ScriptQueryAction = NKikimrKqp::QUERY_ACTION_EXECUTE; - TString TraceId = "kqprun"; + TString TraceId = "kqprun_" + CreateGuidAsString(); bool HasResults() const { return !ScriptQueries.empty() && ScriptQueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE; @@ -163,6 +163,7 @@ void RunMain(int argc, const char* argv[]) { TString schemeQueryAstFile; TString scriptQueryAstFile; TString scriptQueryPlanFile; + TString inProgressStatisticsFile; TString logFile = "-"; TString appConfigFile = "./configuration/app_config.conf"; std::vector tablesMappingList; @@ -218,6 +219,10 @@ void RunMain(int argc, const char* argv[]) { .Optional() .RequiredArgument("FILE") .StoreResult(&scriptQueryPlanFile); + options.AddLongOption("in-progress-statistics", "File with script inprogress statistics") + .Optional() + .RequiredArgument("FILE") + .StoreResult(&inProgressStatisticsFile); options.AddLongOption('C', "clear-execution", "Execute script query without creating additional tables, one of { query | yql-script }") .Optional() @@ -319,6 +324,10 @@ void RunMain(int argc, const char* argv[]) { THolder scriptQueryAstFileHolder = SetupDefaultFileOutput(scriptQueryAstFile, runnerOptions.ScriptQueryAstOutput); THolder scriptQueryPlanFileHolder = SetupDefaultFileOutput(scriptQueryPlanFile, runnerOptions.ScriptQueryPlanOutput); + if (inProgressStatisticsFile) { + runnerOptions.InProgressStatisticsOutputFile = inProgressStatisticsFile; + } + runnerOptions.TraceOptType = GetCaseVariant("trace-opt", traceOptType, { {"all", NKqpRun::TRunnerOptions::ETraceOptType::All}, {"scheme", NKqpRun::TRunnerOptions::ETraceOptType::Scheme}, @@ -399,8 +408,20 @@ void KqprunTerminateHandler() { } +void SegmentationFaultHandler(int) { + NColorizer::TColors colors = NColorizer::AutoColors(Cerr); + + Cerr << colors.Red() << "======= segmentation fault call stack ========" << colors.Default() << Endl; + FormatBackTrace(&Cerr); + Cerr << colors.Red() << "==============================================" << colors.Default() << Endl; + + abort(); +} + + int main(int argc, const char* argv[]) { std::set_terminate(KqprunTerminateHandler); + signal(SIGSEGV, &SegmentationFaultHandler); try { RunMain(argc, argv); diff --git a/ydb/tests/tools/kqprun/src/actors.cpp b/ydb/tests/tools/kqprun/src/actors.cpp index c1597255b8b5..16dd8e0c1e28 100644 --- a/ydb/tests/tools/kqprun/src/actors.cpp +++ b/ydb/tests/tools/kqprun/src/actors.cpp @@ -1,6 +1,7 @@ #include "actors.h" #include +#include namespace NKqpRun { @@ -11,13 +12,14 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped request, NThreading::TPromise promise, - ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector& resultSets, TString& queryPlan) + ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector& resultSets, + TProgressCallback progressCallback) : Request_(std::move(request)) , Promise_(promise) , ResultRowsLimit_(std::numeric_limits::max()) , ResultSizeLimit_(std::numeric_limits::max()) , ResultSets_(resultSets) - , QueryPlan_(queryPlan) + , ProgressCallback_(progressCallback) { if (resultRowsLimit) { ResultRowsLimit_ = resultRowsLimit; @@ -76,7 +78,9 @@ class TRunScriptActorMock : public NActors::TActorBootstrappedGet()->Record.GetQueryPlan(); + if (ProgressCallback_) { + ProgressCallback_(ev->Get()->Record); + } } private: @@ -85,15 +89,104 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped& ResultSets_; - TString& QueryPlan_; + TProgressCallback ProgressCallback_; +}; + +class TResourcesWaiterActor : public NActors::TActorBootstrapped { + struct TEvPrivate { + enum EEv : ui32 { + EvResourcesInfo = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); + + struct TEvResourcesInfo : public NActors::TEventLocal { + explicit TEvResourcesInfo(i32 nodeCount) + : NodeCount(nodeCount) + {} + + const i32 NodeCount; + }; + }; + + static constexpr TDuration REFRESH_PERIOD = TDuration::MilliSeconds(10); + +public: + TResourcesWaiterActor(NThreading::TPromise promise, i32 expectedNodeCount) + : ExpectedNodeCount_(expectedNodeCount) + , Promise_(promise) + {} + + void Bootstrap() { + Become(&TResourcesWaiterActor::StateFunc); + CheckResourcesPublish(); + } + + void Handle(NActors::TEvents::TEvWakeup::TPtr&) { + CheckResourcesPublish(); + } + + void Handle(TEvPrivate::TEvResourcesInfo::TPtr& ev) { + if (ev->Get()->NodeCount == ExpectedNodeCount_) { + Promise_.SetValue(); + PassAway(); + return; + } + + Schedule(REFRESH_PERIOD, new NActors::TEvents::TEvWakeup()); + } + + STRICT_STFUNC(StateFunc, + hFunc(NActors::TEvents::TEvWakeup, Handle); + hFunc(TEvPrivate::TEvResourcesInfo, Handle); + ) + +private: + void CheckResourcesPublish() { + GetResourceManager(); + + if (!ResourceManager_) { + Schedule(REFRESH_PERIOD, new NActors::TEvents::TEvWakeup()); + return; + } + + UpdateResourcesInfo(); + } + + void GetResourceManager() { + if (ResourceManager_) { + return; + } + ResourceManager_ = NKikimr::NKqp::TryGetKqpResourceManager(SelfId().NodeId()); + } + + void UpdateResourcesInfo() const { + ResourceManager_->RequestClusterResourcesInfo( + [selfId = SelfId(), actorContext = ActorContext()](TVector&& resources) { + actorContext.Send(selfId, new TEvPrivate::TEvResourcesInfo(resources.size())); + }); + } + +private: + const i32 ExpectedNodeCount_; + NThreading::TPromise Promise_; + + std::shared_ptr ResourceManager_; }; } // anonymous namespace NActors::IActor* CreateRunScriptActorMock(THolder request, NThreading::TPromise promise, - ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector& resultSets, TString& queryPlan) { - return new TRunScriptActorMock(std::move(request), promise, resultRowsLimit, resultSizeLimit, resultSets, queryPlan); + ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector& resultSets, + TProgressCallback progressCallback) { + return new TRunScriptActorMock(std::move(request), promise, resultRowsLimit, resultSizeLimit, resultSets, progressCallback); +} + +NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise promise, i32 expectedNodeCount) { + return new TResourcesWaiterActor(promise, expectedNodeCount); } } // namespace NKqpRun diff --git a/ydb/tests/tools/kqprun/src/actors.h b/ydb/tests/tools/kqprun/src/actors.h index 6ccbf32eee34..a222f4b3e3b0 100644 --- a/ydb/tests/tools/kqprun/src/actors.h +++ b/ydb/tests/tools/kqprun/src/actors.h @@ -4,8 +4,13 @@ namespace NKqpRun { +using TProgressCallback = std::function; + NActors::IActor* CreateRunScriptActorMock(THolder request, NThreading::TPromise promise, - ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector& resultSets, TString& queryPlan); + ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector& resultSets, + TProgressCallback progressCallback); + +NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise promise, i32 expectedNodeCount); } // namespace NKqpRun diff --git a/ydb/tests/tools/kqprun/src/common.h b/ydb/tests/tools/kqprun/src/common.h index 71364d63d7c8..b8c2b597bb6d 100644 --- a/ydb/tests/tools/kqprun/src/common.h +++ b/ydb/tests/tools/kqprun/src/common.h @@ -16,6 +16,7 @@ constexpr char YQL_TOKEN_VARIABLE[] = "YQL_TOKEN"; struct TYdbSetupSettings { i32 NodeCount = 1; TString DomainName = "Root"; + TDuration InitializationTimeout = TDuration::Seconds(10); bool TraceOptEnabled = false; TMaybe LogOutputFile; @@ -46,6 +47,7 @@ struct TRunnerOptions { IOutputStream* SchemeQueryAstOutput = nullptr; IOutputStream* ScriptQueryAstOutput = nullptr; IOutputStream* ScriptQueryPlanOutput = nullptr; + TMaybe InProgressStatisticsOutputFile; EResultOutputFormat ResultOutputFormat = EResultOutputFormat::RowsJson; NYdb::NConsoleClient::EOutputFormat PlanOutputFormat = NYdb::NConsoleClient::EOutputFormat::Default; diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.cpp b/ydb/tests/tools/kqprun/src/kqp_runner.cpp index d5751a37273d..4901a512e26e 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.cpp +++ b/ydb/tests/tools/kqprun/src/kqp_runner.cpp @@ -4,12 +4,85 @@ #include #include +#include +#include + #include #include namespace NKqpRun { +namespace { + +// Function adds thousands separators +// 123456789 -> 123.456.789 +TString FormatNumber(i64 number) { + struct TSeparator : public std::numpunct { + char do_thousands_sep() const final { + return '.'; + } + + std::string do_grouping() const final { + return "\03"; + } + }; + + std::ostringstream stream; + stream.imbue(std::locale(stream.getloc(), new TSeparator())); + stream << number; + return stream.str(); +} + +void PrintStatistics(const TString& fullStat, const THashMap& flatStat, const NFq::TPublicStat& publicStat, IOutputStream& output) { + output << "\nFlat statistics:" << Endl; + for (const auto& [propery, value] : flatStat) { + TString valueString = ToString(value); + if (propery.find("Bytes") != TString::npos || propery.find("Source") != TString::npos) { + valueString = NKikimr::NBlobDepot::FormatByteSize(value); + } else if (propery.find("TimeUs") != TString::npos) { + valueString = NFq::FormatDurationUs(value); + } else if (propery.find("TimeMs") != TString::npos) { + valueString = NFq::FormatDurationMs(value); + } else { + valueString = FormatNumber(value); + } + output << propery << " = " << valueString << Endl; + } + + output << "\nPublic statistics:" << Endl; + if (auto memoryUsageBytes = publicStat.MemoryUsageBytes) { + output << "MemoryUsage = " << NKikimr::NBlobDepot::FormatByteSize(*memoryUsageBytes) << Endl; + } + if (auto cpuUsageUs = publicStat.CpuUsageUs) { + output << "CpuUsage = " << NFq::FormatDurationUs(*cpuUsageUs) << Endl; + } + if (auto inputBytes = publicStat.InputBytes) { + output << "InputSize = " << NKikimr::NBlobDepot::FormatByteSize(*inputBytes) << Endl; + } + if (auto outputBytes = publicStat.OutputBytes) { + output << "OutputSize = " << NKikimr::NBlobDepot::FormatByteSize(*outputBytes) << Endl; + } + if (auto sourceInputRecords = publicStat.SourceInputRecords) { + output << "SourceInputRecords = " << FormatNumber(*sourceInputRecords) << Endl; + } + if (auto sinkOutputRecords = publicStat.SinkOutputRecords) { + output << "SinkOutputRecords = " << FormatNumber(*sinkOutputRecords) << Endl; + } + if (auto runningTasks = publicStat.RunningTasks) { + output << "RunningTasks = " << FormatNumber(*runningTasks) << Endl; + } + + output << "\nFull statistics:" << Endl; + NJson::TJsonValue statsJson; + NJson::ReadJsonTree(fullStat, &statsJson); + NJson::WriteJson(&output, &statsJson, true, true, true); + output << Endl; +} + +} // anonymous namespace + + //// TKqpRunner::TImpl class TKqpRunner::TImpl { @@ -22,6 +95,7 @@ class TKqpRunner::TImpl { explicit TImpl(const TRunnerOptions& options) : Options_(options) , YdbSetup_(options.YdbSettings) + , StatProcessor_(NFq::CreateStatProcessor("stat_full")) , CerrColors_(NColorizer::AutoColors(Cerr)) , CoutColors_(NColorizer::AutoColors(Cout)) {} @@ -63,7 +137,7 @@ class TKqpRunner::TImpl { TRequestResult status; switch (queryType) { case EQueryType::ScriptQuery: - status = YdbSetup_.QueryRequest(query, action, traceId, meta, ResultSets_); + status = YdbSetup_.QueryRequest(query, action, traceId, meta, ResultSets_, GetProgressCallback()); break; case EQueryType::YqlScriptQuery: @@ -132,9 +206,16 @@ class TKqpRunner::TImpl { private: bool WaitScriptExecutionOperation() { ExecutionMeta_ = TExecutionMeta(); + + TDuration getOperationPeriod = TDuration::Seconds(1); + if (auto progressStatsPeriodMs = Options_.YdbSettings.AppConfig.GetQueryServiceConfig().GetProgressStatsPeriodMs()) { + getOperationPeriod = TDuration::MilliSeconds(progressStatsPeriodMs); + } + TRequestResult status; while (true) { status = YdbSetup_.GetScriptExecutionOperationRequest(ExecutionOperation_, ExecutionMeta_); + PrintScriptProgress(ExecutionMeta_.Plan); if (ExecutionMeta_.Ready) { break; @@ -145,7 +226,7 @@ class TKqpRunner::TImpl { return false; } - Sleep(TDuration::Seconds(1)); + Sleep(getOperationPeriod); } PrintScriptAst(ExecutionMeta_.Ast); @@ -190,8 +271,8 @@ class TKqpRunner::TImpl { } } - void PrintScriptPlan(const TString& plan) const { - if (!Options_.ScriptQueryPlanOutput || !plan) { + void PrintPlan(const TString& plan, IOutputStream* output) const { + if (!plan) { return; } @@ -201,12 +282,56 @@ class TKqpRunner::TImpl { return; } - Cout << CoutColors_.Cyan() << "Writing script query plan" << CoutColors_.Default() << Endl; - - NYdb::NConsoleClient::TQueryPlanPrinter printer(Options_.PlanOutputFormat, true, *Options_.ScriptQueryPlanOutput); + NYdb::NConsoleClient::TQueryPlanPrinter printer(Options_.PlanOutputFormat, true, *output); printer.Print(plan); } + void PrintScriptPlan(const TString& plan) const { + if (Options_.ScriptQueryPlanOutput) { + Cout << CoutColors_.Cyan() << "Writing script query plan" << CoutColors_.Default() << Endl; + PrintPlan(plan, Options_.ScriptQueryPlanOutput); + } + } + + void PrintScriptProgress(const TString& plan) const { + if (Options_.InProgressStatisticsOutputFile) { + TFileOutput outputStream(*Options_.InProgressStatisticsOutputFile); + outputStream << TInstant::Now().ToIsoStringLocal() << " Script in progress statistics" << Endl; + + auto convertedPlan = plan; + try { + convertedPlan = StatProcessor_->ConvertPlan(plan); + } catch (const NJson::TJsonException& ex) { + outputStream << "Error plan conversion: " << ex.what() << Endl; + } + + try { + double cpuUsage = 0.0; + auto fullStat = StatProcessor_->GetQueryStat(convertedPlan, cpuUsage); + auto flatStat = StatProcessor_->GetFlatStat(convertedPlan); + auto publicStat = StatProcessor_->GetPublicStat(fullStat); + + outputStream << "\nCPU usage: " << cpuUsage << Endl; + PrintStatistics(fullStat, flatStat, publicStat, outputStream); + } catch (const NJson::TJsonException& ex) { + outputStream << "Error stat conversion: " << ex.what() << Endl; + } + + outputStream << "\nPlan visualization:" << Endl; + PrintPlan(convertedPlan, &outputStream); + + outputStream.Finish(); + } + } + + TProgressCallback GetProgressCallback() { + return [this](const NKikimrKqp::TEvExecuterProgress& executerProgress) mutable { + const TString& plan = executerProgress.GetQueryPlan(); + ExecutionMeta_.Plan = plan; + PrintScriptProgress(plan); + }; + } + void PrintScriptResult(const Ydb::ResultSet& resultSet) const { switch (Options_.ResultOutputFormat) { case TRunnerOptions::EResultOutputFormat::RowsJson: { @@ -241,6 +366,7 @@ class TKqpRunner::TImpl { TRunnerOptions Options_; TYdbSetup YdbSetup_; + std::unique_ptr StatProcessor_; NColorizer::TColors CerrColors_; NColorizer::TColors CoutColors_; diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp index 90b303f93d27..d76c7c9b3150 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp +++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp @@ -1,4 +1,3 @@ -#include "actors.h" #include "ydb_setup.h" #include @@ -164,12 +163,27 @@ class TYdbSetup::TImpl { NYql::NLog::InitLogger(NActors::CreateNullBackend()); } + void WaitResourcesPublishing() const { + auto promise = NThreading::NewPromise(); + GetRuntime()->Register(CreateResourcesWaiterActor(promise, Settings_.NodeCount)); + + try { + promise.GetFuture().GetValue(Settings_.InitializationTimeout); + } catch (...) { + ythrow yexception() << "Failed to initialize all resources: " << CurrentExceptionMessage(); + } + } + public: explicit TImpl(const TYdbSetupSettings& settings) : Settings_(settings) { InitializeYqlLogger(); InitializeServer(); + + if (Settings_.NodeCount > 1) { + WaitResourcesPublishing(); + } } NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr SchemeQueryRequest(const TString& query, const TString& traceId) const { @@ -186,18 +200,18 @@ class TYdbSetup::TImpl { return RunKqpProxyRequest(std::move(event)); } - NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, std::vector& resultSets, TString& queryPlan) const { + NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, std::vector& resultSets, TProgressCallback progressCallback) const { auto event = MakeHolder(); FillQueryRequest(query, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, action, traceId, event->Record); if (auto progressStatsPeriodMs = Settings_.AppConfig.GetQueryServiceConfig().GetProgressStatsPeriodMs()) { - event->SetProgressStatsPeriod(TDuration::MilliSeconds(Settings_.AppConfig.GetQueryServiceConfig().GetProgressStatsPeriodMs())); + event->SetProgressStatsPeriod(TDuration::MilliSeconds(progressStatsPeriodMs)); } auto promise = NThreading::NewPromise(); auto rowsLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultRowsLimit(); auto sizeLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultSizeLimit(); - GetRuntime()->Register(CreateRunScriptActorMock(std::move(event), promise, rowsLimit, sizeLimit, resultSets, queryPlan)); + GetRuntime()->Register(CreateRunScriptActorMock(std::move(event), promise, rowsLimit, sizeLimit, resultSets, progressCallback)); return promise.GetFuture().GetValueSync(); } @@ -344,10 +358,10 @@ TRequestResult TYdbSetup::ScriptRequest(const TString& script, NKikimrKqp::EQuer return TRequestResult(scriptExecutionOperation->Get()->Status, scriptExecutionOperation->Get()->Issues); } -TRequestResult TYdbSetup::QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets) const { +TRequestResult TYdbSetup::QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets, TProgressCallback progressCallback) const { resultSets.clear(); - auto queryOperationResponse = Impl_->QueryRequest(query, action, traceId, resultSets, meta.Plan)->Get()->Record.GetRef(); + auto queryOperationResponse = Impl_->QueryRequest(query, action, traceId, resultSets, progressCallback)->Get()->Record.GetRef(); const auto& responseRecord = queryOperationResponse.GetResponse(); meta.Ast = responseRecord.GetQueryAst(); diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.h b/ydb/tests/tools/kqprun/src/ydb_setup.h index f31a3b249891..745cdae8a659 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.h +++ b/ydb/tests/tools/kqprun/src/ydb_setup.h @@ -1,6 +1,7 @@ #pragma once #include "common.h" +#include "actors.h" #include #include @@ -54,7 +55,7 @@ class TYdbSetup { TRequestResult ScriptRequest(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId, TString& operation) const; - TRequestResult QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets) const; + TRequestResult QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets, TProgressCallback progressCallback) const; TRequestResult YqlScriptRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets) const;