Skip to content

Commit b0243eb

Browse files
committed
FIxed issues #1
1 parent 3c5b14c commit b0243eb

File tree

6 files changed

+104
-57
lines changed

6 files changed

+104
-57
lines changed

ydb/tests/tools/kqprun/kqprun.cpp

+5-5
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ void RunMain(int argc, const char* argv[]) {
163163
TString schemeQueryAstFile;
164164
TString scriptQueryAstFile;
165165
TString scriptQueryPlanFile;
166-
TString inProgressStatisticFile;
166+
TString inProgressStatisticsFile;
167167
TString logFile = "-";
168168
TString appConfigFile = "./configuration/app_config.conf";
169169
std::vector<TString> tablesMappingList;
@@ -219,10 +219,10 @@ void RunMain(int argc, const char* argv[]) {
219219
.Optional()
220220
.RequiredArgument("FILE")
221221
.StoreResult(&scriptQueryPlanFile);
222-
options.AddLongOption("in-progress-statistic", "File with script inprogress statitic")
222+
options.AddLongOption("in-progress-statistics", "File with script inprogress statistics")
223223
.Optional()
224224
.RequiredArgument("FILE")
225-
.StoreResult(&inProgressStatisticFile);
225+
.StoreResult(&inProgressStatisticsFile);
226226

227227
options.AddLongOption('C', "clear-execution", "Execute script query without creating additional tables, one of { query | yql-script }")
228228
.Optional()
@@ -324,8 +324,8 @@ void RunMain(int argc, const char* argv[]) {
324324
THolder<TFileOutput> scriptQueryAstFileHolder = SetupDefaultFileOutput(scriptQueryAstFile, runnerOptions.ScriptQueryAstOutput);
325325
THolder<TFileOutput> scriptQueryPlanFileHolder = SetupDefaultFileOutput(scriptQueryPlanFile, runnerOptions.ScriptQueryPlanOutput);
326326

327-
if (inProgressStatisticFile) {
328-
runnerOptions.InProgressStatisticOutputFile = inProgressStatisticFile;
327+
if (inProgressStatisticsFile) {
328+
runnerOptions.InProgressStatisticsOutputFile = inProgressStatisticsFile;
329329
}
330330

331331
runnerOptions.TraceOptType = GetCaseVariant<NKqpRun::TRunnerOptions::ETraceOptType>("trace-opt", traceOptType, {

ydb/tests/tools/kqprun/src/actors.cpp

+63-27
Original file line numberDiff line numberDiff line change
@@ -92,51 +92,87 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
9292
TProgressCallback ProgressCallback_;
9393
};
9494

95-
class TResourceWaiterActor : public NActors::TActorBootstrapped<TResourceWaiterActor> {
95+
class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaiterActor> {
96+
struct TEvPrivate {
97+
enum EEv : ui32 {
98+
EvResourcesInfo = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
99+
100+
EvEnd
101+
};
102+
103+
static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");
104+
105+
struct TEvResourcesInfo : public NActors::TEventLocal<TEvResourcesInfo, EvResourcesInfo> {
106+
explicit TEvResourcesInfo(i32 nodeCount)
107+
: NodeCount(nodeCount)
108+
{}
109+
110+
const i32 NodeCount;
111+
};
112+
};
113+
114+
static constexpr TDuration REFRESH_PERIOD = TDuration::MilliSeconds(10);
115+
96116
public:
97-
TResourceWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount)
117+
TResourcesWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount)
98118
: ExpectedNodeCount_(expectedNodeCount)
99119
, Promise_(promise)
100120
{}
101121

102122
void Bootstrap() {
103-
GetResourceManager();
104-
WaitResourcePublish();
123+
Become(&TResourcesWaiterActor::StateFunc);
124+
CheckResourcesPublish();
125+
}
105126

106-
Promise_.SetValue();
107-
PassAway();
127+
void Handle(NActors::TEvents::TEvWakeup::TPtr&) {
128+
CheckResourcesPublish();
108129
}
109130

131+
void Handle(TEvPrivate::TEvResourcesInfo::TPtr& ev) {
132+
if (ev->Get()->NodeCount == ExpectedNodeCount_) {
133+
Promise_.SetValue();
134+
PassAway();
135+
return;
136+
}
137+
138+
Schedule(REFRESH_PERIOD, new NActors::TEvents::TEvWakeup());
139+
}
140+
141+
STRICT_STFUNC(StateFunc,
142+
hFunc(NActors::TEvents::TEvWakeup, Handle);
143+
hFunc(TEvPrivate::TEvResourcesInfo, Handle);
144+
)
145+
110146
private:
111-
void GetResourceManager() {
112-
while (true) {
113-
ResourceManager_ = NKikimr::NKqp::TryGetKqpResourceManager(SelfId().NodeId());
114-
if (ResourceManager_) {
115-
break;
116-
}
147+
void CheckResourcesPublish() {
148+
GetResourceManager();
117149

118-
Sleep(TDuration::MilliSeconds(10));
150+
if (!ResourceManager_) {
151+
Schedule(REFRESH_PERIOD, new NActors::TEvents::TEvWakeup());
152+
return;
119153
}
120-
}
121154

122-
void WaitResourcePublish() {
123-
while (true) {
124-
auto resourcesPromise = NThreading::NewPromise<i32>();
125-
ResourceManager_->RequestClusterResourcesInfo([resourcesPromise](TVector<NKikimrKqp::TKqpNodeResources>&& resources) mutable {
126-
resourcesPromise.SetValue(resources.size());
127-
});
155+
UpdateResourcesInfo();
156+
}
128157

129-
if (resourcesPromise.GetFuture().GetValueSync() == ExpectedNodeCount_) {
130-
break;
131-
}
132-
Sleep(TDuration::MilliSeconds(10));
158+
void GetResourceManager() {
159+
if (ResourceManager_) {
160+
return;
133161
}
162+
ResourceManager_ = NKikimr::NKqp::TryGetKqpResourceManager(SelfId().NodeId());
163+
}
164+
165+
void UpdateResourcesInfo() {
166+
ResourceManager_->RequestClusterResourcesInfo(
167+
[selfId = SelfId(), actorContext = ActorContext()](TVector<NKikimrKqp::TKqpNodeResources>&& resources) {
168+
actorContext.Send(selfId, new TEvPrivate::TEvResourcesInfo(resources.size()));
169+
});
134170
}
135171

136172
private:
137173
const i32 ExpectedNodeCount_;
138-
139174
NThreading::TPromise<void> Promise_;
175+
140176
std::shared_ptr<NKikimr::NKqp::NRm::IKqpResourceManager> ResourceManager_;
141177
};
142178

@@ -149,8 +185,8 @@ NActors::IActor* CreateRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQuer
149185
return new TRunScriptActorMock(std::move(request), promise, resultRowsLimit, resultSizeLimit, resultSets, progressCallback);
150186
}
151187

152-
NActors::IActor* CreateResourceWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount) {
153-
return new TResourceWaiterActor(promise, expectedNodeCount);
188+
NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount) {
189+
return new TResourcesWaiterActor(promise, expectedNodeCount);
154190
}
155191

156192
} // namespace NKqpRun

ydb/tests/tools/kqprun/src/actors.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,6 @@ NActors::IActor* CreateRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQuer
1111
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets,
1212
TProgressCallback progressCallback);
1313

14-
NActors::IActor* CreateResourceWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount);
14+
NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount);
1515

1616
} // namespace NKqpRun

ydb/tests/tools/kqprun/src/common.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ struct TRunnerOptions {
4747
IOutputStream* SchemeQueryAstOutput = nullptr;
4848
IOutputStream* ScriptQueryAstOutput = nullptr;
4949
IOutputStream* ScriptQueryPlanOutput = nullptr;
50-
TMaybe<TString> InProgressStatisticOutputFile;
50+
TMaybe<TString> InProgressStatisticsOutputFile;
5151

5252
EResultOutputFormat ResultOutputFormat = EResultOutputFormat::RowsJson;
5353
NYdb::NConsoleClient::EOutputFormat PlanOutputFormat = NYdb::NConsoleClient::EOutputFormat::Default;

ydb/tests/tools/kqprun/src/kqp_runner.cpp

+31-20
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,28 @@ namespace NKqpRun {
1515

1616
namespace {
1717

18+
// Function adds thousands separators
19+
// 123456789 -> 123.456.789
1820
TString FormatNumber(i64 number) {
21+
struct TSeparator : public std::numpunct<char> {
22+
char do_thousands_sep() const final {
23+
return '.';
24+
}
25+
26+
std::string do_grouping() const final {
27+
return "\03";
28+
}
29+
};
30+
1931
std::ostringstream stream;
20-
stream.imbue(std::locale("en_US.UTF-8"));
32+
stream.imbue(std::locale(stream.getloc(), new TSeparator()));
2133
stream << number;
2234
return stream.str();
2335
}
2436

25-
void PrintStatistic(const TString& fullStatistic, const THashMap<TString, i64>& flatStatistic, const NFq::TPublicStat& publicStatistic, IOutputStream& output) {
37+
void PrintStatistics(const TString& fullStat, const THashMap<TString, i64>& flatStat, const NFq::TPublicStat& publicStat, IOutputStream& output) {
2638
output << "\nFlat statistic:" << Endl;
27-
for (const auto& [propery, value] : flatStatistic) {
39+
for (const auto& [propery, value] : flatStat) {
2840
TString valueString = ToString(value);
2941
if (propery.find("Bytes") != TString::npos || propery.find("Source") != TString::npos) {
3042
valueString = NKikimr::NBlobDepot::FormatByteSize(value);
@@ -39,31 +51,31 @@ void PrintStatistic(const TString& fullStatistic, const THashMap<TString, i64>&
3951
}
4052

4153
output << "\nPublic statistic:" << Endl;
42-
if (auto memoryUsageBytes = publicStatistic.MemoryUsageBytes) {
54+
if (auto memoryUsageBytes = publicStat.MemoryUsageBytes) {
4355
output << "MemoryUsage = " << NKikimr::NBlobDepot::FormatByteSize(*memoryUsageBytes) << Endl;
4456
}
45-
if (auto cpuUsageUs = publicStatistic.CpuUsageUs) {
57+
if (auto cpuUsageUs = publicStat.CpuUsageUs) {
4658
output << "CpuUsage = " << NFq::FormatDurationUs(*cpuUsageUs) << Endl;
4759
}
48-
if (auto inputBytes = publicStatistic.InputBytes) {
60+
if (auto inputBytes = publicStat.InputBytes) {
4961
output << "InputSize = " << NKikimr::NBlobDepot::FormatByteSize(*inputBytes) << Endl;
5062
}
51-
if (auto outputBytes = publicStatistic.OutputBytes) {
63+
if (auto outputBytes = publicStat.OutputBytes) {
5264
output << "OutputSize = " << NKikimr::NBlobDepot::FormatByteSize(*outputBytes) << Endl;
5365
}
54-
if (auto sourceInputRecords = publicStatistic.SourceInputRecords) {
66+
if (auto sourceInputRecords = publicStat.SourceInputRecords) {
5567
output << "SourceInputRecords = " << FormatNumber(*sourceInputRecords) << Endl;
5668
}
57-
if (auto sinkOutputRecords = publicStatistic.SinkOutputRecords) {
69+
if (auto sinkOutputRecords = publicStat.SinkOutputRecords) {
5870
output << "SinkOutputRecords = " << FormatNumber(*sinkOutputRecords) << Endl;
5971
}
60-
if (auto runningTasks = publicStatistic.RunningTasks) {
72+
if (auto runningTasks = publicStat.RunningTasks) {
6173
output << "RunningTasks = " << FormatNumber(*runningTasks) << Endl;
6274
}
6375

6476
output << "\nFull statistic:" << Endl;
6577
NJson::TJsonValue statsJson;
66-
NJson::ReadJsonTree(fullStatistic, &statsJson);
78+
NJson::ReadJsonTree(fullStat, &statsJson);
6779
NJson::WriteJson(&output, &statsJson, true, true, true);
6880
output << Endl;
6981
}
@@ -282,26 +294,25 @@ class TKqpRunner::TImpl {
282294
}
283295

284296
void PrintScriptProgress(const TString& plan) const {
285-
if (Options_.InProgressStatisticOutputFile) {
286-
TFileOutput outputStream(*Options_.InProgressStatisticOutputFile);
287-
outputStream << TInstant::Now().ToIsoStringLocal() << " Script in progress statistic" << Endl;
297+
if (Options_.InProgressStatisticsOutputFile) {
298+
TFileOutput outputStream(*Options_.InProgressStatisticsOutputFile);
299+
outputStream << TInstant::Now().ToIsoStringLocal() << " Script in progress statistics" << Endl;
288300

289301
auto convertedPlan = plan;
290302
try {
291303
convertedPlan = StatProcessor_->ConvertPlan(plan);
292-
} catch(const NJson::TJsonException& ex) {
304+
} catch (const NJson::TJsonException& ex) {
293305
outputStream << "Error plan conversion: " << ex.what() << Endl;
294306
}
295307

296308
try {
297309
double cpuUsage = 0.0;
298-
auto stat = StatProcessor_->GetQueryStat(convertedPlan, cpuUsage);
299-
outputStream << "\nCPU usage: " << cpuUsage << Endl;
300-
310+
auto fullStat = StatProcessor_->GetQueryStat(convertedPlan, cpuUsage);
301311
auto flatStat = StatProcessor_->GetFlatStat(convertedPlan);
302-
auto publicStat = StatProcessor_->GetPublicStat(stat);
312+
auto publicStat = StatProcessor_->GetPublicStat(fullStat);
303313

304-
PrintStatistic(stat, flatStat, publicStat, outputStream);
314+
outputStream << "\nCPU usage: " << cpuUsage << Endl;
315+
PrintStatistics(fullStat, flatStat, publicStat, outputStream);
305316
} catch(const NJson::TJsonException& ex) {
306317
outputStream << "Error stat conversion: " << ex.what() << Endl;
307318
}

ydb/tests/tools/kqprun/src/ydb_setup.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -163,9 +163,9 @@ class TYdbSetup::TImpl {
163163
NYql::NLog::InitLogger(NActors::CreateNullBackend());
164164
}
165165

166-
void WaitResourcePublishing() const {
166+
void WaitResourcesPublishing() const {
167167
auto promise = NThreading::NewPromise();
168-
GetRuntime()->Register(CreateResourceWaiterActor(promise, Settings_.NodeCount));
168+
GetRuntime()->Register(CreateResourcesWaiterActor(promise, Settings_.NodeCount));
169169

170170
try {
171171
promise.GetFuture().GetValue(Settings_.InitializationTimeout);
@@ -182,7 +182,7 @@ class TYdbSetup::TImpl {
182182
InitializeServer();
183183

184184
if (Settings_.NodeCount > 1) {
185-
WaitResourcePublishing();
185+
WaitResourcesPublishing();
186186
}
187187
}
188188

0 commit comments

Comments
 (0)