Skip to content

Commit 6a1a387

Browse files
authored
Support progress stats for workload (#13725)
1 parent 2fdfe89 commit 6a1a387

File tree

15 files changed

+247
-38
lines changed

15 files changed

+247
-38
lines changed

ydb/apps/ydb/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Added statistics output on the current progress of the query in `ydb workload` command
12
* Fixed a bug where arm64 YDB CLI binary was downloading amd64 binary to replace itself during `ydb update`. To update already installed binaries to the latest arm64 version, YDB CLI should be re-installed
23
* Fixed a bug where `ydb workload tpch import generator` and `ydb workload tpcds import generator` commands were failing due to not all tables were created
34
* Fixed a bug with backslashes in `ydb workload` benchmark paths on Windows

ydb/core/grpc_services/query/rpc_execute_query.cpp

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <ydb/core/grpc_services/grpc_integrity_trails.h>
99
#include <ydb/core/grpc_services/rpc_kqp_base.h>
1010
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
11+
#include <ydb/core/kqp/opt/kqp_query_plan.h>
1112
#include <ydb/library/ydb_issue/issue_helpers.h>
1213
#include <ydb/public/api/protos/ydb_query.pb.h>
1314

@@ -214,6 +215,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
214215
HFunc(TEvents::TEvWakeup, Handle);
215216
HFunc(TRpcServices::TEvGrpcNextReply, Handle);
216217
HFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle);
218+
hFunc(NKqp::TEvKqpExecuter::TEvExecuterProgress, Handle);
217219
HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle);
218220
hFunc(NKikimr::NGRpcService::TEvSubscribeGrpcCancel, Handle);
219221
default:
@@ -281,6 +283,8 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
281283
settings,
282284
req->pool_id());
283285

286+
ev->SetProgressStatsPeriod(TDuration::MilliSeconds(req->stats_period_ms()));
287+
284288
if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release(), 0, 0, Span_.GetTraceId())) {
285289
NYql::TIssues issues;
286290
issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Internal error"));
@@ -353,6 +357,27 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
353357
channel.SendAck(SelfId());
354358
}
355359

360+
void Handle(NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev) {
361+
auto& record = ev->Get()->Record;
362+
363+
Ydb::Query::ExecuteQueryResponsePart response;
364+
response.set_status(Ydb::StatusIds::SUCCESS);
365+
366+
if (NeedReportStats(*Request_->GetProtoRequest())) {
367+
if (record.HasQueryStats()) {
368+
FillQueryStats(*response.mutable_exec_stats(), record.GetQueryStats());
369+
response.mutable_exec_stats()->set_query_plan(NKqp::SerializeAnalyzePlan(record.GetQueryStats()));
370+
}
371+
}
372+
373+
TString out;
374+
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
375+
376+
FlowControl_.PushResponse(out.size());
377+
378+
Request_->SendSerializedResult(std::move(out), Ydb::StatusIds::SUCCESS);
379+
}
380+
356381
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
357382
NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *Request_->GetProtoRequest(), ev, ctx);
358383

ydb/core/kqp/executer_actor/kqp_executer_stats.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1144,6 +1144,10 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st
11441144
ExportAggStats(a.Rows, *aggrStat.MutableRows());
11451145
}
11461146
}
1147+
1148+
for (const auto& [_, tableStats] : TableStats) {
1149+
stats.AddTables()->CopyFrom(*tableStats);
1150+
}
11471151
}
11481152

11491153
void TQueryExecutionStats::AdjustAsyncAggr(NYql::NDqProto::TDqAsyncStatsAggr& stats) {

ydb/public/api/protos/ydb_query.proto

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,13 @@ message ExecuteQueryRequest {
174174
// Allows to set size limitation (in bytes) for one result part
175175
int64 response_part_limit_bytes = 9 [(Ydb.value) = "[0; 33554432]"];
176176

177-
string pool_id = 10; // Workload manager pool id
177+
// Workload manager pool id
178+
string pool_id = 10;
179+
180+
// Time interval for sending periodical query statistics.
181+
// When query statistics are enabled (stats_mode != STATS_MODE_NONE), by default statistics will be sent only once after query execution is finished.
182+
// In case when stats_period_ms is specified and is non-zero, query statistics will be additionally sent every stats_period_ms milliseconds beginning from the start of query execution.
183+
int64 stats_period_ms = 11 [(Ydb.value) = ">= 0"];
178184
}
179185

180186
message ResultSetMeta {

ydb/public/lib/ydb_cli/commands/benchmark_utils.cpp

Lines changed: 79 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@
1414
#include <ydb-cpp-sdk/client/table/table.h>
1515
#include <ydb/public/lib/ydb_cli/common/pretty_table.h>
1616
#include <ydb/public/lib/yson_value/ydb_yson_value.h>
17+
#include <ydb/public/lib/ydb_cli/common/formats.h>
18+
#include <ydb/public/lib/ydb_cli/common/format.h>
19+
#include <ydb/public/lib/ydb_cli/common/plan2svg.h>
20+
#include <ydb/public/lib/ydb_cli/common/progress_indication.h>
21+
#include <ydb-cpp-sdk/client/proto/accessor.h>
1722

1823
#include <ydb/public/api/protos/ydb_query.pb.h>
1924
#include <yql/essentials/public/decimal/yql_decimal.h>
@@ -179,9 +184,21 @@ class TQueryResultScanner {
179184
}
180185

181186
template <typename TIterator>
182-
bool Scan(TIterator& it) {
187+
bool Scan(TIterator& it, std::optional<TString> planFileName = std::nullopt) {
188+
189+
TProgressIndication progressIndication(true);
190+
TMaybe<NQuery::TExecStats> execStats;
191+
192+
TString currentPlanFileNameStats;
193+
TString currentPlanWithStatsFileName;
194+
TString currentPlanWithStatsFileNameJson;
195+
if (planFileName) {
196+
currentPlanFileNameStats = TStringBuilder() << *planFileName << ".stats";
197+
currentPlanWithStatsFileName = TStringBuilder() << *planFileName << ".svg";
198+
currentPlanWithStatsFileNameJson = TStringBuilder() << *planFileName << ".json";
199+
}
183200
for (;;) {
184-
auto streamPart = it.ReadNext().GetValueSync();
201+
auto streamPart = it.ReadNext().ExtractValueSync();
185202
ui64 rsIndex = 0;
186203

187204
if constexpr (std::is_same_v<TIterator, NTable::TScanQueryPartIterator>) {
@@ -191,13 +208,47 @@ class TQueryResultScanner {
191208
PlanAst = streamPart.GetQueryStats().GetAst().value_or("");
192209
}
193210
} else {
194-
const auto& stats = streamPart.GetStats();
195-
rsIndex = streamPart.GetResultSetIndex();
196-
if (stats) {
197-
ServerTiming += stats->GetTotalDuration();
198-
QueryPlan = stats->GetPlan().value_or("");
199-
PlanAst = stats->GetAst().value_or("");
211+
if (streamPart.HasStats()) {
212+
execStats = streamPart.ExtractStats();
213+
214+
if (planFileName) {
215+
TFileOutput out(currentPlanFileNameStats);
216+
out << execStats->ToString();
217+
{
218+
auto plan = execStats->GetPlan();
219+
if (plan) {
220+
{
221+
TPlanVisualizer pv;
222+
TFileOutput out(currentPlanWithStatsFileName);
223+
try {
224+
pv.LoadPlans(*execStats->GetPlan());
225+
out << pv.PrintSvg();
226+
} catch (std::exception& e) {
227+
out << "<svg width='1024' height='256' xmlns='http://www.w3.org/2000/svg'><text>" << e.what() << "<text></svg>";
228+
}
229+
}
230+
{
231+
TFileOutput out(currentPlanWithStatsFileNameJson);
232+
TQueryPlanPrinter queryPlanPrinter(EDataFormat::JsonBase64, true, out, 120);
233+
queryPlanPrinter.Print(*execStats->GetPlan());
234+
}
235+
}
236+
}
237+
}
238+
239+
const auto& protoStats = TProtoAccessor::GetProto(execStats.GetRef());
240+
for (const auto& queryPhase : protoStats.query_phases()) {
241+
for (const auto& tableAccessStats : queryPhase.table_access()) {
242+
progressIndication.UpdateProgress({tableAccessStats.reads().rows(), tableAccessStats.reads().bytes(),
243+
tableAccessStats.updates().rows(), tableAccessStats.updates().bytes(),
244+
tableAccessStats.deletes().rows(), tableAccessStats.deletes().bytes()});
245+
}
246+
}
247+
248+
progressIndication.Render();
200249
}
250+
251+
rsIndex = streamPart.GetResultSetIndex();
201252
}
202253

203254
if (!streamPart.IsSuccess()) {
@@ -212,6 +263,11 @@ class TQueryResultScanner {
212263
RawResults[rsIndex].emplace_back(streamPart.ExtractResultSet());
213264
}
214265
}
266+
if (execStats) {
267+
ServerTiming += execStats->GetTotalDuration();
268+
QueryPlan = execStats->GetPlan().value_or("");
269+
PlanAst = execStats->GetAst().value_or("");
270+
}
215271
return true;
216272
}
217273
};
@@ -255,32 +311,35 @@ TQueryBenchmarkResult ExecuteImpl(const TString& query, NTable::TTableClient& cl
255311
}
256312
}
257313

258-
TQueryBenchmarkResult Execute(const TString& query, NTable::TTableClient& client, const TQueryBenchmarkDeadline& deadline) {
259-
return ExecuteImpl(query, client, deadline, false);
314+
TQueryBenchmarkResult Execute(const TString& query, NTable::TTableClient& client, const TQueryBenchmarkSettings& settings) {
315+
return ExecuteImpl(query, client, settings.Deadline, false);
260316
}
261317

262318
TQueryBenchmarkResult Explain(const TString& query, NTable::TTableClient& client, const TQueryBenchmarkDeadline& deadline) {
263319
return ExecuteImpl(query, client, deadline, true);
264320
}
265321

266-
TQueryBenchmarkResult ExecuteImpl(const TString& query, NQuery::TQueryClient& client, const TQueryBenchmarkDeadline& deadline, bool explainOnly) {
322+
TQueryBenchmarkResult ExecuteImpl(const TString& query, NQuery::TQueryClient& client, const TQueryBenchmarkSettings& benchmarkSettings, bool explainOnly) {
267323
NQuery::TExecuteQuerySettings settings;
268324
settings.StatsMode(NQuery::EStatsMode::Full);
269325
settings.ExecMode(explainOnly ? NQuery::EExecMode::Explain : NQuery::EExecMode::Execute);
270-
if (auto error = SetTimeoutSettings(settings, deadline)) {
326+
if (benchmarkSettings.WithProgress) {
327+
settings.StatsCollectPeriod(std::chrono::milliseconds(3000));
328+
}
329+
if (auto error = SetTimeoutSettings(settings, benchmarkSettings.Deadline)) {
271330
return *error;
272331
}
273332
auto it = client.StreamExecuteQuery(
274333
query,
275334
NYdb::NQuery::TTxControl::BeginTx().CommitTx(),
276335
settings).GetValueSync();
277-
if (auto error = ResultByStatus(it, deadline.Name)) {
336+
if (auto error = ResultByStatus(it, benchmarkSettings.Deadline.Name)) {
278337
return *error;
279338
}
280339

281340
TQueryResultScanner composite;
282-
composite.SetDeadlineName(deadline.Name);
283-
if (!composite.Scan(it)) {
341+
composite.SetDeadlineName(benchmarkSettings.Deadline.Name);
342+
if (!composite.Scan(it, benchmarkSettings.PlanFileName)) {
284343
return TQueryBenchmarkResult::Error(
285344
composite.GetErrorInfo(), composite.GetQueryPlan(), composite.GetPlanAst());
286345
} else {
@@ -293,12 +352,14 @@ TQueryBenchmarkResult ExecuteImpl(const TString& query, NQuery::TQueryClient& cl
293352
}
294353
}
295354

296-
TQueryBenchmarkResult Execute(const TString& query, NQuery::TQueryClient& client, const TQueryBenchmarkDeadline& deadline) {
297-
return ExecuteImpl(query, client, deadline, false);
355+
TQueryBenchmarkResult Execute(const TString& query, NQuery::TQueryClient& client, const TQueryBenchmarkSettings& settings) {
356+
return ExecuteImpl(query, client, settings, false);
298357
}
299358

300359
TQueryBenchmarkResult Explain(const TString& query, NQuery::TQueryClient& client, const TQueryBenchmarkDeadline& deadline) {
301-
return ExecuteImpl(query, client, deadline, true);
360+
TQueryBenchmarkSettings settings;
361+
settings.Deadline = deadline;
362+
return ExecuteImpl(query, client, settings, true);
302363
}
303364

304365
NJson::TJsonValue GetQueryLabels(ui32 queryId) {

ydb/public/lib/ydb_cli/commands/benchmark_utils.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,16 @@ struct TQueryBenchmarkDeadline {
7878
TString Name;
7979
};
8080

81+
struct TQueryBenchmarkSettings {
82+
TQueryBenchmarkDeadline Deadline;
83+
std::optional<TString> PlanFileName;
84+
bool WithProgress = false;
85+
};
86+
8187
TString FullTablePath(const TString& database, const TString& table);
8288
bool HasCharsInString(const TString& str);
83-
TQueryBenchmarkResult Execute(const TString & query, NTable::TTableClient & client, const TQueryBenchmarkDeadline& deadline);
84-
TQueryBenchmarkResult Execute(const TString & query, NQuery::TQueryClient & client, const TQueryBenchmarkDeadline& deadline);
89+
TQueryBenchmarkResult Execute(const TString & query, NTable::TTableClient & client, const TQueryBenchmarkSettings& settings);
90+
TQueryBenchmarkResult Execute(const TString & query, NQuery::TQueryClient & client, const TQueryBenchmarkSettings& settings);
8591
TQueryBenchmarkResult Explain(const TString & query, NTable::TTableClient & client, const TQueryBenchmarkDeadline& deadline);
8692
TQueryBenchmarkResult Explain(const TString & query, NQuery::TQueryClient & client, const TQueryBenchmarkDeadline& deadline);
8793
NJson::TJsonValue GetQueryLabels(ui32 queryId);

ydb/public/lib/ydb_cli/commands/ydb_benchmark.cpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <library/cpp/json/json_writer.h>
77
#include <util/string/printf.h>
88
#include <util/folder/path.h>
9+
#include <optional>
910

1011
namespace NYdb::NConsoleClient {
1112
TWorkloadCommandBenchmark::TWorkloadCommandBenchmark(NYdbWorkload::TWorkloadParams& params, const NYdbWorkload::IWorkloadQueryGenerator::TWorkloadType& workload)
@@ -358,9 +359,18 @@ bool TWorkloadCommandBenchmark::RunBench(TClient* client, NYdbWorkload::IWorkloa
358359
for (ui32 i = 0; i < IterationsCount && Now() < GlobalDeadline; ++i) {
359360
auto t1 = TInstant::Now();
360361
TQueryBenchmarkResult res = TQueryBenchmarkResult::Error("undefined", "undefined", "undefined");
362+
363+
TQueryBenchmarkSettings settings;
364+
settings.Deadline = GetDeadline();
365+
settings.WithProgress = true;
366+
367+
if (PlanFileName) {
368+
settings.PlanFileName = TStringBuilder() << PlanFileName << "." << queryN << "." << ToString(i);
369+
}
370+
361371
try {
362372
if (client) {
363-
res = Execute(query, *client, GetDeadline());
373+
res = Execute(query, *client, settings);
364374
} else {
365375
res = TQueryBenchmarkResult::Result(TQueryBenchmarkResult::TRawResults(), TDuration::Zero(), "", "");
366376
}

ydb/public/lib/ydb_cli/common/print_utils.cpp

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <google/protobuf/port_def.inc>
55

66
#include <util/string/printf.h>
7+
#include <util/stream/format.h>
78

89
namespace NYdb {
910
namespace NConsoleClient {
@@ -51,23 +52,14 @@ void PrintSchemeEntry(IOutputStream& o, const NScheme::TSchemeEntry& entry, NCol
5152
o << entry.Name << colors.OldColor();
5253
}
5354

54-
TString PrettySize(size_t size) {
55+
TString PrettySize(ui64 size) {
5556
double sizeFormat = size;
56-
TString mod = "b";
57-
const char* mods[] = { "Kb", "Mb", "Gb", "Tb", "Pb", "Eb" };
58-
TString numFormat = "%.0f";
59-
60-
for (const char* nextMod : mods) {
61-
if (sizeFormat > 1024) {
62-
sizeFormat /= 1024;
63-
mod = nextMod;
64-
numFormat = "%.02f";
65-
} else {
66-
break;
67-
}
68-
}
57+
return ToString(HumanReadableSize(sizeFormat, ESizeFormat::SF_QUANTITY)) + " B";
58+
}
6959

70-
return Sprintf((numFormat + " %s").data(), sizeFormat, mod.data());
60+
TString PrettyNumber(ui64 number) {
61+
double numberFormat = number;
62+
return ToString(HumanReadableSize(numberFormat, ESizeFormat::SF_QUANTITY));
7163
}
7264

7365
TString FormatTime(TInstant time) {

ydb/public/lib/ydb_cli/common/print_utils.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ namespace NConsoleClient {
99
void PrintSchemeEntry(IOutputStream& o, const NScheme::TSchemeEntry& entry, NColorizer::TColors colors);
1010
TString FormatTime(TInstant time);
1111
TString FormatDuration(TDuration duration);
12-
TString PrettySize(size_t size);
12+
TString PrettySize(ui64 size);
13+
TString PrettyNumber(ui64 number);
1314
TString EntryTypeToString(NScheme::ESchemeEntryType entry);
1415

1516
int PrintProtoJsonBase64(const google::protobuf::Message& msg);

0 commit comments

Comments
 (0)