Skip to content

Commit 28332a1

Browse files
authored
Merge b15e52d into 1717439
2 parents 1717439 + b15e52d commit 28332a1

File tree

2 files changed

+179
-28
lines changed

2 files changed

+179
-28
lines changed

ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp

+162-28
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#include "yql_dq_exectransformer.h"
2+
13
#include <ydb/library/yql/providers/dq/provider/yql_dq_datasource.h>
24
#include <ydb/library/yql/providers/dq/provider/yql_dq_state.h>
35

@@ -400,12 +402,24 @@ struct TDqsFinalPipelineConfigurator : public IPipelineConfigurator {
400402
TDqStatePtr State_;
401403
};
402404

405+
// Placeholder ISkiffConverter implementation: both overrides ignore every
// argument and go straight to Y_ABORT("not implemented").
// CreateDqExecTransformer() installs this converter, so a transformer built
// through that entry point cannot serve Skiff-formatted results; a working
// converter has to be supplied via CreateDqExecTransformerFactory().
class TSimpleSkiffConverter : public ISkiffConverter {
406+
public:
407+
// Stub: no conversion is performed and nothing is returned (Y_ABORT is hit first).
TString ConvertNodeToSkiff(const TDqStatePtr /*state*/, const IDataProvider::TFillSettings& /*fillSettings*/, const NYT::TNode& /*rowSpec*/, const NYT::TNode& /*item*/) override {
408+
Y_ABORT("not implemented");
409+
}
410+
411+
// Stub: no type is parsed and nothing is returned (Y_ABORT is hit first).
TYtType ParseYTType(const TExprNode& /*node*/, TExprContext& /*ctx*/, const TMaybe<NYql::TColumnOrder>& /*columns*/) override {
412+
Y_ABORT("not implemented");
413+
}
414+
};
415+
403416
class TDqExecTransformer: public TExecTransformerBase, TCounters
404417
{
405418
public:
406-
TDqExecTransformer(const TDqStatePtr& state)
419+
TDqExecTransformer(const TDqStatePtr& state, const ISkiffConverterPtr& skiffConverter)
407420
: State(state)
408421
, ExecState(MakeIntrusive<TExecState>())
422+
, SkiffConverter(skiffConverter)
409423
{
410424
AddHandler({TStringBuf("Result")}, RequireNone(), Hndl(&TDqExecTransformer::HandleResult));
411425
AddHandler({TStringBuf("Pull")}, RequireNone(), Hndl(&TDqExecTransformer::HandlePull));
@@ -994,7 +1008,17 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
9941008

9951009
FlushStatisticsToState();
9961010

997-
return WrapFutureCallback<false>(future, [localRun, startTime, type, fillSettings, level, settings, enableFullResultWrite, columns, graphParams, state = State](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
1011+
TString skiffType;
1012+
NYT::TNode rowSpec;
1013+
if (fillSettings.Format == IDataProvider::EResultFormat::Skiff) {
1014+
auto parsedYtType = SkiffConverter->ParseYTType(result.Input().Ref(), ctx, columns);
1015+
1016+
type = parsedYtType.Type;
1017+
rowSpec = parsedYtType.RowSpec;
1018+
skiffType = parsedYtType.SkiffType;
1019+
}
1020+
1021+
return WrapFutureCallback<false>(future, [localRun, startTime, type, rowSpec, skiffType, fillSettings, level, settings, enableFullResultWrite, columns, graphParams, state = State, skiffConverter = SkiffConverter](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
9981022
YQL_CLOG(DEBUG, ProviderDq) << state->SessionId << " WrapFutureCallback";
9991023

10001024
auto duration = TInstant::Now() - startTime;
@@ -1036,6 +1060,20 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
10361060
TStringStream out;
10371061
NYson::TYsonWriter writer((IOutputStream*)&out);
10381062
writer.OnBeginMap();
1063+
1064+
if (skiffType) {
1065+
writer.OnKeyedItem("SkiffType");
1066+
writer.OnRaw(skiffType, ::NYson::EYsonType::Node);
1067+
1068+
writer.OnKeyedItem("Columns");
1069+
writer.OnBeginList();
1070+
for (auto& column: columns) {
1071+
writer.OnListItem();
1072+
writer.OnStringScalar(column);
1073+
}
1074+
writer.OnEndList();
1075+
}
1076+
10391077
if (type) {
10401078
writer.OnKeyedItem("Type");
10411079
writer.OnRaw(type);
@@ -1054,21 +1092,34 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
10541092
if (truncated && item.IsList()) {
10551093
ui64 bytes = 0;
10561094
ui64 rows = 0;
1057-
writer.OnBeginList();
1058-
for (auto& node : item.AsList()) {
1059-
raw = NYT::NodeToYsonString(node);
1060-
bytes += raw.size();
1061-
rows += 1;
1062-
writer.OnListItem();
1063-
writer.OnRaw(raw);
1064-
if (fillSettings.AllResultsBytesLimit && bytes >= *fillSettings.AllResultsBytesLimit) {
1095+
switch (fillSettings.Format) {
1096+
case IDataProvider::EResultFormat::Yson: {
1097+
writer.OnBeginList();
1098+
for (auto& node : item.AsList()) {
1099+
raw = NYT::NodeToYsonString(node);
1100+
bytes += raw.size();
1101+
rows += 1;
1102+
writer.OnListItem();
1103+
writer.OnRaw(raw);
1104+
if (fillSettings.AllResultsBytesLimit && bytes >= *fillSettings.AllResultsBytesLimit) {
1105+
break;
1106+
}
1107+
if (fillSettings.RowsLimitPerWrite && rows >= *fillSettings.RowsLimitPerWrite) {
1108+
break;
1109+
}
1110+
}
1111+
writer.OnEndList();
10651112
break;
10661113
}
1067-
if (fillSettings.RowsLimitPerWrite && rows >= *fillSettings.RowsLimitPerWrite) {
1114+
case IDataProvider::EResultFormat::Skiff: {
1115+
writer.OnStringScalar(skiffConverter->ConvertNodeToSkiff(state, fillSettings, rowSpec, item));
10681116
break;
10691117
}
1118+
default: {
1119+
YQL_LOG_CTX_THROW yexception() << "Invalid result type: " << fillSettings.Format;
1120+
}
10701121
}
1071-
writer.OnEndList();
1122+
10721123
if (enableFullResultWrite) {
10731124
writer.OnKeyedItem("Ref");
10741125
writer.OnBeginList();
@@ -1081,11 +1132,35 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
10811132
writer.OnKeyedItem("Truncated");
10821133
writer.OnBooleanScalar(true);
10831134
} else if (truncated) {
1084-
writer.OnRaw("[]");
1135+
switch (fillSettings.Format) {
1136+
case IDataProvider::EResultFormat::Yson: {
1137+
writer.OnRaw("[]");
1138+
break;
1139+
}
1140+
case IDataProvider::EResultFormat::Skiff: {
1141+
writer.OnStringScalar("");
1142+
break;
1143+
}
1144+
default: {
1145+
YQL_LOG_CTX_THROW yexception() << "Invalid result type: " << fillSettings.Format;
1146+
}
1147+
}
10851148
writer.OnKeyedItem("Truncated");
10861149
writer.OnBooleanScalar(true);
10871150
} else {
1088-
writer.OnRaw(raw);
1151+
switch (fillSettings.Format) {
1152+
case IDataProvider::EResultFormat::Yson: {
1153+
writer.OnRaw(raw);
1154+
break;
1155+
}
1156+
case IDataProvider::EResultFormat::Skiff: {
1157+
writer.OnStringScalar(skiffConverter->ConvertNodeToSkiff(state, fillSettings, rowSpec, item));
1158+
break;
1159+
}
1160+
default: {
1161+
YQL_LOG_CTX_THROW yexception() << "Invalid result type: " << fillSettings.Format;
1162+
}
1163+
}
10891164
}
10901165

10911166
if (rowsCount) {
@@ -1373,9 +1448,19 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
13731448
});
13741449
executionPlanner.Destroy();
13751450

1451+
TString skiffType;
1452+
NYT::TNode rowSpec;
1453+
if (fillSettings.Format == IDataProvider::EResultFormat::Skiff) {
1454+
auto parsedYtType = SkiffConverter->ParseYTType(pull.Input().Ref(), ctx, columns);
1455+
1456+
type = parsedYtType.Type;
1457+
rowSpec = parsedYtType.RowSpec;
1458+
skiffType = parsedYtType.SkiffType;
1459+
}
1460+
13761461
int level = 0;
13771462
// TODO: remove copy-paste
1378-
return WrapFutureCallback<false>(future, [settings, startTime, localRun, type, fillSettings, level, graphParams, columns, enableFullResultWrite, state = State](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
1463+
return WrapFutureCallback<false>(future, [settings, startTime, localRun, type, rowSpec, skiffType, fillSettings, level, graphParams, columns, enableFullResultWrite, state = State, skiffConverter = SkiffConverter](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
13791464
auto duration = TInstant::Now() - startTime;
13801465
YQL_CLOG(INFO, ProviderDq) << "Execution Pull complete, duration: " << duration;
13811466
if (state->Metrics) {
@@ -1406,8 +1491,26 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
14061491
input->SetState(TExprNode::EState::ExecutionComplete);
14071492

14081493
TStringStream out;
1409-
NYson::TYsonWriter writer((IOutputStream*)&out, NCommon::GetYsonFormat(fillSettings), ::NYson::EYsonType::Node, false);
1494+
1495+
IDataProvider::TFillSettings ysonFormatSettings;
1496+
ysonFormatSettings.FormatDetails = fillSettings.FormatDetails;
1497+
ysonFormatSettings.Format = IDataProvider::EResultFormat::Yson;
1498+
NYson::TYsonWriter writer((IOutputStream*)&out, NCommon::GetYsonFormat(ysonFormatSettings), ::NYson::EYsonType::Node, false);
14101499
writer.OnBeginMap();
1500+
1501+
if (skiffType) {
1502+
writer.OnKeyedItem("SkiffType");
1503+
writer.OnRaw(skiffType, ::NYson::EYsonType::Node);
1504+
1505+
writer.OnKeyedItem("Columns");
1506+
writer.OnBeginList();
1507+
for (auto& column: columns) {
1508+
writer.OnListItem();
1509+
writer.OnStringScalar(column);
1510+
}
1511+
writer.OnEndList();
1512+
}
1513+
14111514
if (type) {
14121515
writer.OnKeyedItem("Type");
14131516
writer.OnRaw(type);
@@ -1447,21 +1550,35 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
14471550
// TODO:
14481551
ui64 bytes = 0;
14491552
ui64 rows = 0;
1450-
writer.OnBeginList();
1451-
for (auto& node : item.AsList()) {
1452-
raw = NYT::NodeToYsonString(node);
1453-
bytes += raw.size();
1454-
rows += 1;
1455-
writer.OnListItem();
1456-
writer.OnRaw(raw);
1457-
if (fillSettings.AllResultsBytesLimit && bytes >= *fillSettings.AllResultsBytesLimit) {
1553+
switch (fillSettings.Format) {
1554+
case IDataProvider::EResultFormat::Yson: {
1555+
writer.OnBeginList();
1556+
1557+
for (auto& node : item.AsList()) {
1558+
raw = NYT::NodeToYsonString(node);
1559+
bytes += raw.size();
1560+
rows += 1;
1561+
writer.OnListItem();
1562+
writer.OnRaw(raw);
1563+
1564+
if (fillSettings.AllResultsBytesLimit && bytes >= *fillSettings.AllResultsBytesLimit) {
1565+
break;
1566+
}
1567+
if (fillSettings.RowsLimitPerWrite && rows >= *fillSettings.RowsLimitPerWrite) {
1568+
break;
1569+
}
1570+
}
1571+
writer.OnEndList();
14581572
break;
14591573
}
1460-
if (fillSettings.RowsLimitPerWrite && rows >= *fillSettings.RowsLimitPerWrite) {
1574+
case IDataProvider::EResultFormat::Skiff: {
1575+
writer.OnStringScalar(skiffConverter->ConvertNodeToSkiff(state, fillSettings, rowSpec, item));
14611576
break;
14621577
}
1578+
default: {
1579+
YQL_LOG_CTX_THROW yexception() << "Invalid result type: " << fillSettings.Format;
1580+
}
14631581
}
1464-
writer.OnEndList();
14651582

14661583
if (enableFullResultWrite) {
14671584
writer.OnKeyedItem("Ref");
@@ -1476,7 +1593,19 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
14761593
writer.OnKeyedItem("Truncated");
14771594
writer.OnBooleanScalar(true);
14781595
} else {
1479-
writer.OnRaw(raw);
1596+
switch (fillSettings.Format) {
1597+
case IDataProvider::EResultFormat::Yson: {
1598+
writer.OnRaw(raw);
1599+
break;
1600+
}
1601+
case IDataProvider::EResultFormat::Skiff: {
1602+
writer.OnStringScalar(skiffConverter->ConvertNodeToSkiff(state, fillSettings, rowSpec, item));
1603+
break;
1604+
}
1605+
default: {
1606+
YQL_LOG_CTX_THROW yexception() << "Invalid result type: " << fillSettings.Format;
1607+
}
1608+
}
14801609
}
14811610

14821611
if (rowsCount) {
@@ -1923,6 +2052,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
19232052
private:
19242053
TDqStatePtr State;
19252054
TExecStatePtr ExecState;
2055+
ISkiffConverterPtr SkiffConverter;
19262056
mutable THashMap<TString, TFileLinkPtr> FileLinks;
19272057
mutable THashMap<TString, TString> ModulesMapping;
19282058

@@ -1932,7 +2062,11 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
19322062
}
19332063

19342064
// Creates a TDqExecTransformer for `state` and returns it as a raw pointer
// obtained from `new`.
// NOTE(review): presumably the caller takes ownership of the returned object - confirm.
// In this diff rendering the line after the "-" marker is the pre-change body
// (single-argument constructor) and the line after the "+" marker is its
// replacement, which passes a freshly created TSimpleSkiffConverter. That
// converter aborts with "not implemented", so Skiff output is unavailable
// through this entry point.
IGraphTransformer* CreateDqExecTransformer(const TDqStatePtr& state) {
1935-
return new TDqExecTransformer(state);
2065+
return new TDqExecTransformer(state, MakeIntrusive<TSimpleSkiffConverter>());
2066+
}
2067+
2068+
// Returns a factory callable that builds a new TDqExecTransformer for the
// state it is given, always passing the same `skiffConverter`.
// The lambda captures `skiffConverter` by value, i.e. it holds its own copy
// of the intrusive pointer for as long as the factory object exists.
// NOTE(review): `skiffConverter` is not checked for null here; the transformer
// calls SkiffConverter->ParseYTType(...) when the result format is Skiff, so a
// null converter would be dereferenced on that path - confirm callers never
// pass null.
TExecTransformerFactory CreateDqExecTransformerFactory(const ISkiffConverterPtr& skiffConverter) {
2069+
return [skiffConverter] (const TDqStatePtr& state) { return new TDqExecTransformer(state, skiffConverter); };
19362070
}
19372071

19382072
} // namespace NYql
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,29 @@
11
#pragma once
22

33
#include <ydb/library/yql/core/yql_data_provider.h>
4+
#include <ydb/library/yql/core/yql_type_annotation.h>
45

56
#include <util/generic/ptr.h>
67

78
namespace NYql {
89
struct TDqState;
910
using TDqStatePtr = TIntrusivePtr<TDqState>;
1011

12+
// Interface through which the DQ exec transformer produces Skiff-formatted
// results. Derives from TThrRefBase and is held via TIntrusivePtr
// (see ISkiffConverterPtr).
class ISkiffConverter : public TThrRefBase {
13+
public:
14+
// Result of ParseYTType().
struct TYtType {
15+
// Type description; the transformer emits it verbatim under the "Type" key.
TString Type;
16+
// Skiff type description; the transformer emits it as a raw YSON node
// under the "SkiffType" key when it is non-empty.
TString SkiffType;
17+
// Row specification; the transformer hands it back to ConvertNodeToSkiff().
NYT::TNode RowSpec;
18+
};
19+
20+
// Converts the result node `item` to Skiff using `rowSpec` and returns the
// encoded data as a string; the transformer writes that string as a YSON
// string scalar.
// Note: `state` is taken by (const) value, so each call copies the
// intrusive pointer; changing this would require updating every override.
virtual TString ConvertNodeToSkiff(const TDqStatePtr state, const IDataProvider::TFillSettings& fillSettings, const NYT::TNode& rowSpec, const NYT::TNode& item) = 0;
21+
// Derives the TYtType triple (Type, SkiffType, RowSpec) for the expression
// `node`, given the optional column order `columns`. The transformer calls
// it only when the requested result format is Skiff.
virtual TYtType ParseYTType(const TExprNode& node, TExprContext& ctx, const TMaybe<NYql::TColumnOrder>& columns) = 0;
22+
};
23+
// Shared-ownership handle to an ISkiffConverter implementation.
using ISkiffConverterPtr = TIntrusivePtr<ISkiffConverter>;
24+
1125
// Creates the DQ exec transformer with the built-in stub Skiff converter
// (its methods abort with "not implemented").
IGraphTransformer* CreateDqExecTransformer(const TDqStatePtr& state);
26+
27+
// Callable that builds an exec transformer for a given DQ state.
// NOTE(review): std::function is used here, but <functional> is not among the
// includes visible in this header - presumably it arrives transitively; confirm.
using TExecTransformerFactory = std::function<IGraphTransformer*(const TDqStatePtr& state)>;
28+
// Returns a factory whose transformers use the supplied `skiffConverter`.
TExecTransformerFactory CreateDqExecTransformerFactory(const ISkiffConverterPtr& skiffConverter);
1229
} // namespace NYql

0 commit comments

Comments
 (0)