
Commit a39a424

Merge 021dbb0 into 4c3f422
2 parents 4c3f422 + 021dbb0 commit a39a424

17 files changed, +729 -392 lines changed

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 23 additions & 14 deletions
@@ -1403,7 +1403,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
                partitionsCount = originStageInfo.Tasks.size();
                UnknownAffectedShardCount = true;
                break;
-           case NKqpProto::TKqpPhyConnection::kMap:
+           case NKqpProto::TKqpPhyConnection::kMap:
                partitionsCount = originStageInfo.Tasks.size();
                forceMapTasks = true;
                ++mapCnt;
@@ -1631,13 +1631,14 @@ class TKqpExecuterBase : public TActor<TDerived> {
        THashMap<ui64, ui64> assignedShardsCount;
        auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);

+       auto& columnShardHashV1Params = stageInfo.Meta.ColumnShardHashV1Params;
        if (enableShuffleElimination && stageInfo.Meta.ColumnTableInfoPtr) {
            const auto& tableDesc = stageInfo.Meta.ColumnTableInfoPtr->Description;
-           stageInfo.Meta.SourceShardCount = tableDesc.GetColumnShardCount();
-           stageInfo.Meta.SourceTableKeyColumnTypes = std::make_shared<TVector<NScheme::TTypeInfo>>();
+           columnShardHashV1Params.SourceShardCount = tableDesc.GetColumnShardCount();
+           columnShardHashV1Params.SourceTableKeyColumnTypes = std::make_shared<TVector<NScheme::TTypeInfo>>();
            for (const auto& column: tableDesc.GetSharding().GetHashSharding().GetColumns()) {
                auto columnType = stageInfo.Meta.TableConstInfo->Columns.at(column).Type;
-               stageInfo.Meta.SourceTableKeyColumnTypes->push_back(columnType);
+               columnShardHashV1Params.SourceTableKeyColumnTypes->push_back(columnType);
            }
        }


@@ -1697,8 +1698,8 @@ class TKqpExecuterBase : public TActor<TDerived> {

        } else if (enableShuffleElimination /* save partitioning for shuffle elimination */) {
            std::size_t stageInternalTaskId = 0;
-           stageInfo.Meta.TaskIdByHash = std::make_shared<TVector<ui64>>();
-           stageInfo.Meta.TaskIdByHash->resize(stageInfo.Meta.SourceShardCount);
+           columnShardHashV1Params.TaskIdByHash = std::make_shared<TVector<ui64>>();
+           columnShardHashV1Params.TaskIdByHash->resize(columnShardHashV1Params.SourceShardCount);

            for (auto&& pair : nodeShards) {
                const auto nodeId = pair.first;
@@ -1709,11 +1710,11 @@ class TKqpExecuterBase : public TActor<TDerived> {
                for (std::size_t i = 0; i < shardsInfo.size(); ++i) {
                    auto&& shardInfo = shardsInfo[i];
                    MergeReadInfoToTaskMeta(
-                       metas[i % maxTasksPerNode],
-                       shardInfo.ShardId,
-                       shardInfo.KeyReadRanges,
+                       metas[i % maxTasksPerNode],
+                       shardInfo.ShardId,
+                       shardInfo.KeyReadRanges,
                        readSettings,
-                       columns, op,
+                       columns, op,
                        /*isPersistentScan*/ true
                    );
                }
@@ -1727,13 +1728,14 @@ class TKqpExecuterBase : public TActor<TDerived> {

            // in runtime we calc hash, which will be in [0; shardcount]
            // so we merge to mappings : hash -> shardID and shardID -> channelID for runtime
-           THashMap<ui64, ui64> hashByShardId;
+           THashMap<ui64, ui64> hashByShardId;
+           Y_ENSURE(stageInfo.Meta.ColumnTableInfoPtr != nullptr, "ColumnTableInfoPtr is nullptr, maybe information about shards hasn't been delivered yet.");
            const auto& tableDesc = stageInfo.Meta.ColumnTableInfoPtr->Description;
            const auto& sharding = tableDesc.GetSharding();
            for (std::size_t i = 0; i < sharding.ColumnShardsSize(); ++i) {
                hashByShardId.insert({sharding.GetColumnShards(i), i});
            }
-
+
            for (ui32 t = 0; t < maxTasksPerNode; ++t, ++stageInternalTaskId) {
                auto& task = TasksGraph.AddTask(stageInfo);
                task.Meta = metas[t];
@@ -1745,14 +1747,21 @@ class TKqpExecuterBase : public TActor<TDerived> {
                task.SetMetaId(t);
                FillSecureParamsFromStage(task.Meta.SecureParams, stage);
                BuildSinks(stage, task);
-
+
                for (const auto& readInfo: *task.Meta.Reads) {
                    Y_ENSURE(hashByShardId.contains(readInfo.ShardId));
-                   (*stageInfo.Meta.TaskIdByHash)[hashByShardId[readInfo.ShardId]] = stageInternalTaskId;
+                   (*columnShardHashV1Params.TaskIdByHash)[hashByShardId[readInfo.ShardId]] = stageInternalTaskId;
                }

            }
        }
+
+       LOG_DEBUG_S(
+           *TlsActivationContext,
+           NKikimrServices::KQP_EXECUTER,
+           "Stage with scan " << "[" << stageInfo.Id.TxId << ":" << stageInfo.Id.StageId << "]" << " has keys: "
+               << columnShardHashV1Params.KeyTypesToString();
+       );
    } else {
        ui32 metaId = 0;
        for (auto&& pair : nodeShards) {
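
Not part of the diff: a minimal, hypothetical C++ sketch of how the two mappings built above (hashByShardId: shardId -> hash bucket, and TaskIdByHash: hash bucket -> task) compose at runtime. All names and values below are simplified stand-ins, not YDB types or APIs.

// Illustrative sketch only, assuming the mapping scheme described in the comments above.
#include <cstdint>
#include <iostream>
#include <unordered_map>
#include <vector>

int main() {
    // Order of shards in the table's sharding description defines the hash bucket index.
    std::vector<uint64_t> columnShards = {72075001, 72075002, 72075003};

    // hash bucket <- shardId (built from the sharding description).
    std::unordered_map<uint64_t, uint64_t> hashByShardId;
    for (std::size_t i = 0; i < columnShards.size(); ++i) {
        hashByShardId[columnShards[i]] = i;
    }

    // taskId <- hash bucket (built while assigning shard reads to tasks).
    std::vector<uint64_t> taskIdByHash(columnShards.size());
    taskIdByHash[hashByShardId[72075001]] = 0; // task 0 reads shard 72075001
    taskIdByHash[hashByShardId[72075002]] = 1; // task 1 reads shard 72075002
    taskIdByHash[hashByShardId[72075003]] = 0; // task 0 also reads shard 72075003

    // At runtime a row hashes into [0; shardCount) and the bucket resolves to a task.
    uint64_t bucket = 2; // e.g. hash(row) % columnShards.size()
    std::cout << "bucket " << bucket << " -> task " << taskIdByHash[bucket] << "\n";
}

With these two tables a row only needs its hash bucket to find the task that owns the matching shard's data.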

ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp

Lines changed: 90 additions & 34 deletions
@@ -489,17 +489,27 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, TStageInfo& stageInfo,
            << (spilling ? " with spilling" : " without spilling"));
    };

-
    bool hasMap = false;
-   bool isFusedStage = (stageInfo.Meta.TaskIdByHash != nullptr);
-   if (enableShuffleElimination && !isFusedStage) { // taskIdHash can be already set if it is a fused stage, so hashpartition will derive columnv1 parameters from there
+   auto& columnShardHashV1Params = stageInfo.Meta.ColumnShardHashV1Params;
+   bool isFusedWithScanStage = (stageInfo.Meta.TableConstInfo != nullptr);
+   if (enableShuffleElimination && !isFusedWithScanStage) { // taskIdHash can be already set if it is a fused stage, so hashpartition will derive columnv1 parameters from there
        for (ui32 inputIndex = 0; inputIndex < stage.InputsSize(); ++inputIndex) {
            const auto& input = stage.GetInputs(inputIndex);
            auto& originStageInfo = tasksGraph.GetStageInfo(NYql::NDq::TStageId(stageInfo.Id.TxId, input.GetStageIndex()));
-           stageInfo.Meta.TaskIdByHash = originStageInfo.Meta.TaskIdByHash;
-           stageInfo.Meta.SourceShardCount = originStageInfo.Meta.SourceShardCount;
-           stageInfo.Meta.SourceTableKeyColumnTypes = originStageInfo.Meta.SourceTableKeyColumnTypes;
-           if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kMap) {
+           ui32 outputIdx = input.GetOutputIndex();
+           columnShardHashV1Params = originStageInfo.Meta.GetColumnShardHashV1Params(outputIdx);
+           if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kMap || inputIndex == stage.InputsSize() - 1) { // this branch is only for logging purposes
+               LOG_DEBUG_S(
+                   *TlsActivationContext,
+                   NKikimrServices::KQP_EXECUTER,
+                   "Chose "
+                       << "[" << originStageInfo.Id.TxId << ":" << originStageInfo.Id.StageId << "]"
+                       << " outputIdx: " << outputIdx << " to propagate through input stages of the stage "
+                       << "[" << stageInfo.Id.TxId << ":" << stageInfo.Id.StageId << "]" << ": "
+                       << columnShardHashV1Params.KeyTypesToString();
+               );
+           }
+           if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kMap) {
                // We want to enforce sourceShardCount from map connection, cause it can be at most one map connection
                // and ColumnShardHash in Shuffle will use this parameter to shuffle on this map (same with taskIdByHash mapping)
                hasMap = true;
@@ -509,11 +519,11 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, TStageInfo& stageInfo,
    }

    // if it is stage, where we don't inherit parallelism.
-   if (enableShuffleElimination && !hasMap && !isFusedStage && stageInfo.Tasks.size() > 0 && stage.InputsSize() > 0) {
-       stageInfo.Meta.SourceShardCount = stageInfo.Tasks.size();
-       stageInfo.Meta.TaskIdByHash = std::make_shared<TVector<ui64>>(stageInfo.Meta.SourceShardCount);
-       for (std::size_t i = 0; i < stageInfo.Meta.SourceShardCount; ++i) {
-           (*stageInfo.Meta.TaskIdByHash)[i] = i;
+   if (enableShuffleElimination && !hasMap && !isFusedWithScanStage && stageInfo.Tasks.size() > 0 && stage.InputsSize() > 0) {
+       columnShardHashV1Params.SourceShardCount = stageInfo.Tasks.size();
+       columnShardHashV1Params.TaskIdByHash = std::make_shared<TVector<ui64>>(columnShardHashV1Params.SourceShardCount);
+       for (std::size_t i = 0; i < columnShardHashV1Params.SourceShardCount; ++i) {
+           (*columnShardHashV1Params.TaskIdByHash)[i] = i;
        }

        for (auto& input : stage.GetInputs()) {
@@ -526,17 +536,17 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, TStageInfo& stageInfo,
                continue;
            }

-           Y_ENSURE(enableShuffleElimination, "OptShuffleElimination wasn't turned on, but ColumnShardHashV1 detected!");
-           // ^ if the flag if false, and kColumnShardHashV1 detected - then the data which would be returned - would be incorrect,
+           Y_ENSURE(enableShuffleElimination, "OptShuffleElimination wasn't turned on, but ColumnShardHashV1 detected!");
+           // ^ if the flag if false, and kColumnShardHashV1 detected - then the data which would be returned - would be incorrect,
            // because we didn't save partitioning in the BuildScanTasksFromShards.

            auto columnShardHashV1 = hashShuffle.GetColumnShardHashV1();
-           stageInfo.Meta.SourceTableKeyColumnTypes = std::make_shared<TVector<NScheme::TTypeInfo>>();
-           stageInfo.Meta.SourceTableKeyColumnTypes->reserve(columnShardHashV1.KeyColumnTypesSize());
+           columnShardHashV1Params.SourceTableKeyColumnTypes = std::make_shared<TVector<NScheme::TTypeInfo>>();
+           columnShardHashV1Params.SourceTableKeyColumnTypes->reserve(columnShardHashV1.KeyColumnTypesSize());
            for (const auto& keyColumnType: columnShardHashV1.GetKeyColumnTypes()) {
                auto typeId = static_cast<NScheme::TTypeId>(keyColumnType);
                auto typeInfo = NScheme::TTypeInfo{typeId};
-               stageInfo.Meta.SourceTableKeyColumnTypes->push_back(typeInfo);
+               columnShardHashV1Params.SourceTableKeyColumnTypes->push_back(typeInfo);
            }
            break;
        }
@@ -560,18 +570,44 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, TStageInfo& stageInfo,
                }
                case NKqpProto::TKqpPhyCnHashShuffle::kColumnShardHashV1: {
                    Y_ENSURE(enableShuffleElimination, "OptShuffleElimination wasn't turned on, but ColumnShardHashV1 detected!");
-                   inputStageInfo.Meta.TaskIdByHash = stageInfo.Meta.TaskIdByHash;
-                   inputStageInfo.Meta.SourceShardCount = stageInfo.Meta.SourceShardCount;
-                   inputStageInfo.Meta.SourceTableKeyColumnTypes = stageInfo.Meta.SourceTableKeyColumnTypes;
+
+                   LOG_DEBUG_S(
+                       *TlsActivationContext,
+                       NKikimrServices::KQP_EXECUTER,
+                       "Propagating columnShardHashV1 params to stage"
+                           << "[" << inputStageInfo.Id.TxId << ":" << inputStageInfo.Id.StageId << "]" << " which is input of stage "
+                           << "[" << stageInfo.Id.TxId << ":" << stageInfo.Id.StageId << "]" << ": "
+                           << columnShardHashV1Params.KeyTypesToString() << " "
+                           << "[" << JoinSeq(",", input.GetHashShuffle().GetKeyColumns()) << "]";
+                   );
+
+                   Y_ENSURE(
+                       columnShardHashV1Params.SourceTableKeyColumnTypes->size() == input.GetHashShuffle().KeyColumnsSize(),
+                       TStringBuilder{}
+                           << "Hashshuffle keycolumns and keytypes args count mismatch during executer stage, types: "
+                           << columnShardHashV1Params.KeyTypesToString() << " for the columns: "
+                           << "[" << JoinSeq(",", input.GetHashShuffle().GetKeyColumns()) << "]"
+                   );
+
+                   inputStageInfo.Meta.HashParamsByOutput[outputIdx] = columnShardHashV1Params;
                    hashKind = NHashKind::EColumnShardHashV1;
                    break;
                }
                default: {
                    Y_ENSURE(false, "undefined type of hash for shuffle");
                }
            }
-           BuildHashShuffleChannels(tasksGraph, stageInfo, inputIdx, inputStageInfo, outputIdx,
-               input.GetHashShuffle().GetKeyColumns(), enableSpilling, log, hashKind);
+           BuildHashShuffleChannels(
+               tasksGraph,
+               stageInfo,
+               inputIdx,
+               inputStageInfo,
+               outputIdx,
+               input.GetHashShuffle().GetKeyColumns(),
+               enableSpilling,
+               log,
+               hashKind
+           );
            break;
        }
        case NKqpProto::TKqpPhyConnection::kBroadcast:
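
Not part of the diff: a hedged C++ sketch of one plausible shape for the per-output parameter storage this hunk relies on (HashParamsByOutput[outputIdx] on the producer side, GetColumnShardHashV1Params(outputIdx) on the consumer side). The fallback-to-default lookup is an assumption; the commit does not show the getter's implementation, and the types below are simplified stand-ins.

// Hypothetical simplified types, not the real TStageInfoMeta.
#include <cstdint>
#include <memory>
#include <unordered_map>
#include <vector>

struct TColumnShardHashV1Params {
    uint64_t SourceShardCount = 0;
    std::shared_ptr<std::vector<uint64_t>> TaskIdByHash;              // hash bucket -> task id
    std::shared_ptr<std::vector<uint32_t>> SourceTableKeyColumnTypes; // key column type ids
};

struct TStageMetaSketch {
    TColumnShardHashV1Params ColumnShardHashV1Params;                 // stage-wide default
    std::unordered_map<uint32_t, TColumnShardHashV1Params> HashParamsByOutput;

    // Assumed behaviour: per-output params win, otherwise fall back to the default.
    const TColumnShardHashV1Params& GetColumnShardHashV1Params(uint32_t outputIdx) const {
        auto it = HashParamsByOutput.find(outputIdx);
        return it != HashParamsByOutput.end() ? it->second : ColumnShardHashV1Params;
    }
};

Keyed-by-output storage lets two outputs of the same stage carry different shuffle parameters, which a single stage-wide field could not express.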
@@ -1052,7 +1088,7 @@ void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, NYql::NDqProto
    private:
        const TTableConstInfo& TableInfo;
    public:
-       TResolverTable(const TTableConstInfo& tableInfo)
+       TResolverTable(const TTableConstInfo& tableInfo)
            : TableInfo(tableInfo) {

        }
@@ -1134,9 +1170,10 @@ void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, NYql::NDqProto
}

void FillOutputDesc(
-   const TKqpTasksGraph& tasksGraph,
-   NYql::NDqProto::TTaskOutput& outputDesc,
-   const TTaskOutput& output,
+   const TKqpTasksGraph& tasksGraph,
+   NYql::NDqProto::TTaskOutput& outputDesc,
+   const TTaskOutput& output,
+   ui32 outputIdx,
    bool enableSpilling,
    const TStageInfo& stageInfo
) {
@@ -1159,19 +1196,37 @@ void FillOutputDesc(
            break;
        }
        case NHashKind::EColumnShardHashV1: {
-           Y_ENSURE(stageInfo.Meta.SourceShardCount != 0, "ShardCount for ColumnShardHashV1 Shuffle can't be equal to 0");
-           Y_ENSURE(stageInfo.Meta.TaskIdByHash != nullptr, "TaskIdByHash for ColumnShardHashV1 wasn't propogated to this stage");
-           Y_ENSURE(stageInfo.Meta.SourceTableKeyColumnTypes != nullptr, "SourceTableKeyColumnTypes for ColumnShardHashV1 wasn't propogated to this stage");
+           auto& columnShardHashV1Params = stageInfo.Meta.GetColumnShardHashV1Params(outputIdx);
+           LOG_DEBUG_S(
+               *TlsActivationContext,
+               NKikimrServices::KQP_EXECUTER,
+               "Filling columnshardhashv1 params for sending it to runtime "
+                   << "[" << stageInfo.Id.TxId << ":" << stageInfo.Id.StageId << "]"
+                   << ": " << columnShardHashV1Params.KeyTypesToString()
+                   << " for the columns: " << "[" << JoinSeq(",", output.KeyColumns) << "]"
+           );
+           Y_ENSURE(columnShardHashV1Params.SourceShardCount != 0, "ShardCount for ColumnShardHashV1 Shuffle can't be equal to 0");
+           Y_ENSURE(columnShardHashV1Params.TaskIdByHash != nullptr, "TaskIdByHash for ColumnShardHashV1 wasn't propogated to this stage");
+           Y_ENSURE(columnShardHashV1Params.SourceTableKeyColumnTypes != nullptr, "SourceTableKeyColumnTypes for ColumnShardHashV1 wasn't propogated to this stage");
+
+           Y_ENSURE(
+               columnShardHashV1Params.SourceTableKeyColumnTypes->size() == output.KeyColumns.size(),
+               TStringBuilder{}
+                   << "Hashshuffle keycolumns and keytypes args count mismatch during executer FillOutputDesc stage, types: "
+                   << columnShardHashV1Params.KeyTypesToString() << " for the columns: "
+                   << "[" << JoinSeq(",", output.KeyColumns) << "]"
+           );
+
            auto& columnShardHashV1 = *hashPartitionDesc.MutableColumnShardHashV1();
-           columnShardHashV1.SetShardCount(stageInfo.Meta.SourceShardCount);
+           columnShardHashV1.SetShardCount(columnShardHashV1Params.SourceShardCount);

            auto* columnTypes = columnShardHashV1.MutableKeyColumnTypes();
-           for (const auto& type: *stageInfo.Meta.SourceTableKeyColumnTypes) {
+           for (const auto& type: *columnShardHashV1Params.SourceTableKeyColumnTypes) {
                columnTypes->Add(type.GetTypeId());
            }

            auto* taskIdByHash = columnShardHashV1.MutableTaskIdByHash();
-           for (std::size_t taskID: *stageInfo.Meta.TaskIdByHash) {
+           for (std::size_t taskID: *columnShardHashV1Params.TaskIdByHash) {
                taskIdByHash->Add(taskID);
            }
            break;
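
Not part of the diff: a simplified stand-in for what this branch serializes for the runtime hash partitioner. TColumnShardHashV1Msg below is a hypothetical struct mirroring the proto setters used above (SetShardCount, MutableKeyColumnTypes, MutableTaskIdByHash), not the real NYql/NKqpProto message.

#include <cstdint>
#include <vector>

// Hypothetical stand-in for the ColumnShardHashV1 message filled above.
struct TColumnShardHashV1Msg {
    uint64_t ShardCount = 0;
    std::vector<uint32_t> KeyColumnTypes; // type ids of the shuffle key columns
    std::vector<uint64_t> TaskIdByHash;   // hash bucket -> consumer task id
};

// Mirrors the filling loop: shard count, key column type ids and the
// hash->task mapping are copied from the per-output params chosen above.
TColumnShardHashV1Msg FillColumnShardHashV1Msg(
        uint64_t sourceShardCount,
        const std::vector<uint32_t>& keyColumnTypeIds,
        const std::vector<uint64_t>& taskIdByHash) {
    TColumnShardHashV1Msg msg;
    msg.ShardCount = sourceShardCount;
    msg.KeyColumnTypes = keyColumnTypeIds;
    msg.TaskIdByHash = taskIdByHash;
    return msg;
}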
@@ -1346,8 +1401,9 @@ void SerializeTaskToProto(
    if (task.Outputs.size() > 1) {
        enableSpilling = tasksGraph.GetMeta().AllowWithSpilling;
    }
-   for (const auto& output : task.Outputs) {
-       FillOutputDesc(tasksGraph, *result->AddOutputs(), output, enableSpilling, stageInfo);
+   for (ui32 outputIdx = 0; outputIdx < task.Outputs.size(); ++outputIdx) {
+       const auto& output = task.Outputs[outputIdx];
+       FillOutputDesc(tasksGraph, *result->AddOutputs(), output, outputIdx, enableSpilling, stageInfo);
    }

    const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id);
