Skip to content

Commit 4c31460

Browse files
authored
Support of replaying YtGateway::{CanonizePaths,GetTableInfo} (#4185)
1 parent d2bab9a commit 4c31460

File tree

3 files changed

+234
-3
lines changed

3 files changed

+234
-3
lines changed

ydb/library/yql/core/ut/yql_qplayer_ut.cpp

+7
Original file line numberDiff line numberDiff line change
@@ -151,4 +151,11 @@ Y_UNIT_TEST_SUITE(QPlayerTests) {
151151
CheckProgram(s, tables);
152152
});
153153
}
154+
155+
Y_UNIT_TEST(YtGetTableInfo) {
156+
auto s = "select * from plato.Input";
157+
WithTables([&](const auto& tables) {
158+
CheckProgram(s, tables);
159+
});
160+
}
154161
}

ydb/library/yql/providers/yt/gateway/qplayer/ya.make

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ PEERDIR(
1111
ydb/library/yql/core/file_storage
1212
library/cpp/yson/node
1313
library/cpp/random_provider
14+
yt/cpp/mapreduce/interface
1415
contrib/libs/openssl
1516
)
1617

ydb/library/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp

+226-3
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,24 @@
22

33
#include <ydb/library/yql/core/file_storage/storage.h>
44
#include <library/cpp/yson/node/node_io.h>
5+
#include <library/cpp/yson/node/node_builder.h>
6+
#include <yt/cpp/mapreduce/interface/serialize.h>
57

68
#include <util/stream/file.h>
79

810
#include <openssl/sha.h>
911

12+
namespace NYT {
13+
//TODO: use from header
14+
void Deserialize(TReadRange& readRange, const TNode& node);
15+
}
16+
1017
namespace NYql {
1118

1219
namespace {
1320

21+
const TString YtGateway_CanonizePaths = "YtGateway_CanonizePaths";
22+
const TString YtGateway_GetTableInfo = "YtGateway_GetTableInfo";
1423
const TString YtGateway_GetFolder = "YtGateway_GetFolder";
1524

1625
TString MakeHash(const TString& str) {
@@ -55,16 +64,230 @@ class TGateway : public IYtGateway {
5564
return Inner_->Finalize(std::move(options));
5665
}
5766

67+
static TString MakeCanonizePathKey(const TCanonizeReq& req) {
68+
auto node = NYT::TNode()
69+
("Cluster", req.Cluster())
70+
("Path", req.Path());
71+
72+
return MakeHash(NYT::NodeToCanonicalYsonString(node));
73+
}
74+
5875
NThreading::TFuture<TCanonizePathsResult> CanonizePaths(TCanonizePathsOptions&& options) final {
59-
return Inner_->CanonizePaths(std::move(options));
76+
if (QContext_.CanRead()) {
77+
TCanonizePathsResult res;
78+
res.SetSuccess();
79+
for (const auto& req : options.Paths()) {
80+
auto key = MakeCanonizePathKey(req);
81+
auto item = QContext_.GetReader()->Get({YtGateway_CanonizePaths, key}).GetValueSync();
82+
if (!item) {
83+
throw yexception() << "Missing replay data";
84+
}
85+
86+
auto valueNode = NYT::NodeFromYsonString(item->Value);
87+
TCanonizedPath p;
88+
p.Path = valueNode["Path"].AsString();
89+
if (valueNode.HasKey("Columns")) {
90+
p.Columns.ConstructInPlace();
91+
for (const auto& c : valueNode["Columns"].AsList()) {
92+
p.Columns->push_back(c.AsString());
93+
}
94+
}
95+
96+
if (valueNode.HasKey("Ranges")) {
97+
p.Ranges.ConstructInPlace();
98+
for (const auto& r : valueNode["Ranges"].AsString()) {
99+
NYT::TReadRange range;
100+
NYT::Deserialize(range, r);
101+
p.Ranges->push_back(range);
102+
}
103+
}
104+
105+
if (valueNode.HasKey("AdditionalAttributes")) {
106+
p.AdditionalAttributes = valueNode["AdditionalAttributes"].AsString();
107+
}
108+
109+
res.Data.push_back(p);
110+
}
111+
112+
return NThreading::MakeFuture<TCanonizePathsResult>(res);
113+
}
114+
115+
auto optionsDup = options;
116+
return Inner_->CanonizePaths(std::move(options))
117+
.Subscribe([qContext = QContext_, optionsDup](const NThreading::TFuture<TCanonizePathsResult>& future) {
118+
if (!qContext.CanWrite() || future.HasException()) {
119+
return;
120+
}
121+
122+
const auto& res = future.GetValueSync();
123+
if (!res.Success()) {
124+
return;
125+
}
126+
127+
Y_ENSURE(res.Data.size() == optionsDup.Paths().size());
128+
for (size_t i = 0; i < res.Data.size(); ++i) {
129+
auto key = MakeCanonizePathKey(optionsDup.Paths()[i]);
130+
auto valueNode = NYT::TNode();
131+
const auto& canon = res.Data[i];
132+
valueNode("Path", canon.Path);
133+
if (canon.Columns) {
134+
NYT::TNode columnsNode;
135+
for (const auto& c : *canon.Columns) {
136+
columnsNode.Add(NYT::TNode(c));
137+
}
138+
139+
valueNode("Columns", columnsNode);
140+
}
141+
142+
if (canon.Ranges) {
143+
NYT::TNode rangesNode;
144+
for (const auto& r : *canon.Ranges) {
145+
NYT::TNode rangeNode;
146+
NYT::TNodeBuilder builder(&rangeNode);
147+
NYT::Serialize(r, &builder);
148+
rangesNode.Add(rangeNode);
149+
}
150+
151+
valueNode("Ranges", rangesNode);
152+
}
153+
154+
if (canon.AdditionalAttributes) {
155+
valueNode("AdditionalAttributes", NYT::TNode(*canon.AdditionalAttributes));
156+
}
157+
158+
auto value = NYT::NodeToYsonString(valueNode, NYT::NYson::EYsonFormat::Binary);
159+
qContext.GetWriter()->Put({YtGateway_CanonizePaths, key}, value).GetValueSync();
160+
}
161+
});
162+
}
163+
164+
static TString MakeGetTableInfoKey(const TTableReq& req, ui32 epoch) {
165+
auto tableNode = NYT::TNode()
166+
("Cluster", req.Cluster())
167+
("Table", req.Table());
168+
169+
if (req.InferSchemaRows() != 0) {
170+
tableNode("InferSchemaRows", req.InferSchemaRows());
171+
}
172+
173+
if (req.ForceInferSchema()) {
174+
tableNode("ForceInferSchema", req.ForceInferSchema());
175+
}
176+
177+
if (req.Anonymous()) {
178+
tableNode("Anonymous", req.Anonymous());
179+
}
180+
181+
if (req.IgnoreYamrDsv()) {
182+
tableNode("IgnoreYamrDsv", req.IgnoreYamrDsv());
183+
}
184+
185+
if (req.IgnoreWeakSchema()) {
186+
tableNode("IgnoreWeakSchema", req.IgnoreWeakSchema());
187+
}
188+
189+
auto node = NYT::TNode()
190+
("Table", tableNode)
191+
("Epoch", epoch);
192+
193+
return MakeHash(NYT::NodeToCanonicalYsonString(node));
60194
}
61195

62196
NThreading::TFuture<TTableInfoResult> GetTableInfo(TGetTableInfoOptions&& options) final {
63197
if (QContext_.CanRead()) {
64-
throw yexception() << "Can't replay GetTableInfo";
198+
TTableInfoResult res;
199+
res.SetSuccess();
200+
for (const auto& req : options.Tables()) {
201+
TTableInfoResult::TTableData data;
202+
auto key = MakeGetTableInfoKey(req, options.Epoch());
203+
auto item = QContext_.GetReader()->Get({YtGateway_GetTableInfo, key}).GetValueSync();
204+
if (!item) {
205+
throw yexception() << "Missing replay data";
206+
}
207+
208+
auto valueNode = NYT::NodeFromYsonString(item->Value);
209+
data.Meta = MakeIntrusive<TYtTableMetaInfo>();
210+
auto metaNode = valueNode["Meta"];
211+
212+
data.Meta->CanWrite = metaNode["CanWrite"].AsBool();
213+
data.Meta->DoesExist = metaNode["DoesExist"].AsBool();
214+
data.Meta->YqlCompatibleScheme = metaNode["YqlCompatibleScheme"].AsBool();
215+
data.Meta->InferredScheme = metaNode["InferredScheme"].AsBool();
216+
data.Meta->IsDynamic = metaNode["IsDynamic"].AsBool();
217+
data.Meta->SqlView = metaNode["SqlView"].AsString();
218+
data.Meta->SqlViewSyntaxVersion = metaNode["SqlViewSyntaxVersion"].AsUint64();
219+
for (const auto& x : metaNode["Attrs"].AsMap()) {
220+
data.Meta->Attrs[x.first] = x.second.AsString();
221+
}
222+
223+
data.Stat = MakeIntrusive<TYtTableStatInfo>();
224+
auto statNode = valueNode["Stat"];
225+
data.Stat->Id = statNode["Id"].AsString();
226+
data.Stat->RecordsCount = statNode["RecordsCount"].AsUint64();
227+
data.Stat->DataSize = statNode["DataSize"].AsUint64();
228+
data.Stat->ChunkCount = statNode["ChunkCount"].AsUint64();
229+
data.Stat->ModifyTime = statNode["ModifyTime"].AsUint64();
230+
data.Stat->Revision = statNode["Revision"].AsUint64();
231+
data.Stat->TableRevision = statNode["TableRevision"].AsUint64();
232+
233+
data.WriteLock = options.ReadOnly() ? false : valueNode["WriteLock"].AsBool();
234+
res.Data.push_back(data);
235+
}
236+
237+
return NThreading::MakeFuture<TTableInfoResult>(res);
65238
}
66239

67-
return Inner_->GetTableInfo(std::move(options));
240+
auto optionsDup = options;
241+
return Inner_->GetTableInfo(std::move(options))
242+
.Subscribe([optionsDup, qContext = QContext_](const NThreading::TFuture<TTableInfoResult>& future) {
243+
if (!qContext.CanWrite() || future.HasException()) {
244+
return;
245+
}
246+
247+
const auto& res = future.GetValueSync();
248+
if (!res.Success()) {
249+
return;
250+
}
251+
252+
Y_ENSURE(res.Data.size() == optionsDup.Tables().size());
253+
for (size_t i = 0; i < res.Data.size(); ++i) {
254+
const auto& req = optionsDup.Tables()[i];
255+
const auto& data = res.Data[i];
256+
auto key = MakeGetTableInfoKey(req, optionsDup.Epoch());
257+
258+
auto attrsNode = NYT::TNode();
259+
for (const auto& a : data.Meta->Attrs) {
260+
attrsNode(a.first, a.second);
261+
}
262+
263+
auto metaNode = NYT::TNode()
264+
("CanWrite",data.Meta->CanWrite)
265+
("DoesExist",data.Meta->DoesExist)
266+
("YqlCompatibleScheme",data.Meta->YqlCompatibleScheme)
267+
("InferredScheme",data.Meta->InferredScheme)
268+
("IsDynamic",data.Meta->IsDynamic)
269+
("SqlView",data.Meta->SqlView)
270+
("SqlViewSyntaxVersion",ui64(data.Meta->SqlViewSyntaxVersion))
271+
("Attrs",attrsNode);
272+
273+
auto statNode = NYT::TNode()
274+
("Id",data.Stat->Id)
275+
("RecordsCount",data.Stat->RecordsCount)
276+
("DataSize",data.Stat->DataSize)
277+
("ChunkCount",data.Stat->ChunkCount)
278+
("ModifyTime",data.Stat->ModifyTime)
279+
("Revision",data.Stat->Revision)
280+
("TableRevision",data.Stat->TableRevision);
281+
282+
auto valueNode = NYT::TNode()
283+
("Meta", metaNode)
284+
("Stat", statNode)
285+
("WriteLock", data.WriteLock);
286+
287+
auto value = NYT::NodeToYsonString(valueNode, NYT::NYson::EYsonFormat::Binary);
288+
qContext.GetWriter()->Put({YtGateway_GetTableInfo, key},value).GetValueSync();
289+
}
290+
});
68291
}
69292

70293
NThreading::TFuture<TTableRangeResult> GetTableRange(TTableRangeOptions&& options) final {

0 commit comments

Comments
 (0)