Skip to content

Commit c0bd909

Browse files
authored
[KQP] Shuffle elimination flag bug fix (#16367)
1 parent a9761dc commit c0bd909

29 files changed

+385
-656
lines changed

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1652,11 +1652,12 @@ class TKqpExecuterBase : public TActor<TDerived> {
16521652
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
16531653

16541654
auto& columnShardHashV1Params = stageInfo.Meta.ColumnShardHashV1Params;
1655-
if (enableShuffleElimination && stageInfo.Meta.ColumnTableInfoPtr) {
1655+
if (enableShuffleElimination && stage.GetIsShuffleEliminated() && stageInfo.Meta.ColumnTableInfoPtr) {
16561656
const auto& tableDesc = stageInfo.Meta.ColumnTableInfoPtr->Description;
16571657
columnShardHashV1Params.SourceShardCount = tableDesc.GetColumnShardCount();
16581658
columnShardHashV1Params.SourceTableKeyColumnTypes = std::make_shared<TVector<NScheme::TTypeInfo>>();
16591659
for (const auto& column: tableDesc.GetSharding().GetHashSharding().GetColumns()) {
1660+
Y_ENSURE(stageInfo.Meta.TableConstInfo->Columns.contains(column), TStringBuilder{} << "Table doesn't have column: " << column);
16601661
auto columnType = stageInfo.Meta.TableConstInfo->Columns.at(column).Type;
16611662
columnShardHashV1Params.SourceTableKeyColumnTypes->push_back(columnType);
16621663
}

ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -545,7 +545,8 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, TStageInfo& stageInfo,
545545
columnShardHashV1Params.SourceTableKeyColumnTypes->reserve(columnShardHashV1.KeyColumnTypesSize());
546546
for (const auto& keyColumnType: columnShardHashV1.GetKeyColumnTypes()) {
547547
auto typeId = static_cast<NScheme::TTypeId>(keyColumnType);
548-
auto typeInfo = NScheme::TTypeInfo{typeId};
548+
auto typeInfo =
549+
typeId == NScheme::NTypeIds::Decimal? NScheme::TTypeInfo(NKikimr::NScheme::TDecimalType::Default()): NScheme::TTypeInfo(typeId);
549550
columnShardHashV1Params.SourceTableKeyColumnTypes->push_back(typeInfo);
550551
}
551552
break;

ydb/core/kqp/opt/logical/kqp_opt_log.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
163163
TMaybeNode<TExprBase> OptimizeEquiJoinWithCosts(TExprBase node, TExprContext& ctx) {
164164
auto maxDPhypDPTableSize = Config->MaxDPHypDPTableSize.Get().GetOrElse(TDqSettings::TDefault::MaxDPHypDPTableSize);
165165
auto optLevel = Config->CostBasedOptimizationLevel.Get().GetOrElse(Config->DefaultCostBasedOptimizationLevel);
166-
bool enableShuffleElimination = KqpCtx.Config->OptShuffleElimination.Get().GetOrElse(Config->DefaultEnableShuffleElimination);
166+
bool enableShuffleElimination = KqpCtx.Config->OptShuffleElimination.Get().GetOrElse(KqpCtx.Config->DefaultEnableShuffleElimination);
167167
auto providerCtx = TKqpProviderContext(KqpCtx, optLevel);
168168
auto opt = std::unique_ptr<IOptimizerNew>(MakeNativeOptimizerNew(providerCtx, maxDPhypDPTableSize, ctx, enableShuffleElimination));
169169
TExprBase output = DqOptimizeEquiJoinWithCosts(node, ctx, TypesCtx, optLevel,

ydb/core/kqp/opt/physical/kqp_opt_phy.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,7 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
452452
bool shuffleEliminationWithMap = KqpCtx.Config->OptShuffleEliminationWithMap.Get().GetOrElse(true);
453453
bool rightCollectStage = !KqpCtx.Config->AllowMultiBroadcasts;
454454
TExprBase output = DqBuildJoin(node, ctx, optCtx, *getParents(), IsGlobal,
455-
pushLeftStage, KqpCtx.Config->GetHashJoinMode(), false, KqpCtx.Config->UseGraceJoinCoreForMap.Get().GetOrElse(false), KqpCtx.Config->OptShuffleElimination.Get().GetOrElse(false), shuffleEliminationWithMap,
455+
pushLeftStage, KqpCtx.Config->GetHashJoinMode(), false, KqpCtx.Config->UseGraceJoinCoreForMap.Get().GetOrElse(false), KqpCtx.Config->OptShuffleElimination.Get().GetOrElse(KqpCtx.Config->DefaultEnableShuffleElimination), shuffleEliminationWithMap,
456456
rightCollectStage
457457
);
458458
DumpAppliedRule("BuildJoin", node.Ptr(), output.Ptr(), ctx);

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -919,7 +919,7 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
919919
i.MutableProgram()->MutableSettings()->SetLevelDataPrediction(rPredictor.GetLevelDataVolume(i.GetProgram().GetSettings().GetStageLevel()));
920920
}
921921

922-
txProto.SetEnableShuffleElimination(Config->OptShuffleElimination.Get().GetOrElse(false));
922+
txProto.SetEnableShuffleElimination(Config->OptShuffleElimination.Get().GetOrElse(Config->DefaultEnableShuffleElimination));
923923
txProto.SetHasEffects(hasEffectStage);
924924

925925
for (const auto& paramBinding : tx.ParamBindings()) {
@@ -1336,13 +1336,16 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
13361336
shuffleProto.AddKeyColumns(TString(keyColumn));
13371337
}
13381338

1339-
if (Config->OptShuffleElimination.Get().GetOrElse(false)) {
1339+
if (Config->OptShuffleElimination.Get().GetOrElse(Config->DefaultEnableShuffleElimination)) {
13401340
auto& columnHashV1 = *shuffleProto.MutableColumnShardHashV1();
13411341

13421342
const auto& outputType = NYql::NDq::GetDqConnectionType(connection, ctx);
13431343
auto structType = outputType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
13441344
for (const auto& column: shuffle.KeyColumns().Ptr()->Children()) {
13451345
auto ty = NYql::NDq::GetColumnType(connection, *structType, column->Content(), column->Pos(), ctx);
1346+
if (ty->GetKind() == ETypeAnnotationKind::List) {
1347+
ty = ty->Cast<TListExprType>()->GetItemType();
1348+
}
13461349
NYql::NUdf::EDataSlot slot;
13471350
switch (ty->GetKind()) {
13481351
case ETypeAnnotationKind::Data: {
@@ -1351,6 +1354,9 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
13511354
}
13521355
case ETypeAnnotationKind::Optional: {
13531356
auto optionalType = ty->Cast<TOptionalExprType>()->GetItemType();
1357+
if (optionalType->GetKind() == ETypeAnnotationKind::List) {
1358+
optionalType = optionalType->Cast<TListExprType>()->GetItemType();
1359+
}
13541360
Y_ENSURE(
13551361
optionalType->GetKind() == ETypeAnnotationKind::Data,
13561362
TStringBuilder{} << "Can't retrieve type from optional" << static_cast<std::int64_t>(optionalType->GetKind()) << "for ColumnHashV1 Shuffling"

0 commit comments

Comments
 (0)