@@ -21,6 +21,16 @@ using namespace NYql::NDq;
21
21
using namespace NYql ::NNodes;
22
22
23
23
24
+ TString KeyTypesToString (const TVector<NScheme::TTypeInfo>& keyColumnTypes) {
25
+ TVector<TString> stringNames;
26
+ stringNames.reserve (keyColumnTypes.size ());
27
+ for (const auto & keyColumnType: keyColumnTypes) {
28
+ stringNames.push_back (NYql::NProto::TypeIds_Name (keyColumnType.GetTypeId ()));
29
+ }
30
+ return " [" + JoinSeq (" ," , stringNames) + " ]" ;
31
+ };
32
+
33
+
24
34
void LogStage (const NActors::TActorContext& ctx, const TStageInfo& stageInfo) {
25
35
LOG_DEBUG_S (ctx, NKikimrServices::KQP_EXECUTER, stageInfo.DebugString ());
26
36
}
@@ -473,17 +483,15 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, TStageInfo& stageInfo,
473
483
<< (spilling ? " with spilling" : " without spilling" ));
474
484
};
475
485
476
-
477
486
bool hasMap = false ;
478
- bool isFusedStage = (stageInfo.Meta .TaskIdByHash != nullptr );
487
+ auto & columnShardHashV1Params = stageInfo.Meta .ColumnShardHashV1Params ;
488
+ bool isFusedStage = (columnShardHashV1Params.TaskIdByHash != nullptr );
479
489
if (enableShuffleElimination && !isFusedStage) { // taskIdHash can be already set if it is a fused stage, so hashpartition will derive columnv1 parameters from there
480
490
for (ui32 inputIndex = 0 ; inputIndex < stage.InputsSize (); ++inputIndex) {
481
491
const auto & input = stage.GetInputs (inputIndex);
482
- auto & originStageInfo = tasksGraph.GetStageInfo (NYql::NDq::TStageId (stageInfo.Id .TxId , input.GetStageIndex ()));
483
- stageInfo.Meta .TaskIdByHash = originStageInfo.Meta .TaskIdByHash ;
484
- stageInfo.Meta .SourceShardCount = originStageInfo.Meta .SourceShardCount ;
485
- stageInfo.Meta .SourceTableKeyColumnTypes = originStageInfo.Meta .SourceTableKeyColumnTypes ;
486
- if (input.GetTypeCase () == NKqpProto::TKqpPhyConnection::kMap ) {
492
+ auto & originStageInfo = tasksGraph.GetStageInfo (NYql::NDq::TStageId (stageInfo.Id .TxId , input.GetStageIndex ()));;
493
+ columnShardHashV1Params = originStageInfo.Meta .ColumnShardHashV1Params ;
494
+ if (input.GetTypeCase () == NKqpProto::TKqpPhyConnection::kMap ) {
487
495
// We want to enforce sourceShardCount from map connection, cause it can be at most one map connection
488
496
// and ColumnShardHash in Shuffle will use this parameter to shuffle on this map (same with taskIdByHash mapping)
489
497
hasMap = true ;
@@ -494,10 +502,10 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, TStageInfo& stageInfo,
494
502
495
503
// if it is stage, where we don't inherit parallelism.
496
504
if (enableShuffleElimination && !hasMap && !isFusedStage && stageInfo.Tasks .size () > 0 && stage.InputsSize () > 0 ) {
497
- stageInfo. Meta .SourceShardCount = stageInfo.Tasks .size ();
498
- stageInfo. Meta . TaskIdByHash = std::make_shared<TVector<ui64>>(stageInfo. Meta .SourceShardCount );
499
- for (std::size_t i = 0 ; i < stageInfo. Meta .SourceShardCount ; ++i) {
500
- (*stageInfo. Meta .TaskIdByHash )[i] = i;
505
+ columnShardHashV1Params .SourceShardCount = stageInfo.Tasks .size ();
506
+ columnShardHashV1Params. TaskIdByHash = std::make_shared<TVector<ui64>>(columnShardHashV1Params .SourceShardCount );
507
+ for (std::size_t i = 0 ; i < columnShardHashV1Params .SourceShardCount ; ++i) {
508
+ (*columnShardHashV1Params .TaskIdByHash )[i] = i;
501
509
}
502
510
503
511
for (auto & input : stage.GetInputs ()) {
@@ -510,17 +518,17 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, TStageInfo& stageInfo,
510
518
continue ;
511
519
}
512
520
513
- Y_ENSURE (enableShuffleElimination, " OptShuffleElimination wasn't turned on, but ColumnShardHashV1 detected!" );
514
- // ^ if the flag if false, and kColumnShardHashV1 detected - then the data which would be returned - would be incorrect,
521
+ Y_ENSURE (enableShuffleElimination, " OptShuffleElimination wasn't turned on, but ColumnShardHashV1 detected!" );
522
+ // ^ if the flag if false, and kColumnShardHashV1 detected - then the data which would be returned - would be incorrect,
515
523
// because we didn't save partitioning in the BuildScanTasksFromShards.
516
524
517
525
auto columnShardHashV1 = hashShuffle.GetColumnShardHashV1 ();
518
- stageInfo. Meta .SourceTableKeyColumnTypes = std::make_shared<TVector<NScheme::TTypeInfo>>();
519
- stageInfo. Meta .SourceTableKeyColumnTypes ->reserve (columnShardHashV1.KeyColumnTypesSize ());
526
+ columnShardHashV1Params .SourceTableKeyColumnTypes = std::make_shared<TVector<NScheme::TTypeInfo>>();
527
+ columnShardHashV1Params .SourceTableKeyColumnTypes ->reserve (columnShardHashV1.KeyColumnTypesSize ());
520
528
for (const auto & keyColumnType: columnShardHashV1.GetKeyColumnTypes ()) {
521
529
auto typeId = static_cast <NScheme::TTypeId>(keyColumnType);
522
530
auto typeInfo = NScheme::TTypeInfo{typeId};
523
- stageInfo. Meta .SourceTableKeyColumnTypes ->push_back (typeInfo);
531
+ columnShardHashV1Params .SourceTableKeyColumnTypes ->push_back (typeInfo);
524
532
}
525
533
break ;
526
534
}
@@ -544,18 +552,49 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, TStageInfo& stageInfo,
544
552
}
545
553
case NKqpProto::TKqpPhyCnHashShuffle::kColumnShardHashV1 : {
546
554
Y_ENSURE (enableShuffleElimination, " OptShuffleElimination wasn't turned on, but ColumnShardHashV1 detected!" );
547
- inputStageInfo.Meta .TaskIdByHash = stageInfo.Meta .TaskIdByHash ;
548
- inputStageInfo.Meta .SourceShardCount = stageInfo.Meta .SourceShardCount ;
549
- inputStageInfo.Meta .SourceTableKeyColumnTypes = stageInfo.Meta .SourceTableKeyColumnTypes ;
555
+
556
+ LOG_DEBUG_S (
557
+ *TlsActivationContext,
558
+ NKikimrServices::KQP_EXECUTER,
559
+ " Propogating columnhashv1 pararms to stage: "
560
+ << " [" << inputStageInfo.Id .TxId << " :" << inputStageInfo.Id .StageId << " ]" << " "
561
+ << KeyTypesToString (*columnShardHashV1Params.SourceTableKeyColumnTypes ) << " "
562
+ << " [" << JoinSeq (" ," , input.GetHashShuffle ().GetKeyColumns ()) << " ]" ;
563
+ );
564
+
565
+ Y_ENSURE (
566
+ columnShardHashV1Params.SourceTableKeyColumnTypes ->size () == input.GetHashShuffle ().KeyColumnsSize (),
567
+ TStringBuilder{}
568
+ << " Hashshuffle keycolumns and keytypes args count mismatch during executer stage, types: "
569
+ << KeyTypesToString (*columnShardHashV1Params.SourceTableKeyColumnTypes ) << " for the columns: "
570
+ << " [" << JoinSeq (" ," , input.GetHashShuffle ().GetKeyColumns ()) << " ]"
571
+ );
572
+
573
+ for (auto & originTaskId : inputStageInfo.Tasks ) {
574
+ auto & originTask = tasksGraph.GetTask (originTaskId);
575
+ auto & taskOutput = originTask.Outputs [outputIdx];
576
+ taskOutput.Meta .ColumnShardHashV1Params = columnShardHashV1Params;
577
+ }
578
+
579
+ inputStageInfo.Meta .ColumnShardHashV1Params = columnShardHashV1Params;
550
580
hashKind = NHashKind::EColumnShardHashV1;
551
581
break ;
552
582
}
553
583
default : {
554
584
Y_ENSURE (false , " undefined type of hash for shuffle" );
555
585
}
556
586
}
557
- BuildHashShuffleChannels (tasksGraph, stageInfo, inputIdx, inputStageInfo, outputIdx,
558
- input.GetHashShuffle ().GetKeyColumns (), enableSpilling, log , hashKind);
587
+ BuildHashShuffleChannels (
588
+ tasksGraph,
589
+ stageInfo,
590
+ inputIdx,
591
+ inputStageInfo,
592
+ outputIdx,
593
+ input.GetHashShuffle ().GetKeyColumns (),
594
+ enableSpilling,
595
+ log ,
596
+ hashKind
597
+ );
559
598
break ;
560
599
}
561
600
case NKqpProto::TKqpPhyConnection::kBroadcast :
@@ -1036,7 +1075,7 @@ void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, NYql::NDqProto
1036
1075
private:
1037
1076
const TTableConstInfo& TableInfo;
1038
1077
public:
1039
- TResolverTable (const TTableConstInfo& tableInfo)
1078
+ TResolverTable (const TTableConstInfo& tableInfo)
1040
1079
: TableInfo(tableInfo) {
1041
1080
1042
1081
}
@@ -1118,9 +1157,9 @@ void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, NYql::NDqProto
1118
1157
}
1119
1158
1120
1159
void FillOutputDesc (
1121
- const TKqpTasksGraph& tasksGraph,
1122
- NYql::NDqProto::TTaskOutput& outputDesc,
1123
- const TTaskOutput& output,
1160
+ const TKqpTasksGraph& tasksGraph,
1161
+ NYql::NDqProto::TTaskOutput& outputDesc,
1162
+ const TTaskOutput& output,
1124
1163
bool enableSpilling,
1125
1164
const TStageInfo& stageInfo
1126
1165
) {
@@ -1143,19 +1182,37 @@ void FillOutputDesc(
1143
1182
break ;
1144
1183
}
1145
1184
case NHashKind::EColumnShardHashV1: {
1146
- Y_ENSURE (stageInfo.Meta .SourceShardCount != 0 , " ShardCount for ColumnShardHashV1 Shuffle can't be equal to 0" );
1147
- Y_ENSURE (stageInfo.Meta .TaskIdByHash != nullptr , " TaskIdByHash for ColumnShardHashV1 wasn't propogated to this stage" );
1148
- Y_ENSURE (stageInfo.Meta .SourceTableKeyColumnTypes != nullptr , " SourceTableKeyColumnTypes for ColumnShardHashV1 wasn't propogated to this stage" );
1185
+ auto & columnShardHashV1Params = output.Meta .ColumnShardHashV1Params ;
1186
+ LOG_DEBUG_S (
1187
+ *TlsActivationContext,
1188
+ NKikimrServices::KQP_EXECUTER,
1189
+ " Filling columnshardhashv1 params for sending it to runtime: "
1190
+ << " [" << stageInfo.Id .TxId << " :" << stageInfo.Id .StageId << " ]"
1191
+ << " " << KeyTypesToString (*columnShardHashV1Params.SourceTableKeyColumnTypes )
1192
+ << " for the columns: " << " [" << JoinSeq (" ," , output.KeyColumns ) << " ]"
1193
+ );
1194
+ Y_ENSURE (columnShardHashV1Params.SourceShardCount != 0 , " ShardCount for ColumnShardHashV1 Shuffle can't be equal to 0" );
1195
+ Y_ENSURE (columnShardHashV1Params.TaskIdByHash != nullptr , " TaskIdByHash for ColumnShardHashV1 wasn't propogated to this stage" );
1196
+ Y_ENSURE (columnShardHashV1Params.SourceTableKeyColumnTypes != nullptr , " SourceTableKeyColumnTypes for ColumnShardHashV1 wasn't propogated to this stage" );
1197
+
1198
+ Y_ENSURE (
1199
+ columnShardHashV1Params.SourceTableKeyColumnTypes ->size () == output.KeyColumns .size (),
1200
+ TStringBuilder{}
1201
+ << " Hashshuffle keycolumns and keytypes args count mismatch during executer stage, types: "
1202
+ << KeyTypesToString (*columnShardHashV1Params.SourceTableKeyColumnTypes ) << " for the columns: "
1203
+ << " [" << JoinSeq (" ," , output.KeyColumns ) << " ]"
1204
+ );
1205
+
1149
1206
auto & columnShardHashV1 = *hashPartitionDesc.MutableColumnShardHashV1 ();
1150
- columnShardHashV1.SetShardCount (stageInfo. Meta .SourceShardCount );
1207
+ columnShardHashV1.SetShardCount (columnShardHashV1Params .SourceShardCount );
1151
1208
1152
1209
auto * columnTypes = columnShardHashV1.MutableKeyColumnTypes ();
1153
- for (const auto & type: *stageInfo. Meta .SourceTableKeyColumnTypes ) {
1210
+ for (const auto & type: *columnShardHashV1Params .SourceTableKeyColumnTypes ) {
1154
1211
columnTypes->Add (type.GetTypeId ());
1155
1212
}
1156
1213
1157
1214
auto * taskIdByHash = columnShardHashV1.MutableTaskIdByHash ();
1158
- for (std::size_t taskID: *stageInfo. Meta .TaskIdByHash ) {
1215
+ for (std::size_t taskID: *columnShardHashV1Params .TaskIdByHash ) {
1159
1216
taskIdByHash->Add (taskID);
1160
1217
}
1161
1218
break ;
0 commit comments