Skip to content

Commit d7e9d66

Browse files
authored
Merge e981908 into 4ba2617
2 parents 4ba2617 + e981908 commit d7e9d66

File tree

7 files changed

+229
-21
lines changed

7 files changed

+229
-21
lines changed

ydb/tests/tools/kqprun/kqprun.cpp

+9
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ void RunMain(int argc, const char* argv[]) {
163163
TString schemeQueryAstFile;
164164
TString scriptQueryAstFile;
165165
TString scriptQueryPlanFile;
166+
TString inProgressStatisticFile;
166167
TString logFile = "-";
167168
TString appConfigFile = "./configuration/app_config.conf";
168169
std::vector<TString> tablesMappingList;
@@ -218,6 +219,10 @@ void RunMain(int argc, const char* argv[]) {
218219
.Optional()
219220
.RequiredArgument("FILE")
220221
.StoreResult(&scriptQueryPlanFile);
222+
// Optional option: path of the file that receives the in-progress statistics of the script query.
options.AddLongOption("in-progress-statistic", "File with script in-progress statistics")
    .Optional()
    .RequiredArgument("FILE")
    .StoreResult(&inProgressStatisticFile);
221226

222227
options.AddLongOption('C', "clear-execution", "Execute script query without creating additional tables, one of { query | yql-script }")
223228
.Optional()
@@ -319,6 +324,10 @@ void RunMain(int argc, const char* argv[]) {
319324
THolder<TFileOutput> scriptQueryAstFileHolder = SetupDefaultFileOutput(scriptQueryAstFile, runnerOptions.ScriptQueryAstOutput);
320325
THolder<TFileOutput> scriptQueryPlanFileHolder = SetupDefaultFileOutput(scriptQueryPlanFile, runnerOptions.ScriptQueryPlanOutput);
321326

327+
if (inProgressStatisticFile) {
328+
runnerOptions.InProgressStatisticOutputFile = inProgressStatisticFile;
329+
}
330+
322331
runnerOptions.TraceOptType = GetCaseVariant<NKqpRun::TRunnerOptions::ETraceOptType>("trace-opt", traceOptType, {
323332
{"all", NKqpRun::TRunnerOptions::ETraceOptType::All},
324333
{"scheme", NKqpRun::TRunnerOptions::ETraceOptType::Scheme},

ydb/tests/tools/kqprun/src/actors.cpp

+63-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "actors.h"
22

33
#include <ydb/core/kqp/common/simple/services.h>
4+
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
45

56

67
namespace NKqpRun {
@@ -11,13 +12,14 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
1112
public:
1213
TRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
1314
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise,
14-
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets, TString& queryPlan)
15+
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets,
16+
TProgressCallback progressCallback)
1517
: Request_(std::move(request))
1618
, Promise_(promise)
1719
, ResultRowsLimit_(std::numeric_limits<ui64>::max())
1820
, ResultSizeLimit_(std::numeric_limits<i64>::max())
1921
, ResultSets_(resultSets)
20-
, QueryPlan_(queryPlan)
22+
, ProgressCallback_(progressCallback)
2123
{
2224
if (resultRowsLimit) {
2325
ResultRowsLimit_ = resultRowsLimit;
@@ -76,7 +78,9 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
7678
}
7779

7880
// Forwards the record of an executer progress event to the progress callback.
// Nothing happens when no callback was supplied.
void Handle(NKikimr::NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev) {
    if (!ProgressCallback_) {
        return;
    }

    const auto& progressRecord = ev->Get()->Record;
    ProgressCallback_(progressRecord);
}
8185

8286
private:
@@ -85,15 +89,68 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
8589
ui64 ResultRowsLimit_;
8690
ui64 ResultSizeLimit_;
8791
std::vector<Ydb::ResultSet>& ResultSets_;
88-
TString& QueryPlan_;
92+
TProgressCallback ProgressCallback_;
93+
};
94+
95+
// Actor that fulfils a promise once the KQP resource manager reports the expected number
// of cluster resource entries, and then terminates itself.
class TResourceWaiterActor : public NActors::TActorBootstrapped<TResourceWaiterActor> {
public:
    TResourceWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount)
        : ExpectedNodeCount_(expectedNodeCount)
        , Promise_(promise)
    {}

    // NOTE(review): both waiting steps poll with Sleep() and a synchronous future wait directly
    // inside Bootstrap(); presumably this occupies the executing thread until the resources are
    // published -- confirm this is acceptable for the tool.
    void Bootstrap() {
        GetResourceManager();
        WaitResourcePublish();

        Promise_.SetValue();
        PassAway();
    }

private:
    // Retries every 10 ms until a resource manager is available for this actor's node.
    void GetResourceManager() {
        for (;;) {
            ResourceManager_ = NKikimr::NKqp::TryGetKqpResourceManager(SelfId().NodeId());
            if (ResourceManager_) {
                return;
            }

            Sleep(TDuration::MilliSeconds(10));
        }
    }

    // Requests the cluster resources info and returns how many entries were reported.
    i32 RequestNodeCount() {
        auto countPromise = NThreading::NewPromise<i32>();
        ResourceManager_->RequestClusterResourcesInfo([countPromise](TVector<NKikimrKqp::TKqpNodeResources>&& resources) mutable {
            countPromise.SetValue(resources.size());
        });

        return countPromise.GetFuture().GetValueSync();
    }

    // Re-requests the resources info every 10 ms until the entry count matches the expectation.
    void WaitResourcePublish() {
        while (RequestNodeCount() != ExpectedNodeCount_) {
            Sleep(TDuration::MilliSeconds(10));
        }
    }

private:
    const i32 ExpectedNodeCount_;

    NThreading::TPromise<void> Promise_;
    std::shared_ptr<NKikimr::NKqp::NRm::IKqpResourceManager> ResourceManager_;
};
90142

91143
} // anonymous namespace
92144

93145
// Creates the mock actor that runs a script query request.
//   request          - query request, ownership is transferred to the actor
//   promise          - promise for the query response, handed over to the actor
//   resultRowsLimit  - rows limit; zero keeps the actor's default (maximum ui64 value)
//   resultSizeLimit  - size limit forwarded to the actor
//   resultSets       - caller-owned storage for result sets; the actor keeps a reference to it
//   progressCallback - invoked with every executer progress record; may be empty
// The by-value promise and callback are moved into the actor instead of being copied again.
NActors::IActor* CreateRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
    NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise,
    ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets,
    TProgressCallback progressCallback) {
    return new TRunScriptActorMock(std::move(request), std::move(promise), resultRowsLimit, resultSizeLimit, resultSets, std::move(progressCallback));
}
151+
152+
// Creates the actor that fulfils `promise` once the resource manager reports
// `expectedNodeCount` cluster resource entries.
NActors::IActor* CreateResourceWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount) {
    NActors::IActor* waiter = new TResourceWaiterActor(promise, expectedNodeCount);
    return waiter;
}
98155

99156
} // namespace NKqpRun

ydb/tests/tools/kqprun/src/actors.h

+6-1
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,13 @@
44

55
namespace NKqpRun {
66

7+
// Callback that receives the NKikimrKqp::TEvExecuterProgress record of an executer progress event.
using TProgressCallback = std::function<void(const NKikimrKqp::TEvExecuterProgress&)>;
8+
79
// Factory for the mock actor that runs a script query request.
// `progressCallback` has the TProgressCallback signature and receives executer progress records.
NActors::IActor* CreateRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
810
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise,
9-
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets, TString& queryPlan);
11+
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets,
12+
TProgressCallback progressCallback);
13+
14+
// Factory for the resource waiter actor; it is given the promise to fulfil and the expected node count.
NActors::IActor* CreateResourceWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount);
1015

1116
} // namespace NKqpRun

ydb/tests/tools/kqprun/src/common.h

+2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ constexpr char YQL_TOKEN_VARIABLE[] = "YQL_TOKEN";
1616
struct TYdbSetupSettings {
1717
i32 NodeCount = 1;
1818
TString DomainName = "Root";
19+
TDuration InitializationTimeout = TDuration::Seconds(10);
1920

2021
bool TraceOptEnabled = false;
2122
TMaybe<TString> LogOutputFile;
@@ -46,6 +47,7 @@ struct TRunnerOptions {
4647
IOutputStream* SchemeQueryAstOutput = nullptr;
4748
IOutputStream* ScriptQueryAstOutput = nullptr;
4849
IOutputStream* ScriptQueryPlanOutput = nullptr;
50+
TMaybe<TString> InProgressStatisticOutputFile;
4951

5052
EResultOutputFormat ResultOutputFormat = EResultOutputFormat::RowsJson;
5153
NYdb::NConsoleClient::EOutputFormat PlanOutputFormat = NYdb::NConsoleClient::EOutputFormat::Default;

ydb/tests/tools/kqprun/src/kqp_runner.cpp

+127-7
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,78 @@
44
#include <library/cpp/colorizer/colors.h>
55
#include <library/cpp/json/json_reader.h>
66

7+
#include <ydb/core/blob_depot/mon_main.h>
8+
#include <ydb/core/fq/libs/compute/common/utils.h>
9+
710
#include <ydb/public/lib/json_value/ydb_json_value.h>
811
#include <ydb/public/lib/ydb_cli/common/format.h>
912

1013

1114
namespace NKqpRun {
1215

16+
namespace {
17+
18+
// Formats a non-negative magnitude with an apostrophe between groups of three digits,
// e.g. 1234567 -> "1'234'567". Values below 1000 are printed as is.
TString FormatUnsignedNumber(ui64 number) {
    if (number < 1000) {
        return ToString(number);
    }

    // Every group after the first one is always printed with exactly three digits.
    const ui64 tail = number % 1000;
    return TStringBuilder() << FormatUnsignedNumber(number / 1000) << "'" << tail / 100 << (tail / 10) % 10 << tail % 10;
}

// Formats a signed number with an apostrophe as thousands separator, e.g. -1234 -> "-1'234".
// The magnitude is computed in unsigned arithmetic, so the minimum i64 value is handled
// without the signed overflow that negating it would cause.
TString FormatNumber(i64 number) {
    if (number < 0) {
        const ui64 magnitude = static_cast<ui64>(0) - static_cast<ui64>(number);
        return TStringBuilder() << "-" << FormatUnsignedNumber(magnitude);
    }

    return FormatUnsignedNumber(static_cast<ui64>(number));
}
29+
30+
void PrintStatistic(const TString& fullStatistic, const THashMap<TString, i64>& flatStatistic, const NFq::TPublicStat& publicStatistic, IOutputStream& output) {
31+
output << "\nFlat statistic:" << Endl;
32+
for (const auto& [propery, value] : flatStatistic) {
33+
TString valueString = ToString(value);
34+
if (propery.find("Bytes") != TString::npos || propery.find("Source") != TString::npos) {
35+
valueString = NKikimr::NBlobDepot::FormatByteSize(value);
36+
} else if (propery.find("TimeUs") != TString::npos) {
37+
valueString = NFq::FormatDurationUs(value);
38+
} else if (propery.find("TimeMs") != TString::npos) {
39+
valueString = NFq::FormatDurationMs(value);
40+
} else {
41+
valueString = FormatNumber(value);
42+
}
43+
output << propery << " = " << valueString << Endl;
44+
}
45+
46+
output << "\nPublic statistic:" << Endl;
47+
if (auto memoryUsageBytes = publicStatistic.MemoryUsageBytes) {
48+
output << "MemoryUsage = " << NKikimr::NBlobDepot::FormatByteSize(*memoryUsageBytes) << Endl;
49+
}
50+
if (auto cpuUsageUs = publicStatistic.CpuUsageUs) {
51+
output << "CpuUsage = " << NFq::FormatDurationUs(*cpuUsageUs) << Endl;
52+
}
53+
if (auto inputBytes = publicStatistic.InputBytes) {
54+
output << "InputSize = " << NKikimr::NBlobDepot::FormatByteSize(*inputBytes) << Endl;
55+
}
56+
if (auto outputBytes = publicStatistic.OutputBytes) {
57+
output << "OutputSize = " << NKikimr::NBlobDepot::FormatByteSize(*outputBytes) << Endl;
58+
}
59+
if (auto sourceInputRecords = publicStatistic.SourceInputRecords) {
60+
output << "SourceInputRecords = " << FormatNumber(*sourceInputRecords) << Endl;
61+
}
62+
if (auto sinkOutputRecords = publicStatistic.SinkOutputRecords) {
63+
output << "SinkOutputRecords = " << FormatNumber(*sinkOutputRecords) << Endl;
64+
}
65+
if (auto runningTasks = publicStatistic.RunningTasks) {
66+
output << "RunningTasks = " << FormatNumber(*runningTasks) << Endl;
67+
}
68+
69+
output << "\nFull statistic:" << Endl;
70+
NJson::TJsonValue statsJson;
71+
NJson::ReadJsonTree(fullStatistic, &statsJson);
72+
NJson::WriteJson(&output, &statsJson, true, true, true);
73+
output << Endl;
74+
}
75+
76+
} // anonymous namespace
77+
78+
1379
//// TKqpRunner::TImpl
1480

1581
class TKqpRunner::TImpl {
@@ -22,6 +88,7 @@ class TKqpRunner::TImpl {
2288
// Stores a copy of the options, constructs the YDB setup from options.YdbSettings, creates the
// "stat_full" plan statistics processor and colorizers for Cerr and Cout.
explicit TImpl(const TRunnerOptions& options)
2389
: Options_(options)
2490
, YdbSetup_(options.YdbSettings)
91+
, StatProcessor_(NFq::CreateStatProcessor("stat_full"))
2592
, CerrColors_(NColorizer::AutoColors(Cerr))
2693
, CoutColors_(NColorizer::AutoColors(Cout))
2794
{}
@@ -63,7 +130,7 @@ class TKqpRunner::TImpl {
63130
TRequestResult status;
64131
switch (queryType) {
65132
case EQueryType::ScriptQuery:
66-
status = YdbSetup_.QueryRequest(query, action, traceId, meta, ResultSets_);
133+
status = YdbSetup_.QueryRequest(query, action, traceId, meta, ResultSets_, GetProgressCallback());
67134
break;
68135

69136
case EQueryType::YqlScriptQuery:
@@ -132,9 +199,16 @@ class TKqpRunner::TImpl {
132199
private:
133200
bool WaitScriptExecutionOperation() {
134201
ExecutionMeta_ = TExecutionMeta();
202+
203+
TDuration getOperationPeriod = TDuration::Seconds(1);
204+
if (auto progressStatsPeriodMs = Options_.YdbSettings.AppConfig.GetQueryServiceConfig().GetProgressStatsPeriodMs()) {
205+
getOperationPeriod = TDuration::MilliSeconds(progressStatsPeriodMs);
206+
}
207+
135208
TRequestResult status;
136209
while (true) {
137210
status = YdbSetup_.GetScriptExecutionOperationRequest(ExecutionOperation_, ExecutionMeta_);
211+
PrintScriptProgress(ExecutionMeta_.Plan);
138212

139213
if (ExecutionMeta_.Ready) {
140214
break;
@@ -145,7 +219,7 @@ class TKqpRunner::TImpl {
145219
return false;
146220
}
147221

148-
Sleep(TDuration::Seconds(1));
222+
Sleep(getOperationPeriod);
149223
}
150224

151225
PrintScriptAst(ExecutionMeta_.Ast);
@@ -190,8 +264,8 @@ class TKqpRunner::TImpl {
190264
}
191265
}
192266

193-
void PrintScriptPlan(const TString& plan) const {
194-
if (!Options_.ScriptQueryPlanOutput || !plan) {
267+
void PrintPlan(const TString& plan, IOutputStream* output) const {
268+
if (!plan) {
195269
return;
196270
}
197271

@@ -201,12 +275,57 @@ class TKqpRunner::TImpl {
201275
return;
202276
}
203277

204-
Cout << CoutColors_.Cyan() << "Writing script query plan" << CoutColors_.Default() << Endl;
205-
206-
NYdb::NConsoleClient::TQueryPlanPrinter printer(Options_.PlanOutputFormat, true, *Options_.ScriptQueryPlanOutput);
278+
NYdb::NConsoleClient::TQueryPlanPrinter printer(Options_.PlanOutputFormat, true, *output);
207279
printer.Print(plan);
208280
}
209281

282+
void PrintScriptPlan(const TString& plan) const {
283+
if (Options_.ScriptQueryPlanOutput) {
284+
Cout << CoutColors_.Cyan() << "Writing script query plan" << CoutColors_.Default() << Endl;
285+
PrintPlan(plan, Options_.ScriptQueryPlanOutput);
286+
}
287+
}
288+
289+
void PrintScriptProgress(const TString& plan) const {
290+
if (Options_.InProgressStatisticOutputFile) {
291+
TFileOutput outputStream(*Options_.InProgressStatisticOutputFile);
292+
outputStream << TInstant::Now().ToIsoStringLocal() << " Script in progress statistic" << Endl;
293+
294+
auto convertedPlan = plan;
295+
try {
296+
convertedPlan = StatProcessor_->ConvertPlan(plan);
297+
} catch(const NJson::TJsonException& ex) {
298+
outputStream << "Error plan conversion: " << ex.what() << Endl;
299+
}
300+
301+
try {
302+
double cpuUsage = 0.0;
303+
auto stat = StatProcessor_->GetQueryStat(convertedPlan, cpuUsage);
304+
outputStream << "\nCPU usage: " << cpuUsage << Endl;
305+
306+
auto flatStat = StatProcessor_->GetFlatStat(convertedPlan);
307+
auto publicStat = StatProcessor_->GetPublicStat(stat);
308+
309+
PrintStatistic(stat, flatStat, publicStat, outputStream);
310+
} catch(const NJson::TJsonException& ex) {
311+
outputStream << "Error stat conversion: " << ex.what() << Endl;
312+
}
313+
314+
outputStream << "\nPlan visualization:" << Endl;
315+
PrintPlan(convertedPlan, &outputStream);
316+
317+
outputStream.Finish();
318+
}
319+
}
320+
321+
// Builds the callback handed to the query request: it stores the latest query plan in the
// execution meta and writes a progress snapshot.
// The callback captures `this`, so it must not be invoked after this object is destroyed.
TProgressCallback GetProgressCallback() {
    return [this](const NKikimrKqp::TEvExecuterProgress& executerProgress) {
        const TString& latestPlan = executerProgress.GetQueryPlan();

        ExecutionMeta_.Plan = latestPlan;
        PrintScriptProgress(latestPlan);
    };
}
328+
210329
void PrintScriptResult(const Ydb::ResultSet& resultSet) const {
211330
switch (Options_.ResultOutputFormat) {
212331
case TRunnerOptions::EResultOutputFormat::RowsJson: {
@@ -241,6 +360,7 @@ class TKqpRunner::TImpl {
241360
TRunnerOptions Options_;
242361

243362
TYdbSetup YdbSetup_;
363+
std::unique_ptr<NFq::IPlanStatProcessor> StatProcessor_;
244364
NColorizer::TColors CerrColors_;
245365
NColorizer::TColors CoutColors_;
246366

0 commit comments

Comments
 (0)