Skip to content

Commit d4eee5c

Browse files
authored
Prohibit UDF on Flow in case of PartitionByKeys callable. (#6322)
1 parent 54a0a74 commit d4eee5c

6 files changed

+44
-1
lines changed

ydb/library/yql/core/yql_opt_utils.cpp

+7-1
Original file line numberDiff line numberDiff line change
@@ -1813,7 +1813,8 @@ TExprNode::TPtr FindNonYieldTransparentNodeImpl(const TExprNode::TPtr& root, con
18131813
|| TCoForwardList::Match(node.Get())
18141814
|| TCoApply::Match(node.Get())
18151815
|| TCoSwitch::Match(node.Get())
1816-
|| node->IsCallable("DqReplicate");
1816+
|| node->IsCallable("DqReplicate")
1817+
|| TCoPartitionsByKeys::Match(node.Get());
18171818
}
18181819
);
18191820

@@ -1851,6 +1852,11 @@ TExprNode::TPtr FindNonYieldTransparentNodeImpl(const TExprNode::TPtr& root, con
18511852
return node;
18521853
}
18531854
}
1855+
} else if (TCoPartitionsByKeys::Match(candidate.Get())) {
1856+
const auto handlerChild = candidate->Child(TCoPartitionsByKeys::idx_ListHandlerLambda);
1857+
if (auto node = FindNonYieldTransparentNodeImpl(handlerChild->TailPtr(), udfSupportsYield, TNodeSet{&handlerChild->Head().Head()})) {
1858+
return node;
1859+
}
18541860
}
18551861
}
18561862
return {};
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
xfail
2+
in Input1 input1.txt
3+
udf python3_udf
4+
providers dq
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/* postgres can not */
2+
USE plato;
3+
4+
$udfScript = @@
5+
import functools
6+
def Len(stream):
7+
sums = [functools.reduce(lambda x,y: x + y, pair[1], 0) for pair in stream]
8+
return {"sumByAllVal":functools.reduce(lambda x,y: x + y, sums, 0)}
9+
@@;
10+
11+
$udf = Python3::Len(Callable<(Stream<Tuple<String,Stream<Uint32>>>)->Struct<sumByAllVal:Uint32>>, $udfScript);
12+
13+
--INSERT INTO Output
14+
REDUCE Input1 ON key USING ALL $udf(cast(value as uint32) ?? 0);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
in Input1 input1.txt
2+
udf python3_udf
3+
providers yt
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/* postgres can not */
2+
USE plato;
3+
4+
$udfScript = @@
5+
import functools
6+
def Len(key, input):
7+
return {"value":functools.reduce(lambda x,y: x + 1, input, 0)}
8+
@@;
9+
10+
$udf = Python::Len(Callable<(String, Stream<String>)->Struct<value:Uint32>>, $udfScript);
11+
12+
$res = (REDUCE Input1 ON key USING $udf(value));
13+
14+
select * from $res order by value;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
in Input1 input1.txt
2+
udf python3_udf

0 commit comments

Comments
 (0)