diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index f577cdf79d65..1d9e2351bb8f 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -1,3 +1,5 @@ +#include "yql_dq_exectransformer.h" + #include #include @@ -400,12 +402,24 @@ struct TDqsFinalPipelineConfigurator : public IPipelineConfigurator { TDqStatePtr State_; }; +class TSimpleSkiffConverter : public ISkiffConverter { +public: + TString ConvertNodeToSkiff(const TDqStatePtr /*state*/, const IDataProvider::TFillSettings& /*fillSettings*/, const NYT::TNode& /*rowSpec*/, const NYT::TNode& /*item*/) override { + Y_ABORT("not implemented"); + } + + TYtType ParseYTType(const TExprNode& /*node*/, TExprContext& /*ctx*/, const TMaybe& /*columns*/) override { + Y_ABORT("not implemented"); + } +}; + class TDqExecTransformer: public TExecTransformerBase, TCounters { public: - TDqExecTransformer(const TDqStatePtr& state) + TDqExecTransformer(const TDqStatePtr& state, const ISkiffConverterPtr& skiffConverter) : State(state) , ExecState(MakeIntrusive()) + , SkiffConverter(skiffConverter) { AddHandler({TStringBuf("Result")}, RequireNone(), Hndl(&TDqExecTransformer::HandleResult)); AddHandler({TStringBuf("Pull")}, RequireNone(), Hndl(&TDqExecTransformer::HandlePull)); @@ -994,7 +1008,17 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters FlushStatisticsToState(); - return WrapFutureCallback(future, [localRun, startTime, type, fillSettings, level, settings, enableFullResultWrite, columns, graphParams, state = State](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { + TString skiffType; + NYT::TNode rowSpec; + if (fillSettings.Format == IDataProvider::EResultFormat::Skiff) { + auto parsedYtType = SkiffConverter->ParseYTType(result.Input().Ref(), ctx, columns); + + type = parsedYtType.Type; + rowSpec = parsedYtType.RowSpec; + skiffType = parsedYtType.SkiffType; + } + + return WrapFutureCallback(future, [localRun, startTime, type, rowSpec, skiffType, fillSettings, level, settings, enableFullResultWrite, columns, graphParams, state = State, skiffConverter = SkiffConverter](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { YQL_CLOG(DEBUG, ProviderDq) << state->SessionId << " WrapFutureCallback"; auto duration = TInstant::Now() - startTime; @@ -1036,6 +1060,20 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters TStringStream out; NYson::TYsonWriter writer((IOutputStream*)&out); writer.OnBeginMap(); + + if (skiffType) { + writer.OnKeyedItem("SkiffType"); + writer.OnRaw(skiffType, ::NYson::EYsonType::Node); + + writer.OnKeyedItem("Columns"); + writer.OnBeginList(); + for (auto& column: columns) { + writer.OnListItem(); + writer.OnStringScalar(column); + } + writer.OnEndList(); + } + if (type) { writer.OnKeyedItem("Type"); writer.OnRaw(type); @@ -1054,21 +1092,34 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters if (truncated && item.IsList()) { ui64 bytes = 0; ui64 rows = 0; - writer.OnBeginList(); - for (auto& node : item.AsList()) { - raw = NYT::NodeToYsonString(node); - bytes += raw.size(); - rows += 1; - writer.OnListItem(); - writer.OnRaw(raw); - if (fillSettings.AllResultsBytesLimit && bytes >= *fillSettings.AllResultsBytesLimit) { + switch (fillSettings.Format) { + case IDataProvider::EResultFormat::Yson: { + writer.OnBeginList(); + for (auto& node : item.AsList()) { + raw = NYT::NodeToYsonString(node); + bytes += raw.size(); + rows += 1; + writer.OnListItem(); + writer.OnRaw(raw); + if (fillSettings.AllResultsBytesLimit && bytes >= *fillSettings.AllResultsBytesLimit) { + break; + } + if (fillSettings.RowsLimitPerWrite && rows >= *fillSettings.RowsLimitPerWrite) { + break; + } + } + writer.OnEndList(); break; } - if (fillSettings.RowsLimitPerWrite && rows >= *fillSettings.RowsLimitPerWrite) { + case IDataProvider::EResultFormat::Skiff: { + writer.OnStringScalar(skiffConverter->ConvertNodeToSkiff(state, fillSettings, rowSpec, item)); break; } + default: { + YQL_LOG_CTX_THROW yexception() << "Invalid result type: " << fillSettings.Format; + } } - writer.OnEndList(); + if (enableFullResultWrite) { writer.OnKeyedItem("Ref"); writer.OnBeginList(); @@ -1081,11 +1132,35 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters writer.OnKeyedItem("Truncated"); writer.OnBooleanScalar(true); } else if (truncated) { - writer.OnRaw("[]"); + switch (fillSettings.Format) { + case IDataProvider::EResultFormat::Yson: { + writer.OnRaw("[]"); + break; + } + case IDataProvider::EResultFormat::Skiff: { + writer.OnStringScalar(""); + break; + } + default: { + YQL_LOG_CTX_THROW yexception() << "Invalid result type: " << fillSettings.Format; + } + } writer.OnKeyedItem("Truncated"); writer.OnBooleanScalar(true); } else { - writer.OnRaw(raw); + switch (fillSettings.Format) { + case IDataProvider::EResultFormat::Yson: { + writer.OnRaw(raw); + break; + } + case IDataProvider::EResultFormat::Skiff: { + writer.OnStringScalar(skiffConverter->ConvertNodeToSkiff(state, fillSettings, rowSpec, item)); + break; + } + default: { + YQL_LOG_CTX_THROW yexception() << "Invalid result type: " << fillSettings.Format; + } + } } if (rowsCount) { @@ -1373,9 +1448,19 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters }); executionPlanner.Destroy(); + TString skiffType; + NYT::TNode rowSpec; + if (fillSettings.Format == IDataProvider::EResultFormat::Skiff) { + auto parsedYtType = SkiffConverter->ParseYTType(pull.Input().Ref(), ctx, columns); + + type = parsedYtType.Type; + rowSpec = parsedYtType.RowSpec; + skiffType = parsedYtType.SkiffType; + } + int level = 0; // TODO: remove copy-paste - return WrapFutureCallback(future, [settings, startTime, localRun, type, fillSettings, level, graphParams, columns, enableFullResultWrite, state = State](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { + return WrapFutureCallback(future, [settings, startTime, localRun, type, rowSpec, skiffType, fillSettings, level, graphParams, columns, enableFullResultWrite, state = State, skiffConverter = SkiffConverter](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { auto duration = TInstant::Now() - startTime; YQL_CLOG(INFO, ProviderDq) << "Execution Pull complete, duration: " << duration; if (state->Metrics) { @@ -1406,8 +1491,26 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters input->SetState(TExprNode::EState::ExecutionComplete); TStringStream out; - NYson::TYsonWriter writer((IOutputStream*)&out, NCommon::GetYsonFormat(fillSettings), ::NYson::EYsonType::Node, false); + + IDataProvider::TFillSettings ysonFormatSettings; + ysonFormatSettings.FormatDetails = fillSettings.FormatDetails; + ysonFormatSettings.Format = IDataProvider::EResultFormat::Yson; + NYson::TYsonWriter writer((IOutputStream*)&out, NCommon::GetYsonFormat(ysonFormatSettings), ::NYson::EYsonType::Node, false); writer.OnBeginMap(); + + if (skiffType) { + writer.OnKeyedItem("SkiffType"); + writer.OnRaw(skiffType, ::NYson::EYsonType::Node); + + writer.OnKeyedItem("Columns"); + writer.OnBeginList(); + for (auto& column: columns) { + writer.OnListItem(); + writer.OnStringScalar(column); + } + writer.OnEndList(); + } + if (type) { writer.OnKeyedItem("Type"); writer.OnRaw(type); @@ -1447,21 +1550,35 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters // TODO: ui64 bytes = 0; ui64 rows = 0; - writer.OnBeginList(); - for (auto& node : item.AsList()) { - raw = NYT::NodeToYsonString(node); - bytes += raw.size(); - rows += 1; - writer.OnListItem(); - writer.OnRaw(raw); - if (fillSettings.AllResultsBytesLimit && bytes >= *fillSettings.AllResultsBytesLimit) { + switch (fillSettings.Format) { + case IDataProvider::EResultFormat::Yson: { + writer.OnBeginList(); + + for (auto& node : item.AsList()) { + raw = NYT::NodeToYsonString(node); + bytes += raw.size(); + rows += 1; + writer.OnListItem(); + writer.OnRaw(raw); + + if (fillSettings.AllResultsBytesLimit && bytes >= *fillSettings.AllResultsBytesLimit) { + break; + } + if (fillSettings.RowsLimitPerWrite && rows >= *fillSettings.RowsLimitPerWrite) { + break; + } + } + writer.OnEndList(); break; } - if (fillSettings.RowsLimitPerWrite && rows >= *fillSettings.RowsLimitPerWrite) { + case IDataProvider::EResultFormat::Skiff: { + writer.OnStringScalar(skiffConverter->ConvertNodeToSkiff(state, fillSettings, rowSpec, item)); break; } + default: { + YQL_LOG_CTX_THROW yexception() << "Invalid result type: " << fillSettings.Format; + } } - writer.OnEndList(); if (enableFullResultWrite) { writer.OnKeyedItem("Ref"); @@ -1476,7 +1593,19 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters writer.OnKeyedItem("Truncated"); writer.OnBooleanScalar(true); } else { - writer.OnRaw(raw); + switch (fillSettings.Format) { + case IDataProvider::EResultFormat::Yson: { + writer.OnRaw(raw); + break; + } + case IDataProvider::EResultFormat::Skiff: { + writer.OnStringScalar(skiffConverter->ConvertNodeToSkiff(state, fillSettings, rowSpec, item)); + break; + } + default: { + YQL_LOG_CTX_THROW yexception() << "Invalid result type: " << fillSettings.Format; + } + } } if (rowsCount) { @@ -1923,6 +2052,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters private: TDqStatePtr State; TExecStatePtr ExecState; + ISkiffConverterPtr SkiffConverter; mutable THashMap FileLinks; mutable THashMap ModulesMapping; @@ -1932,7 +2062,11 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters } IGraphTransformer* CreateDqExecTransformer(const TDqStatePtr& state) { - return new TDqExecTransformer(state); + return new TDqExecTransformer(state, MakeIntrusive()); +} + +TExecTransformerFactory CreateDqExecTransformerFactory(const ISkiffConverterPtr& skiffConverter) { + return [skiffConverter] (const TDqStatePtr& state) { return new TDqExecTransformer(state, skiffConverter); }; } } // namespace NYql diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.h b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.h index c2f376629040..07bb1c1e70b3 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.h +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include @@ -8,5 +9,21 @@ namespace NYql { struct TDqState; using TDqStatePtr = TIntrusivePtr; + class ISkiffConverter : public TThrRefBase { + public: + struct TYtType { + TString Type; + TString SkiffType; + NYT::TNode RowSpec; + }; + + virtual TString ConvertNodeToSkiff(const TDqStatePtr state, const IDataProvider::TFillSettings& fillSettings, const NYT::TNode& rowSpec, const NYT::TNode& item) = 0; + virtual TYtType ParseYTType(const TExprNode& node, TExprContext& ctx, const TMaybe& columns) = 0; + }; + using ISkiffConverterPtr = TIntrusivePtr; + IGraphTransformer* CreateDqExecTransformer(const TDqStatePtr& state); + + using TExecTransformerFactory = std::function; + TExecTransformerFactory CreateDqExecTransformerFactory(const ISkiffConverterPtr& skiffConverter); } // namespace NYql