Skip to content

Commit 2b39cf8

Browse files
authored
Replay PathStat in yt gateway & other fixes (#9996)
1 parent 2d80271 commit 2b39cf8

File tree

3 files changed

+123
-7
lines changed

3 files changed

+123
-7
lines changed

ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@ class TResolver : public IUdfResolver {
3030

3131
// Resolves the path of a system UDF module.
//
// When the query context is readable (replay mode) the wrapped resolver is
// not consulted: a TFilePathWithMd5 constructed from two empty strings is
// returned instead of failing. Otherwise the call is forwarded unchanged
// to the inner resolver.
TMaybe<TFilePathWithMd5> GetSystemModulePath(const TStringBuf& moduleName) const final {
    if (!QContext_.CanRead()) {
        return Inner_->GetSystemModulePath(moduleName);
    }

    // Replay: hand back an empty placeholder rather than throwing.
    return MakeMaybe<TFilePathWithMd5>("", "");
}
3838

3939
bool LoadMetadata(const TVector<TImport*>& imports,

ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ class TDqExecutionValidator {
185185
}
186186
}
187187

188-
if (!hasError && HasMapJoin_ && !TypeCtx_.ForceDq && !TypeCtx_.QContext.CanRead()) {
188+
if (!hasError && HasMapJoin_ && !TypeCtx_.ForceDq) {
189189
size_t dataSize = 0;
190190
for (auto& [integration, nodes]: ReadsPerProvider_) {
191191
TMaybe<ui64> size;

ydb/library/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp

Lines changed: 120 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ const TString YtGateway_GetTableRange = "YtGateway_GetTableRange";
2424
const TString YtGateway_GetFolder = "YtGateway_GetFolder";
2525
const TString YtGateway_GetFolders = "YtGateway_GetFolders";
2626
const TString YtGateway_ResolveLinks = "YtGateway_ResolveLinks";
27+
const TString YtGateway_PathStat = "YtGateway_PathStat";
2728

2829
TString MakeHash(const TString& str) {
2930
SHA256_CTX sha;
@@ -98,7 +99,7 @@ class TGateway : public IYtGateway {
9899

99100
if (valueNode.HasKey("Ranges")) {
100101
p.Ranges.ConstructInPlace();
101-
for (const auto& r : valueNode["Ranges"].AsString()) {
102+
for (const auto& r : valueNode["Ranges"].AsList()) {
102103
NYT::TReadRange range;
103104
NYT::Deserialize(range, r);
104105
p.Ranges->push_back(range);
@@ -697,17 +698,132 @@ class TGateway : public IYtGateway {
697698
return Inner_->DropTrackables(std::move(options));
698699
}
699700

701+
// Builds the lookup key under which the stat of a single path is stored
// and fetched.
//
// The key is MakeHash() of the canonical YSON form of a node that holds the
// cluster name, the "extended" flag, and a nested "Path" map describing the
// request (serialized path, IsTemp, IsAnonymous, Epoch).
static TString MakePathStatKey(const TString& cluster, bool extended, const TPathStatReq& req) {
    // Serialize the request path into a TNode so it can be embedded in the key.
    NYT::TNode serializedPath;
    NYT::TNodeBuilder pathBuilder(&serializedPath);
    NYT::Serialize(req.Path(), &pathBuilder);

    auto requestNode = NYT::TNode()
        ("Path", serializedPath)
        ("IsTemp", req.IsTemp())
        ("IsAnonymous", req.IsAnonymous())
        ("Epoch", req.Epoch());

    auto keyNode = NYT::TNode()
        ("Cluster", cluster)
        ("Extended", extended)
        ("Path", requestNode);

    return MakeHash(NYT::NodeToCanonicalYsonString(keyNode));
}
718+
719+
// Serializes the stat entry at position `index` of `stat` to binary YSON.
//
// The produced map has two keys:
//   "DataSize" - stat.DataSize[index];
//   "Extended" - an entity when stat.Extended[index] is not defined,
//                otherwise a map with "DataWeight" and
//                "EstimatedUniqueCounts" sub-maps.
// `index` must be within both stat.DataSize and stat.Extended (checked with
// Y_ENSURE).
static TString SerializePathStat(const TPathStatResult& stat, ui32 index) {
    Y_ENSURE(index < stat.DataSize.size());
    Y_ENSURE(index < stat.Extended.size());

    const auto& extended = stat.Extended[index];

    // Entity marks "no extended statistics"; replaced below when present.
    auto extendedNode = NYT::TNode::CreateEntity();
    if (extended.Defined()) {
        auto weights = NYT::TNode::CreateMap();
        for (const auto& [name, weight] : extended->DataWeight) {
            weights(name, weight);
        }

        auto uniqCounts = NYT::TNode::CreateMap();
        for (const auto& [name, count] : extended->EstimatedUniqueCounts) {
            uniqCounts(name, count);
        }

        extendedNode = NYT::TNode()
            ("DataWeight", weights)
            ("EstimatedUniqueCounts", uniqCounts);
    }

    auto resultNode = NYT::TNode()
        ("DataSize", stat.DataSize[index])
        ("Extended", extendedNode);

    return NYT::NodeToYsonString(resultNode, NYT::NYson::EYsonFormat::Binary);
}
747+
748+
// Fills the entry at position `index` of `stat` from a node produced by
// SerializePathStat().
//
// stat.DataSize[index] is read from "DataSize". stat.Extended[index] is
// reset to Nothing() and populated only when "Extended" is not an entity.
// `stat` must already be sized so that `index` is valid for both vectors
// (checked with Y_ENSURE).
static void DeserializePathStat(const NYT::TNode& node, TPathStatResult& stat, ui32 index) {
    Y_ENSURE(index < stat.DataSize.size());
    Y_ENSURE(index < stat.Extended.size());

    stat.DataSize[index] = node["DataSize"].AsUint64();

    auto& extended = stat.Extended[index];
    extended = Nothing();

    const auto& extendedNode = node["Extended"];
    if (extendedNode.IsEntity()) {
        // No extended statistics were recorded for this path.
        return;
    }

    extended.ConstructInPlace();
    for (const auto& [name, weight] : extendedNode["DataWeight"].AsMap()) {
        extended->DataWeight[name] = weight.AsInt64();
    }

    for (const auto& [name, count] : extendedNode["EstimatedUniqueCounts"].AsMap()) {
        extended->EstimatedUniqueCounts[name] = count.AsUint64();
    }
}
766+
700767
// Returns path statistics, either replayed from the query context or
// obtained from the wrapped gateway.
//
// Replay (QContext_.CanRead()): for every requested path the stored value is
// looked up by MakePathStatKey() under the YtGateway_PathStat label and
// decoded with DeserializePathStat(). A missing entry throws yexception
// ("Missing replay data"). On success a ready future with a successful
// result is returned; the inner gateway is not called.
//
// Otherwise the request is forwarded to the inner gateway. When the query
// context is writable and the call finished successfully without an
// exception, each per-path result is serialized and stored so that it can be
// replayed later.
NThreading::TFuture<TPathStatResult> PathStat(TPathStatOptions&& options) final {
    if (QContext_.CanRead()) {
        TPathStatResult res;
        res.DataSize.resize(options.Paths().size(), 0);
        res.Extended.resize(options.Paths().size());

        for (ui32 index = 0; index < options.Paths().size(); ++index) {
            const auto& key = MakePathStatKey(options.Cluster(), options.Extended(), options.Paths()[index]);
            auto item = QContext_.GetReader()->Get({YtGateway_PathStat, key}).GetValueSync();
            if (!item) {
                throw yexception() << "Missing replay data";
            }

            auto valueNode = NYT::NodeFromYsonString(TStringBuf(item->Value));
            DeserializePathStat(valueNode, res, index);
        }

        res.SetSuccess();
        return NThreading::MakeFuture<TPathStatResult>(res);
    }

    // `options` is moved into the inner call below, so the callback needs its
    // own copy to rebuild the keys. That copy is taken once here and then
    // moved (not copied again) into the closure.
    auto optionsDup = options;
    return Inner_->PathStat(std::move(options))
        .Subscribe([optionsDup = std::move(optionsDup), qContext = QContext_](const NThreading::TFuture<TPathStatResult>& future) {
            // Record only completed, successful calls when capture is enabled.
            if (!qContext.CanWrite() || future.HasException()) {
                return;
            }

            const auto& res = future.GetValueSync();
            if (!res.Success()) {
                return;
            }

            for (ui32 index = 0; index < optionsDup.Paths().size(); ++index) {
                const auto& key = MakePathStatKey(optionsDup.Cluster(), optionsDup.Extended(), optionsDup.Paths()[index]);
                auto value = SerializePathStat(res, index);
                qContext.GetWriter()->Put({YtGateway_PathStat, key}, value).GetValueSync();
            }
        });
}
707807

708808
TPathStatResult TryPathStat(TPathStatOptions&& options) final {
709809
if (QContext_.CanRead()) {
710-
throw yexception() << "Can't replay TryPathStat";
810+
TPathStatResult res;
811+
res.DataSize.resize(options.Paths().size(), 0);
812+
res.Extended.resize(options.Paths().size());
813+
814+
for (ui32 index = 0; index < options.Paths().size(); ++index) {
815+
const auto& key = MakePathStatKey(options.Cluster(), options.Extended(), options.Paths()[index]);
816+
auto item = QContext_.GetReader()->Get({YtGateway_PathStat, key}).GetValueSync();
817+
if (!item) {
818+
return res;
819+
}
820+
821+
auto valueNode = NYT::NodeFromYsonString(TStringBuf(item->Value));
822+
DeserializePathStat(valueNode, res, index);
823+
}
824+
825+
res.SetSuccess();
826+
return res;
711827
}
712828

713829
return Inner_->TryPathStat(std::move(options));

0 commit comments

Comments
 (0)