Skip to content

Commit 0c16248

Browse files
authored
YQ-3738 RD pass UV from filter to read actor (#11940)
1 parent 137077a commit 0c16248

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+4892
-2369
lines changed

ydb/core/fq/libs/row_dispatcher/events/data_plane.h

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
99
#include <ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h>
1010

11+
#include <yql/essentials/public/issue/yql_issue.h>
1112
#include <yql/essentials/public/purecalc/common/fwd.h>
1213

1314
namespace NFq {
@@ -148,9 +149,9 @@ struct TEvRowDispatcher {
148149
};
149150

150151
struct TEvSessionStatistic : public NActors::TEventLocal<TEvSessionStatistic, EEv::EvSessionStatistic> {
151-
TEvSessionStatistic(const TopicSessionStatistic& stat)
152+
TEvSessionStatistic(const TTopicSessionStatistic& stat)
152153
: Stat(stat) {}
153-
TopicSessionStatistic Stat;
154+
TTopicSessionStatistic Stat;
154155
};
155156

156157
struct TEvHeartbeat : public NActors::TEventPB<TEvHeartbeat, NFq::NRowDispatcherProto::TEvHeartbeat, EEv::EvHeartbeat> {
@@ -182,16 +183,19 @@ struct TEvRowDispatcher {
182183
};
183184

184185
struct TEvPurecalcCompileResponse : public NActors::TEventLocal<TEvPurecalcCompileResponse, EEv::EvPurecalcCompileResponse> {
185-
explicit TEvPurecalcCompileResponse(const TString& error)
186-
: Error(error)
186+
TEvPurecalcCompileResponse(NYql::NDqProto::StatusIds::StatusCode status, NYql::TIssues issues)
187+
: Status(status)
188+
, Issues(std::move(issues))
187189
{}
188190

189191
explicit TEvPurecalcCompileResponse(IProgramHolder::TPtr programHolder)
190192
: ProgramHolder(std::move(programHolder))
193+
, Status(NYql::NDqProto::StatusIds::SUCCESS)
191194
{}
192195

193196
IProgramHolder::TPtr ProgramHolder; // Same holder that passed into TEvPurecalcCompileRequest
194-
TString Error;
197+
NYql::NDqProto::StatusIds::StatusCode Status;
198+
NYql::TIssues Issues;
195199
};
196200
};
197201

ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
namespace NFq {
77

8-
struct TopicSessionClientStatistic {
8+
struct TTopicSessionClientStatistic {
99
NActors::TActorId ReadActorId;
1010
ui32 PartitionId = 0;
1111
i64 UnreadRows = 0; // Current value
@@ -15,7 +15,7 @@ struct TopicSessionClientStatistic {
1515
bool IsWaiting = false; // Current value
1616
i64 ReadLagMessages = 0; // Current value
1717
ui64 InitialOffset = 0;
18-
void Add(const TopicSessionClientStatistic& stat) {
18+
void Add(const TTopicSessionClientStatistic& stat) {
1919
UnreadRows = stat.UnreadRows;
2020
UnreadBytes = stat.UnreadBytes;
2121
Offset = stat.Offset;
@@ -29,39 +29,75 @@ struct TopicSessionClientStatistic {
2929
}
3030
};
3131

32-
struct TopicSessionCommonStatistic {
32+
struct TParserStatistic {
33+
TDuration ParserLatency;
34+
35+
void Add(const TParserStatistic& stat) {
36+
ParserLatency = stat.ParserLatency != TDuration::Zero() ? stat.ParserLatency : ParserLatency;
37+
}
38+
};
39+
40+
struct TFiltersStatistic {
41+
TDuration FilterLatency;
42+
43+
void Add(const TFiltersStatistic& stat) {
44+
FilterLatency = stat.FilterLatency != TDuration::Zero() ? stat.FilterLatency : FilterLatency;
45+
}
46+
};
47+
48+
struct TFormatHandlerStatistic {
49+
TDuration ParseAndFilterLatency;
50+
51+
TParserStatistic ParserStats;
52+
TFiltersStatistic FilterStats;
53+
54+
void Add(const TFormatHandlerStatistic& stat) {
55+
ParseAndFilterLatency = stat.ParseAndFilterLatency != TDuration::Zero() ? stat.ParseAndFilterLatency : ParseAndFilterLatency;
56+
57+
ParserStats.Add(stat.ParserStats);
58+
FilterStats.Add(stat.FilterStats);
59+
}
60+
};
61+
62+
struct TTopicSessionCommonStatistic {
3363
ui64 UnreadBytes = 0; // Current value
3464
ui64 RestartSessionByOffsets = 0;
3565
ui64 ReadBytes = 0; // Increment
3666
ui64 ReadEvents = 0; // Increment
3767
ui64 LastReadedOffset = 0;
38-
TDuration ParseAndFilterLatency;
39-
void Add(const TopicSessionCommonStatistic& stat) {
68+
69+
std::unordered_map<TString, TFormatHandlerStatistic> FormatHandlers;
70+
71+
void Add(const TTopicSessionCommonStatistic& stat) {
4072
UnreadBytes = stat.UnreadBytes;
4173
RestartSessionByOffsets = stat.RestartSessionByOffsets;
4274
ReadBytes += stat.ReadBytes;
4375
ReadEvents += stat.ReadEvents;
4476
LastReadedOffset = stat.LastReadedOffset;
45-
ParseAndFilterLatency = stat.ParseAndFilterLatency != TDuration::Zero() ? stat.ParseAndFilterLatency : ParseAndFilterLatency;
77+
78+
for (const auto& [formatName, foramtStats] : stat.FormatHandlers) {
79+
FormatHandlers[formatName].Add(foramtStats);
80+
}
4681
}
82+
4783
void Clear() {
4884
ReadBytes = 0;
4985
ReadEvents = 0;
5086
}
5187
};
5288

53-
struct TopicSessionParams {
89+
struct TTopicSessionParams {
5490
TString ReadGroup;
5591
TString Endpoint;
5692
TString Database;
5793
TString TopicPath;
5894
ui64 PartitionId = 0;
5995
};
6096

61-
struct TopicSessionStatistic {
62-
TopicSessionParams SessionKey;
63-
TVector<TopicSessionClientStatistic> Clients;
64-
TopicSessionCommonStatistic Common;
97+
struct TTopicSessionStatistic {
98+
TTopicSessionParams SessionKey;
99+
std::vector<TTopicSessionClientStatistic> Clients;
100+
TTopicSessionCommonStatistic Common;
65101
};
66102

67103
} // namespace NFq

ydb/core/fq/libs/row_dispatcher/events/ya.make

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@ SRCS(
77
PEERDIR(
88
ydb/core/fq/libs/events
99
ydb/core/fq/libs/row_dispatcher/protos
10+
1011
ydb/library/actors/core
1112
ydb/library/yql/providers/pq/provider
13+
14+
yql/essentials/public/issue
1215
)
1316

1417
END()
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
#include "common.h"
2+
3+
#include <util/string/builder.h>
4+
5+
namespace NFq::NRowDispatcher {
6+
7+
//// TSchemaColumn
8+
9+
TString TSchemaColumn::ToString() const {
10+
return TStringBuilder() << "'" << Name << "' : " << TypeYson;
11+
}
12+
13+
} // namespace NFq::NRowDispatcher
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#pragma once
2+
3+
#include <ydb/library/conclusion/generic/result.h>
4+
#include <ydb/library/conclusion/status.h>
5+
#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h>
6+
7+
namespace NFq::NRowDispatcher {
8+
9+
using EStatusId = NYql::NDqProto::StatusIds;
10+
using TStatusCode = EStatusId::StatusCode;
11+
using TStatus = NKikimr::TYQLConclusionSpecialStatus<TStatusCode, EStatusId::SUCCESS, EStatusId::INTERNAL_ERROR>;
12+
13+
template <typename TValue>
14+
using TValueStatus = NKikimr::TConclusionImpl<TStatus, TValue>;
15+
16+
struct TSchemaColumn {
17+
TString Name;
18+
TString TypeYson;
19+
20+
bool operator==(const TSchemaColumn& other) const = default;
21+
22+
TString ToString() const;
23+
};
24+
25+
} // namespace NFq::NRowDispatcher
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
common.cpp
5+
)
6+
7+
PEERDIR(
8+
ydb/library/conclusion
9+
ydb/library/yql/dq/actors/protos
10+
11+
yql/essentials/public/issue
12+
)
13+
14+
YQL_LAST_ABI_VERSION()
15+
16+
END()

0 commit comments

Comments
 (0)