Skip to content

Commit c6c7c55

Browse files
gridnevvvitssmikeulya-sidorinapavelvelikhov
authored
Merge stream lookup changes (#12292)
Co-authored-by: Mikhail Surin <[email protected]> Co-authored-by: ulya-sidorina <[email protected]> Co-authored-by: Pavel Velikhov <[email protected]>
1 parent 21bf5e0 commit c6c7c55

13 files changed

+296
-100
lines changed

ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,7 @@ void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf
415415
}
416416

417417
settings->SetLookupStrategy(streamLookup.GetLookupStrategy());
418+
settings->SetKeepRowsOrder(streamLookup.GetKeepRowsOrder());
418419

419420
TTransform streamLookupTransform;
420421
streamLookupTransform.Type = "StreamLookupInputTransformer";

ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,11 @@ TExprBase KqpRewriteReadTable(TExprBase node, TExprContext& ctx, const TKqpOptim
110110
matched->Settings = settings.BuildNode(ctx, matched->Settings.Pos());
111111
}
112112

113+
if (kqpCtx.Config->HasMaxSequentialReadsInFlight()) {
114+
settings.SequentialInFlight = *kqpCtx.Config->MaxSequentialReadsInFlight.Get();
115+
matched->Settings = settings.BuildNode(ctx, matched->Settings.Pos());
116+
}
117+
113118
TVector<TExprBase> inputs;
114119
TVector<TCoArgument> args;
115120
TNodeOnNodeOwnedMap argReplaces;

ydb/core/kqp/provider/yql_kikimr_settings.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ TKikimrConfiguration::TKikimrConfiguration() {
8787
REGISTER_SETTING(*this, OptimizerHints).Parser([](const TString& v) { return NYql::TOptimizerHints::Parse(v); });
8888
REGISTER_SETTING(*this, OverridePlanner);
8989
REGISTER_SETTING(*this, UseGraceJoinCoreForMap);
90+
REGISTER_SETTING(*this, EnableOrderPreservingLookupJoin);
9091

9192
REGISTER_SETTING(*this, OptUseFinalizeByKey);
9293
REGISTER_SETTING(*this, CostBasedOptimizationLevel);
@@ -96,6 +97,7 @@ TKikimrConfiguration::TKikimrConfiguration() {
9697
REGISTER_SETTING(*this, MaxDPccpDPTableSize);
9798

9899
REGISTER_SETTING(*this, MaxTasksPerStage);
100+
REGISTER_SETTING(*this, MaxSequentialReadsInFlight);
99101

100102
/* Runtime */
101103
REGISTER_SETTING(*this, ScanQuery);
@@ -120,6 +122,10 @@ bool TKikimrSettings::SpillingEnabled() const {
120122
return GetFlagValue(_KqpEnableSpilling.Get());
121123
}
122124

125+
bool TKikimrSettings::OrderPreservingLookupJoinEnabled() const {
126+
return GetFlagValue(EnableOrderPreservingLookupJoin.Get());
127+
}
128+
123129
bool TKikimrSettings::DisableLlvmForUdfStages() const {
124130
return GetFlagValue(_KqpDisableLlvmForUdfStages.Get());
125131
}
@@ -148,6 +154,10 @@ bool TKikimrSettings::HasOptUseFinalizeByKey() const {
148154
return GetFlagValue(OptUseFinalizeByKey.Get().GetOrElse(true)) != EOptionalFlag::Disabled;
149155
}
150156

157+
bool TKikimrSettings::HasMaxSequentialReadsInFlight() const {
158+
return !MaxSequentialReadsInFlight.Get().Empty();
159+
}
160+
151161
EOptionalFlag TKikimrSettings::GetOptPredicateExtract() const {
152162
return GetOptionalFlagValue(OptEnablePredicateExtract.Get());
153163
}

ydb/core/kqp/provider/yql_kikimr_settings.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ struct TKikimrSettings {
5454
NCommon::TConfSetting<ui64, false> EnableSpillingNodes;
5555
NCommon::TConfSetting<TString, false> OverridePlanner;
5656
NCommon::TConfSetting<bool, false> UseGraceJoinCoreForMap;
57+
NCommon::TConfSetting<bool, false> EnableOrderPreservingLookupJoin;
5758

5859
NCommon::TConfSetting<TString, false> OptOverrideStatistics;
5960
NCommon::TConfSetting<NYql::TOptimizerHints, false> OptimizerHints;
@@ -72,6 +73,7 @@ struct TKikimrSettings {
7273

7374

7475
NCommon::TConfSetting<ui32, false> MaxTasksPerStage;
76+
NCommon::TConfSetting<ui32, false> MaxSequentialReadsInFlight;
7577

7678
/* Runtime */
7779
NCommon::TConfSetting<bool, true> ScanQuery;
@@ -88,7 +90,8 @@ struct TKikimrSettings {
8890
bool HasOptEnableOlapPushdown() const;
8991
bool HasOptEnableOlapProvideComputeSharding() const;
9092
bool HasOptUseFinalizeByKey() const;
91-
93+
bool HasMaxSequentialReadsInFlight() const;
94+
bool OrderPreservingLookupJoinEnabled() const;
9295
EOptionalFlag GetOptPredicateExtract() const;
9396
EOptionalFlag GetUseLlvm() const;
9497
NDq::EHashJoinMode GetHashJoinMode() const;

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1277,6 +1277,7 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
12771277
YQL_ENSURE(streamLookup.LookupStrategy().Maybe<TCoAtom>());
12781278
TString lookupStrategy = streamLookup.LookupStrategy().Maybe<TCoAtom>().Cast().StringValue();
12791279
streamLookupProto.SetLookupStrategy(GetStreamLookupStrategy(lookupStrategy));
1280+
streamLookupProto.SetKeepRowsOrder(Config->OrderPreservingLookupJoinEnabled());
12801281

12811282
switch (streamLookupProto.GetLookupStrategy()) {
12821283
case NKqpProto::EStreamLookupStrategy::LOOKUP: {

ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,8 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
268268
RuntimeError(TStringBuilder() << "Unexpected event: " << ev->GetTypeRewrite(),
269269
NYql::NDqProto::StatusIds::INTERNAL_ERROR);
270270
}
271+
} catch (const NKikimr::TMemoryLimitExceededException& e) {
272+
RuntimeError("Memory limit exceeded at stream lookup", NYql::NDqProto::StatusIds::PRECONDITION_FAILED);
271273
} catch (const yexception& e) {
272274
RuntimeError(e.what(), NYql::NDqProto::StatusIds::INTERNAL_ERROR);
273275
}
@@ -400,6 +402,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
400402
}
401403
}
402404

405+
auto guard = BindAllocator();
403406
StreamLookupWorker->AddResult(TKqpStreamLookupWorker::TShardReadResult{
404407
read.ShardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())
405408
});

0 commit comments

Comments
 (0)