@@ -6282,58 +6282,38 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) {
6282
6282
return node;
6283
6283
};
6284
6284
6285
- map[" ShuffleByKeys" ] = map[" PartitionsByKeys" ] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
6285
+ map[" PartitionsByKeys" ] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
6286
+ if (IsEmpty (node->Head (), *optCtx.Types )) {
6287
+ YQL_CLOG (DEBUG, Core) << node->Content () << " over empty input." ;
6288
+
6289
+ TExprNode::TPtr sequence = KeepConstraints (node->HeadPtr (), node->Tail ().Head ().Head (), ctx);
6290
+ auto lambdaResult = ctx.Builder (node->Pos ()).Apply (node->Tail ()).With (0 , sequence).Seal ().Build ();
6291
+ return lambdaResult;
6292
+ }
6293
+ return node;
6294
+ };
6295
+
6296
+ map[" ShuffleByKeys" ] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
6286
6297
if (IsEmpty (node->Head (), *optCtx.Types )) {
6287
6298
YQL_CLOG (DEBUG, Core) << node->Content () << " over empty input." ;
6288
6299
6289
- // lambda argument type:
6290
6300
auto & lambdaArg = node->Tail ().Head ().Head ();
6291
- auto * lambdaArgType = lambdaArg.GetTypeAnn ();
6292
- YQL_ENSURE (lambdaArgType, " No argument type for lambda in " << node->Content ());
6293
6301
6294
- // sequence type:
6295
- auto * sequenceType = node-> Head (). GetTypeAnn ();
6302
+ TExprNode::TPtr sequence = node-> HeadPtr ();
6303
+ auto * sequenceType = sequence-> GetTypeAnn ();
6296
6304
YQL_ENSURE (sequenceType, " No argument type for sequence in " << node->Content ());
6297
6305
6298
- TStringBuf typeConversionFunc;
6299
- if (sequenceType->GetKind () != lambdaArgType->GetKind ()) {
6300
- switch (lambdaArgType->GetKind ()) {
6301
- case ETypeAnnotationKind::List:
6302
- if (sequenceType->GetKind () == ETypeAnnotationKind::Optional) {
6303
- typeConversionFunc = " ToList" ;
6304
- } else if (IsIn ({ ETypeAnnotationKind::Stream, ETypeAnnotationKind::Flow }, sequenceType->GetKind ())) {
6305
- typeConversionFunc = " ForwardList" ;
6306
- }
6307
- break ;
6308
- case ETypeAnnotationKind::Flow:
6309
- typeConversionFunc = " ToFlow" ;
6310
- break ;
6311
- case ETypeAnnotationKind::Stream:
6312
- typeConversionFunc = " ToStream" ;
6313
- break ;
6314
- case ETypeAnnotationKind::Optional:
6315
- if (sequenceType->GetKind () == ETypeAnnotationKind::List) {
6316
- typeConversionFunc = " ToOptional" ;
6317
- }
6318
- // TODO: convert from Stream/Flow to Optional
6319
- break ;
6320
- default :
6321
- break ;
6322
- }
6306
+ if (sequenceType->GetKind () != ETypeAnnotationKind::Stream) {
6307
+ sequence = ctx.NewCallable (sequence->Pos (), " ToStream" , { sequence });
6323
6308
}
6309
+ sequence = KeepConstraints (sequence, lambdaArg, ctx);
6324
6310
6325
- TExprNode::TPtr sequence = KeepConstraints (node->HeadPtr (), lambdaArg, ctx);
6326
- if (typeConversionFunc) {
6327
- sequence = KeepConstraints (ctx.NewCallable (sequence->Pos (), typeConversionFunc, { sequence }), lambdaArg, ctx);
6328
- }
6329
6311
auto lambdaResult = ctx.Builder (node->Pos ()).Apply (node->Tail ()).With (0 , sequence).Seal ().Build ();
6330
- if (node->IsCallable (" ShuffleByKeys" )) {
6331
- auto lambdaType = node->Tail ().GetTypeAnn ();
6332
- if (lambdaType->GetKind () == ETypeAnnotationKind::Optional) {
6333
- lambdaResult = ctx.NewCallable (lambdaResult->Pos (), " ToList" , { lambdaResult });
6334
- } else if (lambdaType->GetKind () == ETypeAnnotationKind::Stream) {
6335
- lambdaResult = ctx.NewCallable (lambdaResult->Pos (), " ForwardList" , { lambdaResult });
6336
- }
6312
+ auto lambdaType = node->Tail ().GetTypeAnn ();
6313
+ if (lambdaType->GetKind () == ETypeAnnotationKind::Optional) {
6314
+ lambdaResult = ctx.NewCallable (lambdaResult->Pos (), " ToList" , { lambdaResult });
6315
+ } else if (lambdaType->GetKind () == ETypeAnnotationKind::Stream || lambdaType->GetKind () == ETypeAnnotationKind::Flow) {
6316
+ lambdaResult = ctx.NewCallable (lambdaResult->Pos (), " ForwardList" , { lambdaResult });
6337
6317
}
6338
6318
return lambdaResult;
6339
6319
}
0 commit comments