Skip to content

Commit 3facfc0

Browse files
authored
YQL-17250: Add operation information into hybrid statistics (#597)
* YQL-17250: Add operation information into hybrid statistics
* Fix review comments
1 parent b97c74b commit 3facfc0

File tree

6 files changed

+67
-24
lines changed

6 files changed

+67
-24
lines changed

ydb/library/yql/providers/common/provider/yql_provider.cpp

+7-4
Original file line numberDiff line numberDiff line change
@@ -1264,14 +1264,16 @@ void WriteStatistics(NYson::TYsonWriter& writer, const TOperationStatistics& sta
12641264
writer.OnEndMap();
12651265
}
12661266

1267-
void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap<ui32, TOperationStatistics>& statistics) {
1267+
void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap<ui32, TOperationStatistics>& statistics, bool addExternalMap) {
12681268
if (statistics.empty()) {
12691269
return;
12701270
}
12711271

12721272
THashMap<TString, std::tuple<i64, i64, i64, TMaybe<i64>>> total; // sum, count, max, min
12731273

1274-
writer.OnBeginMap();
1274+
if (addExternalMap) {
1275+
writer.OnBeginMap();
1276+
}
12751277

12761278
for (const auto& opStatistics : statistics) {
12771279
for (auto& el : opStatistics.second.Entries) {
@@ -1331,8 +1333,9 @@ void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap<
13311333
writer.OnEndMap();
13321334
}
13331335
writer.OnEndMap(); // total
1334-
1335-
writer.OnEndMap();
1336+
if (addExternalMap) {
1337+
writer.OnEndMap();
1338+
}
13361339
}
13371340

13381341
bool ValidateCompressionForInput(std::string_view format, std::string_view compression, TExprContext& ctx) {

ydb/library/yql/providers/common/provider/yql_provider.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ void WriteStreams(NYson::TYsonWriter& writer, TStringBuf name, const NNodes::TCo
179179

180180
double GetDataReplicationFactor(const TExprNode& lambda, TExprContext& ctx);
181181

182-
void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap<ui32, TOperationStatistics>& statistics);
182+
void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap<ui32, TOperationStatistics>& statistics, bool addExternalMap = true);
183183
void WriteStatistics(NYson::TYsonWriter& writer, const TOperationStatistics& statistics);
184184

185185
bool ValidateCompressionForInput(std::string_view format, std::string_view compression, TExprContext& ctx);

ydb/library/yql/providers/yt/provider/yql_yt_datasink.cpp

+10-1
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,16 @@ class TYtDataSink : public TDataProviderBase {
160160
return false;
161161
}
162162

163-
NCommon::WriteStatistics(writer, totalOnly, State_->Statistics);
163+
writer.OnBeginMap();
164+
NCommon::WriteStatistics(writer, totalOnly, State_->Statistics, false);
165+
writer.OnKeyedItem("Hybrid");
166+
writer.OnBeginMap();
167+
for (const auto& [opName, stats] : State_->HybridStatistics) {
168+
writer.OnKeyedItem(opName);
169+
NCommon::WriteStatistics(writer, totalOnly, {{0, stats}});
170+
}
171+
writer.OnEndMap();
172+
writer.OnEndMap();
164173

165174
return true;
166175
}

ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp

+22-8
Original file line numberDiff line numberDiff line change
@@ -57,21 +57,23 @@ bool NeedFallback(const TIssues& issues) {
5757

5858
TIssue WrapIssuesOnHybridFallback(TPosition pos, const TIssues& issues) {
5959
TIssue result(pos, "Hybrid execution fallback on YT");
60-
result.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_WARNING);
60+
result.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_INFO);
6161

62-
const std::function<void(TIssue& issue)> toWarning = [&](TIssue& issue) {
63-
if (issue.Severity == TSeverityIds::S_ERROR || issue.Severity == TSeverityIds::S_FATAL) {
64-
issue.Severity = TSeverityIds::S_WARNING;
62+
const std::function<void(TIssue& issue)> toInfo = [&](TIssue& issue) {
63+
if (issue.Severity == TSeverityIds::S_ERROR
64+
|| issue.Severity == TSeverityIds::S_FATAL
65+
|| issue.Severity == TSeverityIds::S_WARNING) {
66+
issue.Severity = TSeverityIds::S_INFO;
6567
}
6668
for (const auto& subissue : issue.GetSubIssues()) {
67-
toWarning(*subissue);
69+
toInfo(*subissue);
6870
}
6971
};
7072

7173
for (const auto& issue : issues) {
72-
TIssuePtr warning(new TIssue(issue));
73-
toWarning(*warning);
74-
result.AddSubIssue(std::move(warning));
74+
TIssuePtr info(new TIssue(issue));
75+
toInfo(*info);
76+
result.AddSubIssue(std::move(info));
7577
}
7678

7779
return result;
@@ -125,6 +127,11 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase {
125127
static TExprNode::TPtr FinalizeOutputOp(const TYtState::TPtr& state, const TString& operationHash,
126128
const IYtGateway::TRunResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, bool markFinished)
127129
{
130+
if (markFinished && !TYtDqProcessWrite::Match(input.Get())) {
131+
with_lock(state->StatisticsMutex) {
132+
state->HybridStatistics[input->Content()].Entries.emplace_back(TString{"YtExecution"}, 0, 0, 0, 0, 1);
133+
}
134+
}
128135
auto outSection = TYtOutputOpBase(input).Output();
129136
YQL_ENSURE(outSection.Size() == res.OutTableStats.size(), "Invalid output table count in IYtGateway::TRunResult");
130137
TExprNode::TListType newOutTables;
@@ -286,14 +293,21 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase {
286293
State_->Statistics[Max<ui32>()].Entries.emplace_back(TString{name}, 0, 0, 0, 0, 1);
287294
}
288295
};
296+
auto hybridStatWriter = [this](TStringBuf statName, TStringBuf opName) {
297+
with_lock(State_->StatisticsMutex) {
298+
State_->HybridStatistics[opName].Entries.emplace_back(TString{statName}, 0, 0, 0, 0, 1);
299+
}
300+
};
289301

290302
switch (input->Head().GetState()) {
291303
case TExprNode::EState::ExecutionComplete:
292304
statWriter("HybridExecution");
305+
hybridStatWriter("Execution", input->TailPtr()->Content());
293306
output = input->HeadPtr();
294307
break;
295308
case TExprNode::EState::Error: {
296309
statWriter("HybridFallback");
310+
hybridStatWriter("Fallback", input->TailPtr()->Content());
297311
if (State_->Configuration->HybridDqExecutionFallback.Get().GetOrElse(true)) {
298312
output = input->TailPtr();
299313
} else {

ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp

+26-10
Original file line numberDiff line numberDiff line change
@@ -275,10 +275,12 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase {
275275
if (const auto stat = CanReadHybrid(sort.Input().Item(0))) {
276276
if (stat->front() <= sizeLimit && stat->back() <= chunksLimit) {
277277
YQL_CLOG(INFO, ProviderYt) << "Sort on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks.";
278-
PushStat("Hybrid_Sort_try");
278+
PushStat("HybridTry");
279+
PushHybridStat("Try", node.Raw()->Content());
279280
return MakeYtSortByDq(sort, ctx);
280281
}
281-
PushStat("Hybrid_Sort_over_limits");
282+
PushStat("HybridSkipOverLimits");
283+
PushHybridStat("SkipOverLimits", node.Raw()->Content());
282284
}
283285
}
284286
}
@@ -294,10 +296,12 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase {
294296
if (const auto stat = CanReadHybrid(merge.Input().Item(0))) {
295297
if (stat->front() <= sizeLimit && stat->back() <= chunksLimit) {
296298
YQL_CLOG(INFO, ProviderYt) << "Merge on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks.";
297-
PushStat("Hybrid_Merge_try");
299+
PushStat("HybridTry");
300+
PushHybridStat("Try", node.Raw()->Content());
298301
return MakeYtSortByDq(merge, ctx);
299302
}
300-
PushStat("Hybrid_Merge_over_limits");
303+
PushStat("HybridSkipOverLimits");
304+
PushHybridStat("SkipOverLimits", node.Raw()->Content());
301305
}
302306
}
303307
}
@@ -347,7 +351,8 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase {
347351
if (stat->front() <= sizeLimit && stat->back() <= chunksLimit) {
348352
if (CanExecuteInHybrid(map.Mapper().Ptr(), chunksLimit, sizeLimit)) {
349353
YQL_CLOG(INFO, ProviderYt) << "Map on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks.";
350-
PushStat("Hybrid_Map_try");
354+
PushStat("HybridTry");
355+
PushHybridStat("Try", node.Raw()->Content());
351356
TSyncMap syncList;
352357
const auto& paths = map.Input().Item(0).Paths();
353358
for (auto i = 0U; i < paths.Size(); ++i) {
@@ -430,7 +435,8 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase {
430435
.Done();
431436
}
432437
}
433-
PushStat("Hybrid_Map_over_limits");
438+
PushStat("HybridOverLimits");
439+
PushHybridStat("SkipOverLimits", node.Raw()->Content());
434440
}
435441
}
436442

@@ -626,12 +632,14 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase {
626632
if (CanExecuteInHybrid(reduce.Reducer().Ptr(), chunksLimit, sizeLimit)) {
627633
if (ETypeAnnotationKind::Struct == GetSeqItemType(*reduce.Reducer().Args().Arg(0).Ref().GetTypeAnn()).GetKind()) {
628634
YQL_CLOG(INFO, ProviderYt) << "Reduce on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks.";
629-
PushStat("Hybrid_Reduce_try");
635+
PushStat("HybridTry");
636+
PushHybridStat("Try", node.Raw()->Content());
630637
return MakeYtReduceByDq(reduce, ctx);
631638
}
632639
}
633640
}
634-
PushStat("Hybrid_Reduce_over_limits");
641+
PushStat("HybridSkipOverLimits");
642+
PushHybridStat("SkipOverLimits", node.Raw()->Content());
635643
}
636644
}
637645

@@ -647,12 +655,14 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase {
647655
if (CanExecuteInHybrid(mapReduce.Reducer().Ptr(), chunksLimit, sizeLimit) && CanExecuteInHybrid(mapReduce.Mapper().Ptr(), chunksLimit, sizeLimit)) {
648656
if (ETypeAnnotationKind::Struct == GetSeqItemType(*mapReduce.Reducer().Args().Arg(0).Ref().GetTypeAnn()).GetKind()) {
649657
YQL_CLOG(INFO, ProviderYt) << "MapReduce on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks.";
650-
PushStat("Hybrid_MapReduce_try");
658+
PushHybridStat("Try", node.Raw()->Content());
659+
PushStat("HybridTry");
651660
return MakeYtReduceByDq(mapReduce, ctx);
652661
}
653662
}
654663
}
655-
PushStat("Hybrid_MapReduce_over_limits");
664+
PushHybridStat("SkipOverLimits", node.Raw()->Content());
665+
PushStat("HybridOverLimits");
656666
}
657667
}
658668

@@ -665,6 +675,12 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase {
665675
}
666676
};
667677

678+
void PushHybridStat(TStringBuf statName, TStringBuf opName) const {
679+
with_lock(State_->StatisticsMutex) {
680+
State_->HybridStatistics[opName].Entries.emplace_back(TString{statName}, 0, 0, 0, 0, 1);
681+
}
682+
};
683+
668684
const TYtState::TPtr State_;
669685
const THolder<IGraphTransformer> Finalizer_;
670686
};

ydb/library/yql/providers/yt/provider/yql_yt_provider.h

+1
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ struct TYtState : public TThrRefBase {
9696
THashMap<std::pair<TString, TString>, TString> AnonymousLabels; // cluster + label -> name
9797
std::unordered_map<ui64, TString> NodeHash; // unique id -> hash
9898
THashMap<ui32, TOperationStatistics> Statistics; // public id -> stat
99+
THashMap<TString, TOperationStatistics> HybridStatistics; // operation name -> stat
99100
TMutex StatisticsMutex;
100101
THashSet<std::pair<TString, TString>> Checkpoints; // Set of checkpoint tables
101102
THolder<IDqIntegration> DqIntegration_;

0 commit comments

Comments
 (0)