1
+ #include " yql_dq_exectransformer.h"
2
+
1
3
#include < ydb/library/yql/providers/dq/provider/yql_dq_datasource.h>
2
4
#include < ydb/library/yql/providers/dq/provider/yql_dq_state.h>
3
5
@@ -400,12 +402,24 @@ struct TDqsFinalPipelineConfigurator : public IPipelineConfigurator {
400
402
TDqStatePtr State_;
401
403
};
402
404
405
+ class TSimpleSkiffConverter : public ISkiffConverter {
406
+ public:
407
+ TString ConvertNodeToSkiff (const TDqStatePtr /* state*/ , const IDataProvider::TFillSettings& /* fillSettings*/ , const NYT::TNode& /* rowSpec*/ , const NYT::TNode& /* item*/ ) override {
408
+ Y_ABORT (" not implemented" );
409
+ }
410
+
411
+ TYtType ParseYTType (const TExprNode& /* node*/ , TExprContext& /* ctx*/ , const TMaybe<NYql::TColumnOrder>& /* columns*/ ) override {
412
+ Y_ABORT (" not implemented" );
413
+ }
414
+ };
415
+
403
416
class TDqExecTransformer : public TExecTransformerBase , TCounters
404
417
{
405
418
public:
406
- TDqExecTransformer (const TDqStatePtr& state)
419
+ TDqExecTransformer (const TDqStatePtr& state, const ISkiffConverterPtr& skiffConverter )
407
420
: State(state)
408
421
, ExecState(MakeIntrusive<TExecState>())
422
+ , SkiffConverter(skiffConverter)
409
423
{
410
424
AddHandler ({TStringBuf (" Result" )}, RequireNone (), Hndl (&TDqExecTransformer::HandleResult));
411
425
AddHandler ({TStringBuf (" Pull" )}, RequireNone (), Hndl (&TDqExecTransformer::HandlePull));
@@ -994,7 +1008,17 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
994
1008
995
1009
FlushStatisticsToState ();
996
1010
997
- return WrapFutureCallback<false >(future, [localRun, startTime, type, fillSettings, level, settings, enableFullResultWrite, columns, graphParams, state = State](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
1011
+ TString skiffType;
1012
+ NYT::TNode rowSpec;
1013
+ if (fillSettings.Format == IDataProvider::EResultFormat::Skiff) {
1014
+ auto parsedYtType = SkiffConverter->ParseYTType (result.Input ().Ref (), ctx, columns);
1015
+
1016
+ type = parsedYtType.Type ;
1017
+ rowSpec = parsedYtType.RowSpec ;
1018
+ skiffType = parsedYtType.SkiffType ;
1019
+ }
1020
+
1021
+ return WrapFutureCallback<false >(future, [localRun, startTime, type, rowSpec, skiffType, fillSettings, level, settings, enableFullResultWrite, columns, graphParams, state = State, skiffConverter = SkiffConverter](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
998
1022
YQL_CLOG (DEBUG, ProviderDq) << state->SessionId << " WrapFutureCallback" ;
999
1023
1000
1024
auto duration = TInstant::Now () - startTime;
@@ -1036,6 +1060,20 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
1036
1060
TStringStream out;
1037
1061
NYson::TYsonWriter writer ((IOutputStream*)&out);
1038
1062
writer.OnBeginMap ();
1063
+
1064
+ if (skiffType) {
1065
+ writer.OnKeyedItem (" SkiffType" );
1066
+ writer.OnRaw (skiffType, ::NYson::EYsonType::Node);
1067
+
1068
+ writer.OnKeyedItem (" Columns" );
1069
+ writer.OnBeginList ();
1070
+ for (auto & column: columns) {
1071
+ writer.OnListItem ();
1072
+ writer.OnStringScalar (column);
1073
+ }
1074
+ writer.OnEndList ();
1075
+ }
1076
+
1039
1077
if (type) {
1040
1078
writer.OnKeyedItem (" Type" );
1041
1079
writer.OnRaw (type);
@@ -1054,21 +1092,34 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
1054
1092
if (truncated && item.IsList ()) {
1055
1093
ui64 bytes = 0 ;
1056
1094
ui64 rows = 0 ;
1057
- writer.OnBeginList ();
1058
- for (auto & node : item.AsList ()) {
1059
- raw = NYT::NodeToYsonString (node);
1060
- bytes += raw.size ();
1061
- rows += 1 ;
1062
- writer.OnListItem ();
1063
- writer.OnRaw (raw);
1064
- if (fillSettings.AllResultsBytesLimit && bytes >= *fillSettings.AllResultsBytesLimit ) {
1095
+ switch (fillSettings.Format ) {
1096
+ case IDataProvider::EResultFormat::Yson: {
1097
+ writer.OnBeginList ();
1098
+ for (auto & node : item.AsList ()) {
1099
+ raw = NYT::NodeToYsonString (node);
1100
+ bytes += raw.size ();
1101
+ rows += 1 ;
1102
+ writer.OnListItem ();
1103
+ writer.OnRaw (raw);
1104
+ if (fillSettings.AllResultsBytesLimit && bytes >= *fillSettings.AllResultsBytesLimit ) {
1105
+ break ;
1106
+ }
1107
+ if (fillSettings.RowsLimitPerWrite && rows >= *fillSettings.RowsLimitPerWrite ) {
1108
+ break ;
1109
+ }
1110
+ }
1111
+ writer.OnEndList ();
1065
1112
break ;
1066
1113
}
1067
- if (fillSettings.RowsLimitPerWrite && rows >= *fillSettings.RowsLimitPerWrite ) {
1114
+ case IDataProvider::EResultFormat::Skiff: {
1115
+ writer.OnStringScalar (skiffConverter->ConvertNodeToSkiff (state, fillSettings, rowSpec, item));
1068
1116
break ;
1069
1117
}
1118
+ default : {
1119
+ YQL_LOG_CTX_THROW yexception () << " Invalid result type: " << fillSettings.Format ;
1120
+ }
1070
1121
}
1071
- writer. OnEndList ();
1122
+
1072
1123
if (enableFullResultWrite) {
1073
1124
writer.OnKeyedItem (" Ref" );
1074
1125
writer.OnBeginList ();
@@ -1081,11 +1132,35 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
1081
1132
writer.OnKeyedItem (" Truncated" );
1082
1133
writer.OnBooleanScalar (true );
1083
1134
} else if (truncated) {
1084
- writer.OnRaw (" []" );
1135
+ switch (fillSettings.Format ) {
1136
+ case IDataProvider::EResultFormat::Yson: {
1137
+ writer.OnRaw (" []" );
1138
+ break ;
1139
+ }
1140
+ case IDataProvider::EResultFormat::Skiff: {
1141
+ writer.OnStringScalar (" " );
1142
+ break ;
1143
+ }
1144
+ default : {
1145
+ YQL_LOG_CTX_THROW yexception () << " Invalid result type: " << fillSettings.Format ;
1146
+ }
1147
+ }
1085
1148
writer.OnKeyedItem (" Truncated" );
1086
1149
writer.OnBooleanScalar (true );
1087
1150
} else {
1088
- writer.OnRaw (raw);
1151
+ switch (fillSettings.Format ) {
1152
+ case IDataProvider::EResultFormat::Yson: {
1153
+ writer.OnRaw (raw);
1154
+ break ;
1155
+ }
1156
+ case IDataProvider::EResultFormat::Skiff: {
1157
+ writer.OnStringScalar (skiffConverter->ConvertNodeToSkiff (state, fillSettings, rowSpec, item));
1158
+ break ;
1159
+ }
1160
+ default : {
1161
+ YQL_LOG_CTX_THROW yexception () << " Invalid result type: " << fillSettings.Format ;
1162
+ }
1163
+ }
1089
1164
}
1090
1165
1091
1166
if (rowsCount) {
@@ -1373,9 +1448,19 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
1373
1448
});
1374
1449
executionPlanner.Destroy ();
1375
1450
1451
+ TString skiffType;
1452
+ NYT::TNode rowSpec;
1453
+ if (fillSettings.Format == IDataProvider::EResultFormat::Skiff) {
1454
+ auto parsedYtType = SkiffConverter->ParseYTType (pull.Input ().Ref (), ctx, columns);
1455
+
1456
+ type = parsedYtType.Type ;
1457
+ rowSpec = parsedYtType.RowSpec ;
1458
+ skiffType = parsedYtType.SkiffType ;
1459
+ }
1460
+
1376
1461
int level = 0 ;
1377
1462
// TODO: remove copy-paste
1378
- return WrapFutureCallback<false >(future, [settings, startTime, localRun, type, fillSettings, level, graphParams, columns, enableFullResultWrite, state = State](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
1463
+ return WrapFutureCallback<false >(future, [settings, startTime, localRun, type, rowSpec, skiffType, fillSettings, level, graphParams, columns, enableFullResultWrite, state = State, skiffConverter = SkiffConverter ](const IDqGateway::TResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
1379
1464
auto duration = TInstant::Now () - startTime;
1380
1465
YQL_CLOG (INFO, ProviderDq) << " Execution Pull complete, duration: " << duration;
1381
1466
if (state->Metrics ) {
@@ -1406,8 +1491,26 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
1406
1491
input->SetState (TExprNode::EState::ExecutionComplete);
1407
1492
1408
1493
TStringStream out;
1409
- NYson::TYsonWriter writer ((IOutputStream*)&out, NCommon::GetYsonFormat (fillSettings), ::NYson::EYsonType::Node, false );
1494
+
1495
+ IDataProvider::TFillSettings ysonFormatSettings;
1496
+ ysonFormatSettings.FormatDetails = fillSettings.FormatDetails ;
1497
+ ysonFormatSettings.Format = IDataProvider::EResultFormat::Yson;
1498
+ NYson::TYsonWriter writer ((IOutputStream*)&out, NCommon::GetYsonFormat (ysonFormatSettings), ::NYson::EYsonType::Node, false );
1410
1499
writer.OnBeginMap ();
1500
+
1501
+ if (skiffType) {
1502
+ writer.OnKeyedItem (" SkiffType" );
1503
+ writer.OnRaw (skiffType, ::NYson::EYsonType::Node);
1504
+
1505
+ writer.OnKeyedItem (" Columns" );
1506
+ writer.OnBeginList ();
1507
+ for (auto & column: columns) {
1508
+ writer.OnListItem ();
1509
+ writer.OnStringScalar (column);
1510
+ }
1511
+ writer.OnEndList ();
1512
+ }
1513
+
1411
1514
if (type) {
1412
1515
writer.OnKeyedItem (" Type" );
1413
1516
writer.OnRaw (type);
@@ -1447,21 +1550,35 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
1447
1550
// TODO:
1448
1551
ui64 bytes = 0 ;
1449
1552
ui64 rows = 0 ;
1450
- writer.OnBeginList ();
1451
- for (auto & node : item.AsList ()) {
1452
- raw = NYT::NodeToYsonString (node);
1453
- bytes += raw.size ();
1454
- rows += 1 ;
1455
- writer.OnListItem ();
1456
- writer.OnRaw (raw);
1457
- if (fillSettings.AllResultsBytesLimit && bytes >= *fillSettings.AllResultsBytesLimit ) {
1553
+ switch (fillSettings.Format ) {
1554
+ case IDataProvider::EResultFormat::Yson: {
1555
+ writer.OnBeginList ();
1556
+
1557
+ for (auto & node : item.AsList ()) {
1558
+ raw = NYT::NodeToYsonString (node);
1559
+ bytes += raw.size ();
1560
+ rows += 1 ;
1561
+ writer.OnListItem ();
1562
+ writer.OnRaw (raw);
1563
+
1564
+ if (fillSettings.AllResultsBytesLimit && bytes >= *fillSettings.AllResultsBytesLimit ) {
1565
+ break ;
1566
+ }
1567
+ if (fillSettings.RowsLimitPerWrite && rows >= *fillSettings.RowsLimitPerWrite ) {
1568
+ break ;
1569
+ }
1570
+ }
1571
+ writer.OnEndList ();
1458
1572
break ;
1459
1573
}
1460
- if (fillSettings.RowsLimitPerWrite && rows >= *fillSettings.RowsLimitPerWrite ) {
1574
+ case IDataProvider::EResultFormat::Skiff: {
1575
+ writer.OnStringScalar (skiffConverter->ConvertNodeToSkiff (state, fillSettings, rowSpec, item));
1461
1576
break ;
1462
1577
}
1578
+ default : {
1579
+ YQL_LOG_CTX_THROW yexception () << " Invalid result type: " << fillSettings.Format ;
1580
+ }
1463
1581
}
1464
- writer.OnEndList ();
1465
1582
1466
1583
if (enableFullResultWrite) {
1467
1584
writer.OnKeyedItem (" Ref" );
@@ -1476,7 +1593,19 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
1476
1593
writer.OnKeyedItem (" Truncated" );
1477
1594
writer.OnBooleanScalar (true );
1478
1595
} else {
1479
- writer.OnRaw (raw);
1596
+ switch (fillSettings.Format ) {
1597
+ case IDataProvider::EResultFormat::Yson: {
1598
+ writer.OnRaw (raw);
1599
+ break ;
1600
+ }
1601
+ case IDataProvider::EResultFormat::Skiff: {
1602
+ writer.OnStringScalar (skiffConverter->ConvertNodeToSkiff (state, fillSettings, rowSpec, item));
1603
+ break ;
1604
+ }
1605
+ default : {
1606
+ YQL_LOG_CTX_THROW yexception () << " Invalid result type: " << fillSettings.Format ;
1607
+ }
1608
+ }
1480
1609
}
1481
1610
1482
1611
if (rowsCount) {
@@ -1923,6 +2052,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
1923
2052
private:
1924
2053
TDqStatePtr State;
1925
2054
TExecStatePtr ExecState;
2055
+ ISkiffConverterPtr SkiffConverter;
1926
2056
mutable THashMap<TString, TFileLinkPtr> FileLinks;
1927
2057
mutable THashMap<TString, TString> ModulesMapping;
1928
2058
@@ -1932,7 +2062,11 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
1932
2062
}
1933
2063
1934
2064
IGraphTransformer* CreateDqExecTransformer (const TDqStatePtr& state) {
1935
- return new TDqExecTransformer (state);
2065
+ return new TDqExecTransformer (state, MakeIntrusive<TSimpleSkiffConverter>());
2066
+ }
2067
+
2068
+ TExecTransformerFactory CreateDqExecTransformerFactory (const ISkiffConverterPtr& skiffConverter) {
2069
+ return [skiffConverter] (const TDqStatePtr& state) { return new TDqExecTransformer (state, skiffConverter); };
1936
2070
}
1937
2071
1938
2072
} // namespace NYql
0 commit comments