Commit ad6ecee

Merge 574fc14 into d35b323
2 parents d35b323 + 574fc14 commit ad6ecee

13 files changed: 567 additions & 212 deletions

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 7 additions & 6 deletions
@@ -1622,13 +1622,14 @@ class TKqpExecuterBase : public TActor<TDerived> {
         THashMap<ui64, ui64> assignedShardsCount;
         auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
 
+        auto& columnShardHashV1Params = stageInfo.Meta.ColumnShardHashV1Params;
         if (enableShuffleElimination && stageInfo.Meta.ColumnTableInfoPtr) {
             const auto& tableDesc = stageInfo.Meta.ColumnTableInfoPtr->Description;
-            stageInfo.Meta.SourceShardCount = tableDesc.GetColumnShardCount();
-            stageInfo.Meta.SourceTableKeyColumnTypes = std::make_shared<TVector<NScheme::TTypeInfo>>();
+            columnShardHashV1Params.SourceShardCount = tableDesc.GetColumnShardCount();
+            columnShardHashV1Params.SourceTableKeyColumnTypes = std::make_shared<TVector<NScheme::TTypeInfo>>();
             for (const auto& column: tableDesc.GetSharding().GetHashSharding().GetColumns()) {
                 auto columnType = stageInfo.Meta.TableConstInfo->Columns.at(column).Type;
-                stageInfo.Meta.SourceTableKeyColumnTypes->push_back(columnType);
+                columnShardHashV1Params.SourceTableKeyColumnTypes->push_back(columnType);
             }
         }
 
@@ -1688,8 +1689,8 @@ class TKqpExecuterBase : public TActor<TDerived> {
 
         } else if (enableShuffleElimination /* save partitioning for shuffle elimination */) {
             std::size_t stageInternalTaskId = 0;
-            stageInfo.Meta.TaskIdByHash = std::make_shared<TVector<ui64>>();
-            stageInfo.Meta.TaskIdByHash->resize(stageInfo.Meta.SourceShardCount);
+            columnShardHashV1Params.TaskIdByHash = std::make_shared<TVector<ui64>>();
+            columnShardHashV1Params.TaskIdByHash->resize(columnShardHashV1Params.SourceShardCount);
 
             for (auto&& pair : nodeShards) {
                 const auto nodeId = pair.first;
@@ -1739,7 +1740,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
 
                 for (const auto& readInfo: *task.Meta.Reads) {
                     Y_ENSURE(hashByShardId.contains(readInfo.ShardId));
-                    (*stageInfo.Meta.TaskIdByHash)[hashByShardId[readInfo.ShardId]] = stageInternalTaskId;
+                    (*columnShardHashV1Params.TaskIdByHash)[hashByShardId[readInfo.ShardId]] = stageInternalTaskId;
                 }
 
             }
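
For orientation, here is a minimal standalone sketch (not the YDB sources) of the mapping this hunk builds: every source column shard owns a hash slot, and TaskIdByHash records which task reads that shard, so a later ColumnShardHashV1 shuffle can target tasks directly. Plain std containers stand in for TVector/THashMap, and the shard-to-slot assignment below is a simplified stand-in for the executer's hashByShardId.

// Hypothetical sketch under the assumptions stated above.
#include <cstdint>
#include <iostream>
#include <unordered_map>
#include <utility>
#include <vector>

std::vector<uint64_t> BuildTaskIdByHash(
        const std::vector<std::pair<uint64_t, uint64_t>>& shardToTask, // {shardId, taskId}
        uint64_t sourceShardCount) {
    // Each source column shard owns one hash slot in [0, sourceShardCount).
    std::unordered_map<uint64_t, uint64_t> hashByShardId;
    uint64_t nextSlot = 0;
    for (const auto& entry : shardToTask) {
        if (hashByShardId.emplace(entry.first, nextSlot).second) {
            ++nextSlot;
        }
    }

    // For every slot, remember which task reads that shard, so a ColumnShardHashV1
    // shuffle can later route rows straight to the task that holds the matching data.
    std::vector<uint64_t> taskIdByHash(sourceShardCount, 0);
    for (const auto& entry : shardToTask) {
        taskIdByHash[hashByShardId[entry.first]] = entry.second;
    }
    return taskIdByHash;
}

int main() {
    // Three shards read by two tasks: shards 101 and 102 by task 0, shard 103 by task 1.
    const auto taskIdByHash = BuildTaskIdByHash({{101, 0}, {102, 0}, {103, 1}}, 3);
    for (uint64_t task : taskIdByHash) {
        std::cout << task << ' '; // prints: 0 0 1
    }
    std::cout << '\n';
}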

ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp

Lines changed: 88 additions & 34 deletions
@@ -21,6 +21,16 @@ using namespace NYql::NDq;
 using namespace NYql::NNodes;
 
 
+TString KeyTypesToString(const TVector<NScheme::TTypeInfo>& keyColumnTypes) {
+    TVector<TString> stringNames;
+    stringNames.reserve(keyColumnTypes.size());
+    for (const auto& keyColumnType: keyColumnTypes) {
+        stringNames.push_back(NYql::NProto::TypeIds_Name(keyColumnType.GetTypeId()));
+    }
+    return "[" + JoinSeq(",", stringNames) + "]";
+};
+
+
 void LogStage(const NActors::TActorContext& ctx, const TStageInfo& stageInfo) {
     LOG_DEBUG_S(ctx, NKikimrServices::KQP_EXECUTER, stageInfo.DebugString());
 }
@@ -473,17 +483,16 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, TStageInfo& stageInfo,
            << (spilling ? " with spilling" : " without spilling"));
    };
 
-
    bool hasMap = false;
-   bool isFusedStage = (stageInfo.Meta.TaskIdByHash != nullptr);
-   if (enableShuffleElimination && !isFusedStage) { // taskIdHash can be already set if it is a fused stage, so hashpartition will derive columnv1 parameters from there
+   auto& columnShardHashV1Params = stageInfo.Meta.ColumnShardHashV1Params;
+   bool isFusedWithScanStage = (stageInfo.Meta.TableConstInfo != nullptr);
+   if (enableShuffleElimination && !isFusedWithScanStage) { // taskIdHash can be already set if it is a fused stage, so hashpartition will derive columnv1 parameters from there
        for (ui32 inputIndex = 0; inputIndex < stage.InputsSize(); ++inputIndex) {
            const auto& input = stage.GetInputs(inputIndex);
            auto& originStageInfo = tasksGraph.GetStageInfo(NYql::NDq::TStageId(stageInfo.Id.TxId, input.GetStageIndex()));
-           stageInfo.Meta.TaskIdByHash = originStageInfo.Meta.TaskIdByHash;
-           stageInfo.Meta.SourceShardCount = originStageInfo.Meta.SourceShardCount;
-           stageInfo.Meta.SourceTableKeyColumnTypes = originStageInfo.Meta.SourceTableKeyColumnTypes;
-           if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kMap) {
+           ui32 outputIdx = input.GetOutputIndex();
+           columnShardHashV1Params = originStageInfo.Meta.GetColumnShardHashV1Params(outputIdx);
+           if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kMap) {
                // We want to enforce sourceShardCount from map connection, cause it can be at most one map connection
                // and ColumnShardHash in Shuffle will use this parameter to shuffle on this map (same with taskIdByHash mapping)
                hasMap = true;
@@ -493,11 +502,11 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, TStageInfo& stageInfo,
    }
 
    // if it is stage, where we don't inherit parallelism.
-   if (enableShuffleElimination && !hasMap && !isFusedStage && stageInfo.Tasks.size() > 0 && stage.InputsSize() > 0) {
-       stageInfo.Meta.SourceShardCount = stageInfo.Tasks.size();
-       stageInfo.Meta.TaskIdByHash = std::make_shared<TVector<ui64>>(stageInfo.Meta.SourceShardCount);
-       for (std::size_t i = 0; i < stageInfo.Meta.SourceShardCount; ++i) {
-           (*stageInfo.Meta.TaskIdByHash)[i] = i;
+   if (enableShuffleElimination && !hasMap && !isFusedWithScanStage && stageInfo.Tasks.size() > 0 && stage.InputsSize() > 0) {
+       columnShardHashV1Params.SourceShardCount = stageInfo.Tasks.size();
+       columnShardHashV1Params.TaskIdByHash = std::make_shared<TVector<ui64>>(columnShardHashV1Params.SourceShardCount);
+       for (std::size_t i = 0; i < columnShardHashV1Params.SourceShardCount; ++i) {
+           (*columnShardHashV1Params.TaskIdByHash)[i] = i;
        }
 
        for (auto& input : stage.GetInputs()) {
@@ -510,17 +519,17 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, TStageInfo& stageInfo,
                continue;
            }
 
-           Y_ENSURE(enableShuffleElimination, "OptShuffleElimination wasn't turned on, but ColumnShardHashV1 detected!");
-           // ^ if the flag if false, and kColumnShardHashV1 detected - then the data which would be returned - would be incorrect,
+           Y_ENSURE(enableShuffleElimination, "OptShuffleElimination wasn't turned on, but ColumnShardHashV1 detected!");
+           // ^ if the flag if false, and kColumnShardHashV1 detected - then the data which would be returned - would be incorrect,
            // because we didn't save partitioning in the BuildScanTasksFromShards.
 
            auto columnShardHashV1 = hashShuffle.GetColumnShardHashV1();
-           stageInfo.Meta.SourceTableKeyColumnTypes = std::make_shared<TVector<NScheme::TTypeInfo>>();
-           stageInfo.Meta.SourceTableKeyColumnTypes->reserve(columnShardHashV1.KeyColumnTypesSize());
+           columnShardHashV1Params.SourceTableKeyColumnTypes = std::make_shared<TVector<NScheme::TTypeInfo>>();
+           columnShardHashV1Params.SourceTableKeyColumnTypes->reserve(columnShardHashV1.KeyColumnTypesSize());
            for (const auto& keyColumnType: columnShardHashV1.GetKeyColumnTypes()) {
                auto typeId = static_cast<NScheme::TTypeId>(keyColumnType);
                auto typeInfo = NScheme::TTypeInfo{typeId};
-               stageInfo.Meta.SourceTableKeyColumnTypes->push_back(typeInfo);
+               columnShardHashV1Params.SourceTableKeyColumnTypes->push_back(typeInfo);
            }
            break;
        }
@@ -544,18 +553,43 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, TStageInfo& stageInfo,
                }
                case NKqpProto::TKqpPhyCnHashShuffle::kColumnShardHashV1: {
                    Y_ENSURE(enableShuffleElimination, "OptShuffleElimination wasn't turned on, but ColumnShardHashV1 detected!");
-                   inputStageInfo.Meta.TaskIdByHash = stageInfo.Meta.TaskIdByHash;
-                   inputStageInfo.Meta.SourceShardCount = stageInfo.Meta.SourceShardCount;
-                   inputStageInfo.Meta.SourceTableKeyColumnTypes = stageInfo.Meta.SourceTableKeyColumnTypes;
+
+                   LOG_DEBUG_S(
+                       *TlsActivationContext,
+                       NKikimrServices::KQP_EXECUTER,
+                       "Propogating columnhashv1 pararms to stage"
+                           << "[" << inputStageInfo.Id.TxId << ":" << inputStageInfo.Id.StageId << "]" << ": "
+                           << KeyTypesToString(*columnShardHashV1Params.SourceTableKeyColumnTypes) << " "
+                           << "[" << JoinSeq(",", input.GetHashShuffle().GetKeyColumns()) << "]";
+                   );
+
+                   Y_ENSURE(
+                       columnShardHashV1Params.SourceTableKeyColumnTypes->size() == input.GetHashShuffle().KeyColumnsSize(),
+                       TStringBuilder{}
+                           << "Hashshuffle keycolumns and keytypes args count mismatch during executer stage, types: "
+                           << KeyTypesToString(*columnShardHashV1Params.SourceTableKeyColumnTypes) << " for the columns: "
+                           << "[" << JoinSeq(",", input.GetHashShuffle().GetKeyColumns()) << "]"
+                   );
+
+                   inputStageInfo.Meta.HashParamsByOutput[outputIdx] = columnShardHashV1Params;
                    hashKind = NHashKind::EColumnShardHashV1;
                    break;
                }
                default: {
                    Y_ENSURE(false, "undefined type of hash for shuffle");
                }
            }
-           BuildHashShuffleChannels(tasksGraph, stageInfo, inputIdx, inputStageInfo, outputIdx,
-               input.GetHashShuffle().GetKeyColumns(), enableSpilling, log, hashKind);
+           BuildHashShuffleChannels(
+               tasksGraph,
+               stageInfo,
+               inputIdx,
+               inputStageInfo,
+               outputIdx,
+               input.GetHashShuffle().GetKeyColumns(),
+               enableSpilling,
+               log,
+               hashKind
+           );
            break;
        }
        case NKqpProto::TKqpPhyConnection::kBroadcast:
@@ -1036,7 +1070,7 @@ void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, NYql::NDqProto
    private:
        const TTableConstInfo& TableInfo;
    public:
-       TResolverTable(const TTableConstInfo& tableInfo)
+       TResolverTable(const TTableConstInfo& tableInfo)
            : TableInfo(tableInfo) {
 
        }
@@ -1118,9 +1152,10 @@ void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, NYql::NDqProto
 }
 
 void FillOutputDesc(
-    const TKqpTasksGraph& tasksGraph,
-    NYql::NDqProto::TTaskOutput& outputDesc,
-    const TTaskOutput& output,
+    const TKqpTasksGraph& tasksGraph,
+    NYql::NDqProto::TTaskOutput& outputDesc,
+    const TTaskOutput& output,
+    ui32 outputIdx,
     bool enableSpilling,
     const TStageInfo& stageInfo
 ) {
@@ -1143,19 +1178,37 @@ void FillOutputDesc(
            break;
        }
        case NHashKind::EColumnShardHashV1: {
-           Y_ENSURE(stageInfo.Meta.SourceShardCount != 0, "ShardCount for ColumnShardHashV1 Shuffle can't be equal to 0");
-           Y_ENSURE(stageInfo.Meta.TaskIdByHash != nullptr, "TaskIdByHash for ColumnShardHashV1 wasn't propogated to this stage");
-           Y_ENSURE(stageInfo.Meta.SourceTableKeyColumnTypes != nullptr, "SourceTableKeyColumnTypes for ColumnShardHashV1 wasn't propogated to this stage");
+           auto& columnShardHashV1Params = stageInfo.Meta.GetColumnShardHashV1Params(outputIdx);
+           LOG_DEBUG_S(
+               *TlsActivationContext,
+               NKikimrServices::KQP_EXECUTER,
+               "Filling columnshardhashv1 params for sending it to runtime "
+                   << "[" << stageInfo.Id.TxId << ":" << stageInfo.Id.StageId << "]"
+                   << ": " << KeyTypesToString(*columnShardHashV1Params.SourceTableKeyColumnTypes)
+                   << " for the columns: " << "[" << JoinSeq(",", output.KeyColumns) << "]"
+           );
+           Y_ENSURE(columnShardHashV1Params.SourceShardCount != 0, "ShardCount for ColumnShardHashV1 Shuffle can't be equal to 0");
+           Y_ENSURE(columnShardHashV1Params.TaskIdByHash != nullptr, "TaskIdByHash for ColumnShardHashV1 wasn't propogated to this stage");
+           Y_ENSURE(columnShardHashV1Params.SourceTableKeyColumnTypes != nullptr, "SourceTableKeyColumnTypes for ColumnShardHashV1 wasn't propogated to this stage");
+
+           Y_ENSURE(
+               columnShardHashV1Params.SourceTableKeyColumnTypes->size() == output.KeyColumns.size(),
+               TStringBuilder{}
+                   << "Hashshuffle keycolumns and keytypes args count mismatch during executer stage, types: "
+                   << KeyTypesToString(*columnShardHashV1Params.SourceTableKeyColumnTypes) << " for the columns: "
+                   << "[" << JoinSeq(",", output.KeyColumns) << "]"
+           );
+
            auto& columnShardHashV1 = *hashPartitionDesc.MutableColumnShardHashV1();
-           columnShardHashV1.SetShardCount(stageInfo.Meta.SourceShardCount);
+           columnShardHashV1.SetShardCount(columnShardHashV1Params.SourceShardCount);
 
            auto* columnTypes = columnShardHashV1.MutableKeyColumnTypes();
-           for (const auto& type: *stageInfo.Meta.SourceTableKeyColumnTypes) {
+           for (const auto& type: *columnShardHashV1Params.SourceTableKeyColumnTypes) {
                columnTypes->Add(type.GetTypeId());
            }
 
            auto* taskIdByHash = columnShardHashV1.MutableTaskIdByHash();
-           for (std::size_t taskID: *stageInfo.Meta.TaskIdByHash) {
+           for (std::size_t taskID: *columnShardHashV1Params.TaskIdByHash) {
                taskIdByHash->Add(taskID);
            }
            break;
@@ -1330,8 +1383,9 @@ void SerializeTaskToProto(
    if (task.Outputs.size() > 1) {
        enableSpilling = tasksGraph.GetMeta().AllowWithSpilling;
    }
-   for (const auto& output : task.Outputs) {
-       FillOutputDesc(tasksGraph, *result->AddOutputs(), output, enableSpilling, stageInfo);
+   for (ui32 outputIdx = 0; outputIdx < task.Outputs.size(); ++outputIdx) {
+       const auto& output = task.Outputs[outputIdx];
+       FillOutputDesc(tasksGraph, *result->AddOutputs(), output, outputIdx, enableSpilling, stageInfo);
    }
 
    const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id);
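
FillOutputDesc now serializes the per-output params (ShardCount, KeyColumnTypes, TaskIdByHash) into the ColumnShardHashV1 descriptor. The sketch below shows, conceptually, how a consumer of that descriptor could route a row; it is not the NYql/NDq runtime, the descriptor struct is simplified, and rowHash stands in for the real ColumnShardHashV1 function, which has to match the table's hash sharding.

// Conceptual routing sketch under the assumptions stated above.
#include <cstdint>
#include <stdexcept>
#include <vector>

struct TColumnShardHashV1Desc {         // mirrors the serialized fields, simplified
    uint64_t ShardCount = 0;
    std::vector<uint64_t> TaskIdByHash; // slot -> task, one entry per shard/slot
};

uint64_t RouteRowToTask(uint64_t rowHash, const TColumnShardHashV1Desc& desc) {
    if (desc.ShardCount == 0 || desc.TaskIdByHash.size() != desc.ShardCount) {
        throw std::runtime_error("ColumnShardHashV1 params were not propagated");
    }
    // The row hashes into one of ShardCount buckets, the same way the source column
    // shards partition their data, and TaskIdByHash says which task owns that bucket.
    return desc.TaskIdByHash[rowHash % desc.ShardCount];
}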

ydb/core/kqp/executer_actor/kqp_tasks_graph.h

Lines changed: 32 additions & 5 deletions
@@ -29,6 +29,12 @@ struct TTransaction : private TMoveOnly {
        , Params(std::move(params)) {}
 };
 
+struct TColumnShardHashV1Params {
+    ui64 SourceShardCount = 0;
+    std::shared_ptr<TVector<NScheme::TTypeInfo>> SourceTableKeyColumnTypes = nullptr;
+    std::shared_ptr<TVector<ui64>> TaskIdByHash = nullptr; // hash belongs [0; ShardCount]
+};
+
 struct TStageInfoMeta {
     const IKqpGateway::TPhysicalTxData& Tx;
 
@@ -44,11 +50,32 @@ struct TStageInfoMeta {
     THolder<TKeyDesc> ShardKey;
     NSchemeCache::TSchemeCacheRequest::EKind ShardKind = NSchemeCache::TSchemeCacheRequest::EKind::KindUnknown;
 
-    // used for ColumnV1Hashing
-    ui64 SourceShardCount = 0;
-    std::shared_ptr<TVector<NScheme::TTypeInfo>> SourceTableKeyColumnTypes = nullptr;
-    std::shared_ptr<TVector<ui64>> TaskIdByHash = nullptr; // hash belongs [0; ShardCount]
-    //
+    ////////////////////////////////////////////////////////////////////////////////////////////////
+
+    TColumnShardHashV1Params ColumnShardHashV1Params;
+    THashMap<ui32, TColumnShardHashV1Params> HashParamsByOutput;
+
+    TColumnShardHashV1Params& GetColumnShardHashV1Params(ui32 outputIdx) {
+        if (HashParamsByOutput.contains(outputIdx)) {
+            return HashParamsByOutput[outputIdx];
+        }
+        return ColumnShardHashV1Params;
+    }
+
+    const TColumnShardHashV1Params& GetColumnShardHashV1Params(ui32 outputIdx) const {
+        if (HashParamsByOutput.contains(outputIdx)) {
+            return HashParamsByOutput.at(outputIdx);
+        }
+        return ColumnShardHashV1Params;
+    }
+
+    /*
+     * We want to propogate params for hash func through the stages. In default sutiation we do it by only ColumnShardHashV1Params.
+     * But challenges appear when there is CTE in plan. So we must store mapping from the outputStageIdx to params.
+     * Otherwise, we will rewrite ColumnShardHashV1Params, when we will meet the same stage again during propogation.
+     */
+
+    ////////////////////////////////////////////////////////////////////////////////////////////////
 
     const NKqpProto::TKqpPhyStage& GetStage(const size_t idx) const {
         auto& txBody = Tx.Body;
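
The per-output map exists because one producer stage can feed several consumers (for example a CTE reused in the plan), and each consuming shuffle may need its own params; GetColumnShardHashV1Params prefers an output-specific entry and falls back to the stage-wide one. A stripped-down sketch of that lookup rule, with std types standing in for THashMap/TVector and the params reduced to a single field:

// Stripped-down sketch of the fallback lookup; assumptions as stated in the lead-in.
#include <cstdint>
#include <unordered_map>

struct THashParamsSketch {
    uint64_t SourceShardCount = 0;
};

struct TStageMetaSketch {
    THashParamsSketch ColumnShardHashV1Params;                          // stage-wide default
    std::unordered_map<uint32_t, THashParamsSketch> HashParamsByOutput; // per-output overrides

    const THashParamsSketch& GetColumnShardHashV1Params(uint32_t outputIdx) const {
        auto it = HashParamsByOutput.find(outputIdx);
        return it != HashParamsByOutput.end() ? it->second : ColumnShardHashV1Params;
    }
};

// Usage: output 1 keeps its own shard count, every other output sees the default.
// TStageMetaSketch meta;
// meta.ColumnShardHashV1Params.SourceShardCount = 4;
// meta.HashParamsByOutput[1].SourceShardCount = 8;
// meta.GetColumnShardHashV1Params(0).SourceShardCount; // 4
// meta.GetColumnShardHashV1Params(1).SourceShardCount; // 8
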
Lines changed: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
+PRAGMA ydb.OptShuffleElimination="true";
+
+$z1_12 = 1;
+$p = (select p_partkey, p_name
+    from
+        part
+    where p_name like '%green%');
+$j1 = (select ps_partkey, ps_suppkey, ps_supplycost
+    from
+        partsupp as ps
+    join $p as p
+    on ps.ps_partkey = p.p_partkey);
+$j2 = (select l_suppkey, l_partkey, l_orderkey, l_extendedprice, l_discount, ps_supplycost, l_quantity
+    from
+        lineitem as l
+    join $j1 as j
+    on l.l_suppkey = j.ps_suppkey AND l.l_partkey = j.ps_partkey);
+$j3 = (select l_orderkey, s_nationkey, l_extendedprice, l_discount, ps_supplycost, l_quantity
+    from
+        supplier as s
+    join $j2 as j
+    on j.l_suppkey = s.s_suppkey);
+$j4 = (select o_orderdate, l_extendedprice, l_discount, ps_supplycost, l_quantity, s_nationkey
+    from
+        orders as o
+    join $j3 as j
+    on o.o_orderkey = j.l_orderkey);
+$j5 = (select n_name, o_orderdate, l_extendedprice, l_discount, ps_supplycost, l_quantity
+    from
+        nation as n
+    join $j4 as j
+    on j.s_nationkey = n.n_nationkey
+);
+$profit = (select
+    n_name as nation,
+    DateTime::GetYear(cast(o_orderdate as timestamp)) as o_year,
+    l_extendedprice * ($z1_12 - l_discount) - ps_supplycost * l_quantity as amount
+from $j5);
+select
+    nation,
+    o_year,
+    sum(amount) as sum_profit
+from $profit
+group by
+    nation,
+    o_year
+order by
+    nation,
+    o_year desc;
+
