Skip to content

Commit 8b7eb04

Browse files
committed
initial
1 parent adc4da7 commit 8b7eb04

18 files changed

+89
-31
lines changed

ydb/core/kqp/host/kqp_host.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -1558,6 +1558,7 @@ class TKqpHost : public IKqpHost {
15581558
|| settingName == "DisableOrderedColumns"
15591559
|| settingName == "Warning"
15601560
|| settingName == "UseBlocks"
1561+
|| settingName == "BlockEngine"
15611562
;
15621563
};
15631564
auto configProvider = CreateConfigProvider(*TypesCtx, gatewaysConfig, {}, allowSettings);

ydb/core/kqp/opt/peephole/kqp_opt_peephole_wide_read.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ TExprBase KqpBuildWideReadTable(const TExprBase& node, TExprContext& ctx, TTypeA
5858
} else if (auto maybeRead = node.Maybe<TKqpReadOlapTableRanges>()) {
5959
auto read = maybeRead.Cast();
6060

61-
if (typesCtx.UseBlocks) {
61+
if (typesCtx.IsBlockEngineEnabled()) {
6262
wideRead = Build<TCoWideFromBlocks>(ctx, node.Pos())
6363
.Input<TKqpBlockReadOlapTableRanges>()
6464
.Table(read.Table())

ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -7774,7 +7774,7 @@ THolder<IGraphTransformer> CreatePeepHoleFinalStageTransformer(TTypeAnnotationCo
77747774
pipeline.Add(
77757775
CreateFunctorTransformer(
77767776
[&types](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> IGraphTransformer::TStatus {
7777-
if (types.UseBlocks) {
7777+
if (types.IsBlockEngineEnabled()) {
77787778
const auto& extStageRules = TPeepHoleRules::Instance().BlockStageExtRules;
77797779
return PeepHoleBlockStage(input, output, ctx, types, extStageRules);
77807780
} else {
@@ -7789,7 +7789,7 @@ THolder<IGraphTransformer> CreatePeepHoleFinalStageTransformer(TTypeAnnotationCo
77897789
pipeline.Add(
77907790
CreateFunctorTransformer(
77917791
[&types](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> IGraphTransformer::TStatus {
7792-
if (types.UseBlocks) {
7792+
if (types.IsBlockEngineEnabled()) {
77937793
const auto& extStageRules = TPeepHoleRules::Instance().BlockStageExtFinalRules;
77947794
return PeepHoleBlockStage(input, output, ctx, types, extStageRules);
77957795
} else {

ydb/library/yql/core/type_ann/type_ann_list.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -4689,7 +4689,7 @@ namespace {
46894689
return IGraphTransformer::TStatus::Repeat;
46904690
}
46914691

4692-
if (isMany && ctx.Types.UseBlocks) {
4692+
if (isMany && ctx.Types.IsBlockEngineEnabled()) {
46934693
auto streamIndex = inputStructType->FindItem("_yql_group_stream_index");
46944694
if (streamIndex) {
46954695
const TTypeAnnotationNode* streamIndexType = inputStructType->GetItems()[*streamIndex]->GetItemType();

ydb/library/yql/core/yql_aggregate_expander.cpp

+7-7
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ TExprNode::TPtr TAggregateExpander::ExpandAggregate()
2525

2626
HaveDistinct = AnyOf(AggregatedColumns->ChildrenList(),
2727
[](const auto& child) { return child->ChildrenSize() == 3; });
28-
EffectiveCompact = (HaveDistinct && CompactForDistinct && !TypesCtx.UseBlocks) || ForceCompact || HasSetting(*settings, "compact");
28+
EffectiveCompact = (HaveDistinct && CompactForDistinct && !TypesCtx.IsBlockEngineEnabled()) || ForceCompact || HasSetting(*settings, "compact");
2929
for (const auto& trait : Traits) {
3030
auto mergeLambda = trait->Child(5);
3131
if (mergeLambda->Tail().IsCallable("Void")) {
@@ -56,7 +56,7 @@ TExprNode::TPtr TAggregateExpander::ExpandAggregate()
5656
return GeneratePhases();
5757
}
5858

59-
if (TypesCtx.UseBlocks) {
59+
if (TypesCtx.IsBlockEngineEnabled()) {
6060
if (Suffix == "Combine") {
6161
auto ret = TryGenerateBlockCombine();
6262
if (ret) {
@@ -2776,7 +2776,7 @@ TExprNode::TPtr TAggregateExpander::GeneratePhases() {
27762776
streams.push_back(SerializeIdxSet(indicies));
27772777
}
27782778

2779-
if (TypesCtx.UseBlocks) {
2779+
if (TypesCtx.IsBlockEngineEnabled()) {
27802780
for (ui32 i = 0; i < unionAllInputs.size(); ++i) {
27812781
unionAllInputs[i] = Ctx.Builder(Node->Pos())
27822782
.Callable("Map")
@@ -2797,7 +2797,7 @@ TExprNode::TPtr TAggregateExpander::GeneratePhases() {
27972797
}
27982798

27992799
auto settings = Node->ChildPtr(3);
2800-
if (TypesCtx.UseBlocks) {
2800+
if (TypesCtx.IsBlockEngineEnabled()) {
28012801
settings = AddSetting(*settings, Node->Pos(), "many_streams", Ctx.NewList(Node->Pos(), std::move(streams)), Ctx);
28022802
}
28032803

@@ -2830,7 +2830,7 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombine() {
28302830
}
28312831

28322832
TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalize() {
2833-
if (UsePartitionsByKeys || !TypesCtx.UseBlocks) {
2833+
if (UsePartitionsByKeys || !TypesCtx.IsBlockEngineEnabled()) {
28342834
return nullptr;
28352835
}
28362836

@@ -2919,13 +2919,13 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalizeHashed() {
29192919
TExprNode::TPtr ExpandAggregatePeephole(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) {
29202920
if (NNodes::TCoAggregate::Match(node.Get())) {
29212921
NNodes::TCoAggregate self(node);
2922-
auto ret = TAggregateExpander::CountAggregateRewrite(self, ctx, typesCtx.UseBlocks);
2922+
auto ret = TAggregateExpander::CountAggregateRewrite(self, ctx, typesCtx.IsBlockEngineEnabled());
29232923
if (ret != node) {
29242924
YQL_CLOG(DEBUG, Core) << "CountAggregateRewrite on peephole";
29252925
return ret;
29262926
}
29272927
}
2928-
return ExpandAggregatePeepholeImpl(node, ctx, typesCtx, false, typesCtx.UseBlocks);
2928+
return ExpandAggregatePeepholeImpl(node, ctx, typesCtx, false, typesCtx.IsBlockEngineEnabled());
29292929
}
29302930

29312931
} // namespace NYql

ydb/library/yql/core/yql_type_annotation.h

+11-1
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,12 @@ enum class EMatchRecognizeStreamingMode {
193193
Force,
194194
};
195195

196+
enum class EBlockEngineMode {
197+
Disable /* "disable" */,
198+
Auto /* "auto" */,
199+
Force /* "force" */,
200+
};
201+
196202
struct TUdfCachedInfo {
197203
const TTypeAnnotationNode* FunctionType = nullptr;
198204
const TTypeAnnotationNode* RunConfigType = nullptr;
@@ -251,6 +257,7 @@ struct TTypeAnnotationContext: public TThrRefBase {
251257
bool YsonCastToString = true;
252258
ui32 FolderSubDirsLimit = 1000;
253259
bool UseBlocks = false;
260+
EBlockEngineMode BlockEngineMode = EBlockEngineMode::Disable;
254261
bool PgEmitAggApply = false;
255262
IArrowResolver::TPtr ArrowResolver;
256263
ECostBasedOptimizerType CostBasedOptimizer = ECostBasedOptimizerType::Disable;
@@ -350,7 +357,10 @@ struct TTypeAnnotationContext: public TThrRefBase {
350357
void SetStats(const TExprNode* input, std::shared_ptr<TOptimizerStatistics> stats) {
351358
StatisticsMap[input] = stats;
352359
}
353-
360+
361+
bool IsBlockEngineEnabled() const {
362+
return BlockEngineMode != EBlockEngineMode::Disable || UseBlocks;
363+
}
354364
};
355365

356366
template <> inline

ydb/library/yql/dq/opt/dq_opt_log.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ TExprBase DqRewriteAggregate(TExprBase node, TExprContext& ctx, TTypeAnnotationC
2121
if (!node.Maybe<TCoAggregateBase>()) {
2222
return node;
2323
}
24-
TAggregateExpander aggExpander(true, !typesCtx.UseBlocks && !useFinalizeByKey, useFinalizeByKey, node.Ptr(), ctx, typesCtx, false, compactForDistinct, usePhases);
24+
TAggregateExpander aggExpander(true, !typesCtx.IsBlockEngineEnabled() && !useFinalizeByKey, useFinalizeByKey, node.Ptr(), ctx, typesCtx, false, compactForDistinct, usePhases);
2525
auto result = aggExpander.ExpandAggregate();
2626
YQL_ENSURE(result);
2727

ydb/library/yql/dq/opt/dq_opt_peephole.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -681,7 +681,7 @@ NNodes::TExprBase DqPeepholeRewriteLength(const NNodes::TExprBase& node, TExprCo
681681
}
682682

683683
auto dqPhyLength = node.Cast<TDqPhyLength>();
684-
if (typesCtx.UseBlocks) {
684+
if (typesCtx.IsBlockEngineEnabled()) {
685685
return NNodes::TExprBase(ctx.Builder(node.Pos())
686686
.Callable("NarrowMap")
687687
.Callable(0, "BlockCombineAll")

ydb/library/yql/providers/config/yql_config_provider.cpp

+12
Original file line numberDiff line numberDiff line change
@@ -888,6 +888,18 @@ namespace {
888888
return false;
889889
}
890890
}
891+
else if (name == "BlockEngine") {
892+
if (args.size() != 1) {
893+
ctx.AddError(TIssue(pos, TStringBuilder() << "Expected at most 1 argument, but got " << args.size()));
894+
return false;
895+
}
896+
897+
auto arg = TString{args[0]};
898+
if (!TryFromString(arg, Types.BlockEngineMode)) {
899+
ctx.AddError(TIssue(pos, TStringBuilder() << "Expected `disable|auto|force', but got: " << args[0]));
900+
return false;
901+
}
902+
}
891903
else {
892904
ctx.AddError(TIssue(pos, TStringBuilder() << "Unsupported command: " << name));
893905
return false;

ydb/library/yql/providers/dq/opt/logical_optimize.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ class TDqsLogicalOptProposalTransformer : public TOptimizeTransformerBase {
324324
auto input = aggregate.Input().Maybe<TDqConnection>();
325325

326326
if (input) {
327-
auto newNode = TAggregateExpander::CountAggregateRewrite(aggregate, ctx, TypesCtx.UseBlocks);
327+
auto newNode = TAggregateExpander::CountAggregateRewrite(aggregate, ctx, TypesCtx.IsBlockEngineEnabled());
328328
if (node.Ptr() != newNode) {
329329
return TExprBase(newNode);
330330
}

ydb/library/yql/providers/yt/provider/yql_yt_logical_optimize.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -2512,7 +2512,7 @@ class TYtLogicalOptProposalTransformer : public TOptimizeTransformerBase {
25122512
return node;
25132513
}
25142514

2515-
return TAggregateExpander::CountAggregateRewrite(aggregate, ctx, State_->Types->UseBlocks);
2515+
return TAggregateExpander::CountAggregateRewrite(aggregate, ctx, State_->Types->IsBlockEngineEnabled());
25162516
}
25172517

25182518
TMaybeNode<TExprBase> ZeroSampleToZeroLimit(TExprBase node, TExprContext& ctx) const {

ydb/library/yql/sql/pg/pg_sql.cpp

+27-8
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,7 @@ class TConverter : public IPGParseEvents {
272272
: AstParseResult(astParseResult)
273273
, Settings(settings)
274274
, DqEngineEnabled(Settings.DqDefaultAuto->Allow())
275+
, BlockEngineEnabled(Settings.BlockDefaultAuto->Allow())
275276
{
276277
Positions.push_back({});
277278
ScanRows(query);
@@ -281,6 +282,10 @@ class TConverter : public IPGParseEvents {
281282
DqEngineEnabled = true;
282283
} else if (flag == "DqEngineForce") {
283284
DqEngineForce = true;
285+
} else if (flag == "BlockEngineEnable") {
286+
BlockEngineEnabled = true;
287+
} else if (flag == "BlockEngineForce") {
288+
BlockEngineForce = true;
284289
}
285290
}
286291

@@ -320,6 +325,8 @@ class TConverter : public IPGParseEvents {
320325
Statements.push_back(L(A("let"), A("world"), L(A(TString(NYql::ConfigureName)), A("world"), configSource,
321326
QA("OrderedColumns"))));
322327

328+
ui32 blockEnginePgmPos = Statements.size();
329+
Statements.push_back(configSource);
323330
ui32 costBasedOptimizerPos = Statements.size();
324331
Statements.push_back(configSource);
325332
ui32 dqEnginePgmPos = Statements.size();
@@ -359,6 +366,13 @@ class TConverter : public IPGParseEvents {
359366
Statements.erase(Statements.begin() + costBasedOptimizerPos);
360367
}
361368

369+
if (BlockEngineEnabled) {
370+
Statements[blockEnginePgmPos] = L(A("let"), A("world"), L(A(TString(NYql::ConfigureName)), A("world"), configSource,
371+
QA("BlockEngine"), QA(BlockEngineForce ? "force" : "auto")));
372+
} else {
373+
Statements.erase(Statements.begin() + blockEnginePgmPos);
374+
}
375+
362376
return VL(Statements.data(), Statements.size());
363377
}
364378

@@ -2001,7 +2015,7 @@ class TConverter : public IPGParseEvents {
20012015
AddError(TStringBuilder() << "VariableSetStmt, expected string literal for " << value->name << " option");
20022016
return nullptr;
20032017
}
2004-
} else if (name == "dqengine") {
2018+
} else if (name == "dqengine" || name == "blockengine") {
20052019
if (ListLength(value->args) != 1) {
20062020
AddError(TStringBuilder() << "VariableSetStmt, expected 1 arg, but got: " << ListLength(value->args));
20072021
return nullptr;
@@ -2011,17 +2025,20 @@ class TConverter : public IPGParseEvents {
20112025
if (NodeTag(arg) == T_A_Const && (NodeTag(CAST_NODE(A_Const, arg)->val) == T_String)) {
20122026
auto rawStr = StrVal(CAST_NODE(A_Const, arg)->val);
20132027
auto str = to_lower(TString(rawStr));
2028+
const bool isDqEngine = name == "dqengine";
2029+
auto& enable = isDqEngine ? DqEngineEnabled : BlockEngineEnabled;
2030+
auto& force = isDqEngine ? DqEngineForce : BlockEngineForce;
20142031
if (str == "auto") {
2015-
DqEngineEnabled = true;
2016-
DqEngineForce = false;
2032+
enable = true;
2033+
force = false;
20172034
} else if (str == "force") {
2018-
DqEngineEnabled = true;
2019-
DqEngineForce = true;
2035+
enable = true;
2036+
force = true;
20202037
} else if (str == "disable") {
2021-
DqEngineEnabled = false;
2022-
DqEngineForce = false;
2038+
enable = false;
2039+
force = false;
20232040
} else {
2024-
AddError(TStringBuilder() << "VariableSetStmt, not supported DqEngine option value: " << rawStr);
2041+
AddError(TStringBuilder() << "VariableSetStmt, not supported " << value->name << " option value: " << rawStr);
20252042
return nullptr;
20262043
}
20272044
} else {
@@ -4247,6 +4264,8 @@ class TConverter : public IPGParseEvents {
42474264
bool DqEngineEnabled = false;
42484265
bool DqEngineForce = false;
42494266
TString CostBasedOptimizer;
4267+
bool BlockEngineEnabled = false;
4268+
bool BlockEngineForce = false;
42504269
TVector<TAstNode*> Statements;
42514270
ui32 ReadIndex = 0;
42524271
TViews Views;

ydb/library/yql/sql/settings/translation_settings.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ namespace NSQLTranslation {
5656
, WarnOnV0(true)
5757
, V0WarnAsError(ISqlFeaturePolicy::MakeAlwaysDisallow())
5858
, DqDefaultAuto(ISqlFeaturePolicy::MakeAlwaysDisallow())
59+
, BlockDefaultAuto(ISqlFeaturePolicy::MakeAlwaysDisallow())
5960
, AssumeYdbOnClusterWithSlash(false)
6061
{}
6162

ydb/library/yql/sql/settings/translation_settings.h

+1
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ namespace NSQLTranslation {
101101
bool WarnOnV0;
102102
ISqlFeaturePolicy::TPtr V0WarnAsError;
103103
ISqlFeaturePolicy::TPtr DqDefaultAuto;
104+
ISqlFeaturePolicy::TPtr BlockDefaultAuto;
104105
bool AssumeYdbOnClusterWithSlash;
105106
TString DynamicClusterProvider;
106107
TString FileAliasPrefix;

ydb/library/yql/sql/v1/context.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ THashMap<TStringBuf, TPragmaField> CTX_PRAGMA_FIELDS = {
5858
{"EmitAggApply", &TContext::EmitAggApply},
5959
{"AnsiLike", &TContext::AnsiLike},
6060
{"UseBlocks", &TContext::UseBlocks},
61+
{"BlockEngineEnable", &TContext::BlockEngineEnable},
62+
{"BlockEngineForce", &TContext::BlockEngineForce},
6163
};
6264

6365
typedef TMaybe<bool> TContext::*TPragmaMaybeField;
@@ -84,6 +86,7 @@ TContext::TContext(const NSQLTranslation::TTranslationSettings& settings,
8486
, HasPendingErrors(false)
8587
, DqEngineEnable(Settings.DqDefaultAuto->Allow())
8688
, AnsiQuotedIdentifiers(settings.AnsiLexer)
89+
, BlockEngineEnable(Settings.BlockDefaultAuto->Allow())
8790
{
8891
for (auto lib : settings.Libraries) {
8992
Libraries.emplace(lib, TLibraryStuff());

ydb/library/yql/sql/v1/context.h

+2
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,8 @@ namespace NSQLTranslationV1 {
308308
bool AnsiLike = false;
309309
bool FeatureR010 = false; //Row pattern recognition: FROM clause
310310
TMaybe<bool> CompactGroupBy;
311+
bool BlockEngineEnable = false;
312+
bool BlockEngineForce = false;
311313
};
312314

313315
class TColumnRefScope {

ydb/library/yql/sql/v1/query.cpp

+6
Original file line numberDiff line numberDiff line change
@@ -2691,6 +2691,12 @@ class TYqlProgramNode: public TAstListNode {
26912691
if (ctx.UseBlocks) {
26922692
Add(Y("let", "world", Y(TString(ConfigureName), "world", configSource, BuildQuotedAtom(Pos, "UseBlocks"))));
26932693
}
2694+
2695+
if (ctx.BlockEngineEnable) {
2696+
TString mode = ctx.BlockEngineForce ? "force" : "auto";
2697+
Add(Y("let", "world", Y(TString(ConfigureName), "world", configSource,
2698+
BuildQuotedAtom(Pos, "BlockEngine"), BuildQuotedAtom(Pos, mode))));
2699+
}
26942700
}
26952701
}
26962702

ydb/library/yql/sql/v1/sql_query.cpp

+10-7
Original file line numberDiff line numberDiff line change
@@ -1847,7 +1847,7 @@ TNodePtr TSqlQuery::PragmaStatement(const TRule_pragma_stmt& stmt, bool& success
18471847
} else if (normalizedPragma == "disableansiinforemptyornullableitemscollections") {
18481848
Ctx.AnsiInForEmptyOrNullableItemsCollections = false;
18491849
Ctx.IncrementMonCounter("sql_pragma", "DisableAnsiInForEmptyOrNullableItemsCollections");
1850-
} else if (normalizedPragma == "dqengine") {
1850+
} else if (normalizedPragma == "dqengine" || normalizedPragma == "blockengine") {
18511851
Ctx.IncrementMonCounter("sql_pragma", "DqEngine");
18521852
if (values.size() != 1 || !values[0].GetLiteral()
18531853
|| ! (*values[0].GetLiteral() == "disable" || *values[0].GetLiteral() == "auto" || *values[0].GetLiteral() == "force"))
@@ -1856,15 +1856,18 @@ TNodePtr TSqlQuery::PragmaStatement(const TRule_pragma_stmt& stmt, bool& success
18561856
Ctx.IncrementMonCounter("sql_errors", "BadPragmaValue");
18571857
return {};
18581858
}
1859+
const bool isDqEngine = normalizedPragma == "dqengine";
1860+
auto& enable = isDqEngine ? Ctx.DqEngineEnable : Ctx.BlockEngineEnable;
1861+
auto& force = isDqEngine ? Ctx.DqEngineForce : Ctx.BlockEngineForce;
18591862
if (*values[0].GetLiteral() == "disable") {
1860-
Ctx.DqEngineEnable = false;
1861-
Ctx.DqEngineForce = false;
1863+
enable = false;
1864+
force = false;
18621865
} else if (*values[0].GetLiteral() == "force") {
1863-
Ctx.DqEngineEnable = true;
1864-
Ctx.DqEngineForce = true;
1866+
enable = true;
1867+
force = true;
18651868
} else if (*values[0].GetLiteral() == "auto") {
1866-
Ctx.DqEngineEnable = true;
1867-
Ctx.DqEngineForce = false;
1869+
enable = true;
1870+
force = false;
18681871
}
18691872
} else if (normalizedPragma == "ansirankfornullablekeys") {
18701873
Ctx.AnsiRankForNullableKeys = true;

0 commit comments

Comments
 (0)