Skip to content

Stable 24 3 8 analytics #9022

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
3057f54
Add pragma to grace join runtime node with spilling (#6253)
lll-phill-lll Jul 5, 2024
748aff4
split SpillingEngine and channels spilling (#6406)
lll-phill-lll Jul 8, 2024
17579a7
Prohibit UDF on Flow in case of PartitionByKeys callable. (#6322)
Darych Jul 9, 2024
65507b1
always enable & allow spilling in channels (#6462)
gridnevvvit Jul 11, 2024
5359270
better error handling in spilling actors (#6454)
lll-phill-lll Jul 12, 2024
972df34
get rid of wide fields in wide combiner. Not used in llvm (#6536)
lll-phill-lll Jul 17, 2024
c90319a
add spilling nodes and pragma (#6769)
gridnevvvit Jul 18, 2024
329768e
randomize default spilling root (#6781)
gridnevvvit Jul 18, 2024
48e1454
Added method to enable memory yellow zone by force (#6839)
lll-phill-lll Jul 18, 2024
e410ef9
Tests for wide combiner with spilling (#6880)
lll-phill-lll Jul 22, 2024
3ff3bf4
Fuse FlatMap and ShuffleByKeys. (#6727)
Tony-Romanov Jul 23, 2024
cb9f7f3
WideCombiner with spilling better buffer pass (#7022)
lll-phill-lll Jul 25, 2024
7883ca2
add feature flag to enable spilling nodes (#6895)
gridnevvvit Jul 26, 2024
29ff56c
Remove old spilling tmp files (#7108)
vladl2802 Jul 30, 2024
c2a3e54
Implement LLVM for WideLastCombiner with spilling. (#7304)
Tony-Romanov Jul 31, 2024
6268acb
Fix usage single UDF object from many threads in DQ. (#7436)
Tony-Romanov Aug 7, 2024
ac94f08
support memory pools configurations & tune spilling settings (#7510)
gridnevvvit Aug 7, 2024
3c89009
Enable finalizeByKey by default in kqp (#7559)
lll-phill-lll Aug 8, 2024
2504e8c
Handle spilling errors correctly (#7435)
lll-phill-lll Aug 9, 2024
2027101
fixes in memory pools (#7623)
gridnevvvit Aug 9, 2024
58f3123
Add spilling stats per task (#7505)
vladl2802 Aug 15, 2024
d6345c0
disabled block engine for spilling (#7778)
lll-phill-lll Aug 20, 2024
3439aee
Fix null dereference (#8285)
lll-phill-lll Aug 26, 2024
cd74d10
Better spilling errors (#8812)
lll-phill-lll Sep 5, 2024
9269127
fixed build
lll-phill-lll Sep 15, 2024
1e63d97
fixed test
lll-phill-lll Sep 15, 2024
6d97571
dq canon
lll-phill-lll Sep 15, 2024
341bfce
canon clickbench
lll-phill-lll Sep 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.EnableSpillingGenericQuery = serviceConfig.GetEnableQueryServiceSpilling();
kqpConfig.DefaultCostBasedOptimizationLevel = serviceConfig.GetDefaultCostBasedOptimizationLevel();
kqpConfig.EnableConstantFolding = serviceConfig.GetEnableConstantFolding();
kqpConfig.SetDefaultEnabledSpillingNodes(serviceConfig.GetEnableSpillingNodes());

if (const auto limit = serviceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit()) {
kqpConfig._KqpYqlCombinerMemoryLimit = std::max(1_GB, limit - (limit >> 2U));
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
ui64 defaultCostBasedOptimizationLevel = TableServiceConfig.GetDefaultCostBasedOptimizationLevel();
bool enableConstantFolding = TableServiceConfig.GetEnableConstantFolding();

TString enableSpillingNodes = TableServiceConfig.GetEnableSpillingNodes();

TableServiceConfig.Swap(event.MutableConfig()->MutableTableServiceConfig());
LOG_INFO(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, "Updated config");

Expand All @@ -562,6 +564,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
TableServiceConfig.GetExtractPredicateRangesLimit() != rangesLimit ||
TableServiceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit() != mkqlHeavyLimit ||
TableServiceConfig.GetIdxLookupJoinPointsLimit() != idxLookupPointsLimit ||
TableServiceConfig.GetEnableSpillingNodes() != enableSpillingNodes ||
TableServiceConfig.GetEnableQueryServiceSpilling() != enableQueryServiceSpilling ||
TableServiceConfig.GetEnableImplicitQueryParameterTypes() != enableImplicitQueryParameterTypes ||
TableServiceConfig.GetDefaultCostBasedOptimizationLevel() != defaultCostBasedOptimizationLevel ||
Expand Down
15 changes: 7 additions & 8 deletions ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
, std::shared_ptr<IKqpNodeState> state
, TIntrusivePtr<NRm::TTxState> tx
, TIntrusivePtr<NRm::TTaskState> task
, ui64 limit
, ui64 reasonableSpillingTreshold)
, ui64 limit)
: NYql::NDq::TGuaranteeQuotaManager(limit, limit)
, ResourceManager(std::move(resourceManager))
, MemoryPool(memoryPool)
, State(std::move(state))
, Tx(std::move(tx))
, Task(std::move(task))
, ReasonableSpillingTreshold(reasonableSpillingTreshold)
{
}

Expand Down Expand Up @@ -57,7 +55,7 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
}

bool IsReasonableToUseSpilling() const override {
return Tx->GetExtraMemoryAllocatedSize() >= ReasonableSpillingTreshold;
return Task->IsReasonableToStartSpilling();
}

TString MemoryConsumptionDetails() const override {
Expand Down Expand Up @@ -88,7 +86,6 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
std::atomic<ui64> MkqlLightProgramMemoryLimit = 0;
std::atomic<ui64> MkqlHeavyProgramMemoryLimit = 0;
std::atomic<ui64> MinChannelBufferSize = 0;
std::atomic<ui64> ReasonableSpillingTreshold = 0;
std::atomic<ui64> MinMemAllocSize = 8_MB;
std::atomic<ui64> MinMemFreeSize = 32_MB;

Expand All @@ -109,7 +106,6 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
MkqlLightProgramMemoryLimit.store(config.GetMkqlLightProgramMemoryLimit());
MkqlHeavyProgramMemoryLimit.store(config.GetMkqlHeavyProgramMemoryLimit());
MinChannelBufferSize.store(config.GetMinChannelBufferSize());
ReasonableSpillingTreshold.store(config.GetReasonableSpillingTreshold());
MinMemAllocSize.store(config.GetMinMemAllocSize());
MinMemFreeSize.store(config.GetMinMemFreeSize());
}
Expand Down Expand Up @@ -164,14 +160,17 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
std::move(args.State),
std::move(args.TxInfo),
std::move(task),
limit,
ReasonableSpillingTreshold.load());
limit);

auto runtimeSettings = args.RuntimeSettings;
runtimeSettings.ExtraMemoryAllocationPool = args.MemoryPool;
runtimeSettings.UseSpilling = args.WithSpilling;
runtimeSettings.StatsMode = args.StatsMode;

if (runtimeSettings.UseSpilling) {
args.Task->SetEnableSpilling(runtimeSettings.UseSpilling);
}

if (args.Deadline) {
runtimeSettings.Timeout = args.Deadline - TAppData::TimeProvider->Now();
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ struct IKqpNodeComputeActorFactory {
const TInstant& Deadline;
const bool ShareMailbox;
const TMaybe<NYql::NDqProto::TRlPath>& RlPath;

TComputeStagesWithScan* ComputesByStages = nullptr;
std::shared_ptr<IKqpNodeState> State = nullptr;
TComputeActorSchedulingOptions SchedulingOptions = {};
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ using namespace NYql::NDq;

class TKqpTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContext {
public:
TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp)
: TDqTaskRunnerExecutionContext(txId, std::move(wakeUp))
TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback)
: TDqTaskRunnerExecutionContext(txId, std::move(wakeUpCallback), std::move(errorCallback))
, WithSpilling_(withSpilling)
{
}
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ void TKqpComputeActor::DoBootstrap() {
auto taskRunner = MakeDqTaskRunner(TBase::GetAllocatorPtr(), execCtx, settings, logger);
SetTaskRunner(taskRunner);

auto wakeup = [this]{ ContinueExecute(); };
auto wakeupCallback = [this]{ ContinueExecute(); };
auto errorCallback = [this](const TString& error){ SendError(error); };
try {
PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup)));
PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeupCallback), std::move(errorCallback)));
} catch (const NMiniKQL::TKqpEnsureFail& e) {
InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage());
return;
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ void TKqpScanComputeActor::DoBootstrap() {
TBase::SetTaskRunner(taskRunner);

auto wakeup = [this] { ContinueExecute(); };
TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup)));
auto errorCallback = [this](const TString& error){ SendError(error); };
TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup), std::move(errorCallback)));

ComputeCtx.AddTableScan(0, Meta, GetStatsMode());
ScanData = &ComputeCtx.GetTableScan(0);
Expand Down
11 changes: 9 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque
request.SetStartAllOrFail(true);
if (UseDataQueryPool) {
request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::DATA);
request.MutableRuntimeSettings()->SetUseSpilling(WithSpilling);
} else {
request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::SCAN);
request.MutableRuntimeSettings()->SetUseSpilling(WithSpilling);
Expand Down Expand Up @@ -432,8 +433,14 @@ TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize)
NYql::NDqProto::TDqTask* taskDesc = ArenaSerializeTaskToProto(TasksGraph, task, true);
NYql::NDq::TComputeRuntimeSettings settings;
if (!TxInfo) {
double memoryPoolPercent = 100;
if (UserRequestContext->PoolConfig.has_value()) {
memoryPoolPercent = UserRequestContext->PoolConfig->QueryMemoryLimitPercentPerNode;
}

TxInfo = MakeIntrusive<NRm::TTxState>(
TxId, TInstant::Now(), ResourceManager_->GetCounters());
TxId, TInstant::Now(), ResourceManager_->GetCounters(),
UserRequestContext->PoolId, memoryPoolPercent, Database);
}

auto startResult = CaFactory_->CreateKqpComputeActor({
Expand All @@ -454,7 +461,7 @@ TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize)
.StatsMode = GetDqStatsMode(StatsMode),
.Deadline = Deadline,
.ShareMailbox = (computeTasksSize <= 1),
.RlPath = Nothing()
.RlPath = Nothing(),
});

if (const auto* rmResult = std::get_if<NRm::TKqpRMAllocateResult>(&startResult)) {
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/node_service/kqp_node_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,9 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
}

TIntrusivePtr<NRm::TTxState> txInfo = MakeIntrusive<NRm::TTxState>(
txId, TInstant::Now(), ResourceManager_->GetCounters());
txId, TInstant::Now(), ResourceManager_->GetCounters(),
msg.GetSchedulerGroup(), msg.GetMemoryPoolPercent(),
msg.GetDatabase());

const ui32 tasksCount = msg.GetTasks().size();
for (auto& dqTask: *msg.MutableTasks()) {
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/opt/logical/kqp_opt_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
true, // defaultWatermarksMode
true); // syncActor
} else {
output = DqRewriteAggregate(node, ctx, TypesCtx, false, KqpCtx.Config->HasOptEnableOlapPushdown() || KqpCtx.Config->HasOptUseFinalizeByKey(), KqpCtx.Config->HasOptUseFinalizeByKey());
NDq::TSpillingSettings spillingSettings(KqpCtx.Config->GetEnabledSpillingNodes());
output = DqRewriteAggregate(node, ctx, TypesCtx, false, KqpCtx.Config->HasOptEnableOlapPushdown() || KqpCtx.Config->HasOptUseFinalizeByKey(), KqpCtx.Config->HasOptUseFinalizeByKey(), spillingSettings.IsAggregationSpillingEnabled());
}
if (output) {
DumpAppliedRule("RewriteAggregate", node.Ptr(), output.Cast().Ptr(), ctx);
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
}

TMaybeNode<TExprBase> ExpandAggregatePhase(TExprBase node, TExprContext& ctx) {
auto output = ExpandAggregatePeepholeImpl(node.Ptr(), ctx, TypesCtx, KqpCtx.Config->HasOptUseFinalizeByKey(), false);
NDq::TSpillingSettings spillingSettings(KqpCtx.Config->GetEnabledSpillingNodes());
auto output = ExpandAggregatePeepholeImpl(node.Ptr(), ctx, TypesCtx, KqpCtx.Config->HasOptUseFinalizeByKey(), false, spillingSettings.IsAggregationSpillingEnabled());
DumpAppliedRule("ExpandAggregatePhase", node.Ptr(), output, ctx);
return TExprBase(output);
}
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -815,10 +815,11 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<T

TVector<TExprBase> fakeReads;
auto paramsType = NDq::CollectParameters(programLambda, ctx);
NDq::TSpillingSettings spillingSettings{SessionCtx->Config().GetEnabledSpillingNodes()};
lambda = NDq::BuildProgram(
programLambda, *paramsType, compiler, SessionCtx->Query().QueryData->GetAllocState()->TypeEnv,
*SessionCtx->Query().QueryData->GetAllocState()->HolderFactory.GetFunctionRegistry(),
ctx, fakeReads);
ctx, fakeReads, spillingSettings);

NKikimr::NMiniKQL::TProgramBuilder programBuilder(SessionCtx->Query().QueryData->GetAllocState()->TypeEnv,
*SessionCtx->Query().QueryData->GetAllocState()->HolderFactory.GetFunctionRegistry());
Expand Down
31 changes: 29 additions & 2 deletions ydb/core/kqp/provider/yql_kikimr_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include <ydb/core/protos/config.pb.h>
#include <ydb/core/protos/table_service_config.pb.h>
#include <util/generic/size_literals.h>
#include <util/string/split.h>
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>

namespace NYql {

Expand All @@ -23,6 +25,22 @@ EOptionalFlag GetOptionalFlagValue(const TMaybe<TType>& flag) {
return EOptionalFlag::Disabled;
}


ui64 ParseEnableSpillingNodes(const TString &v) {
ui64 res = 0;
TVector<TString> vec;
StringSplitter(v).SplitBySet(",;| ").AddTo(&vec);
for (auto& s: vec) {
if (s.empty()) {
throw yexception() << "Empty value item";
}

auto value = FromString<NDq::EEnabledSpillingNodes>(s);
res |= ui64(value);
}
return res;
}

static inline bool GetFlagValue(const TMaybe<bool>& flag) {
return flag ? flag.GetRef() : false;
}
Expand Down Expand Up @@ -73,6 +91,8 @@ TKikimrConfiguration::TKikimrConfiguration() {

REGISTER_SETTING(*this, OptUseFinalizeByKey);
REGISTER_SETTING(*this, CostBasedOptimizationLevel);
REGISTER_SETTING(*this, EnableSpillingNodes)
.Parser([](const TString& v) { return ParseEnableSpillingNodes(v); });

REGISTER_SETTING(*this, MaxDPccpDPTableSize);

Expand Down Expand Up @@ -126,10 +146,9 @@ bool TKikimrSettings::HasOptEnableOlapProvideComputeSharding() const {
}

bool TKikimrSettings::HasOptUseFinalizeByKey() const {
return GetOptionalFlagValue(OptUseFinalizeByKey.Get()) != EOptionalFlag::Disabled;
return GetFlagValue(OptUseFinalizeByKey.Get().GetOrElse(true)) != EOptionalFlag::Disabled;
}


EOptionalFlag TKikimrSettings::GetOptPredicateExtract() const {
return GetOptionalFlagValue(OptEnablePredicateExtract.Get());
}
Expand All @@ -151,4 +170,12 @@ TKikimrSettings::TConstPtr TKikimrConfiguration::Snapshot() const {
return std::make_shared<const TKikimrSettings>(*this);
}

void TKikimrConfiguration::SetDefaultEnabledSpillingNodes(const TString& node) {
DefaultEnableSpillingNodes = ParseEnableSpillingNodes(node);
}

ui64 TKikimrConfiguration::GetEnabledSpillingNodes() const {
return EnableSpillingNodes.Get().GetOrElse(DefaultEnableSpillingNodes);
}

}
5 changes: 5 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ struct TKikimrSettings {
NCommon::TConfSetting<TString, false> OptCardinalityHints;
NCommon::TConfSetting<TString, false> OptJoinAlgoHints;
NCommon::TConfSetting<TString, false> OptJoinOrderHints;
NCommon::TConfSetting<TString, false> OverrideStatistics;

/* Disable optimizer rules */
NCommon::TConfSetting<bool, false> OptDisableTopSort;
Expand Down Expand Up @@ -175,6 +176,10 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi
bool EnableSpillingGenericQuery = false;
ui32 DefaultCostBasedOptimizationLevel = 4;
bool EnableConstantFolding = true;
ui64 DefaultEnableSpillingNodes = 0;

void SetDefaultEnabledSpillingNodes(const TString& node);
ui64 GetEnabledSpillingNodes() const;
};

}
9 changes: 8 additions & 1 deletion ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include <library/cpp/monlib/service/pages/templates.h>
#include <library/cpp/resource/resource.h>

#include <util/folder/dirut.h>

namespace NKikimr::NKqp {

Expand Down Expand Up @@ -236,9 +237,15 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
ResourcePoolsCache.UpdateFeatureFlags(FeatureFlags, ActorContext());

if (auto& cfg = TableServiceConfig.GetSpillingServiceConfig().GetLocalFileConfig(); cfg.GetEnable()) {
TString spillingRoot = cfg.GetRoot();
if (spillingRoot.empty()) {
spillingRoot = NYql::NDq::GetTmpSpillingRootForCurrentUser();
MakeDirIfNotExist(spillingRoot);
}

SpillingService = TlsActivationContext->ExecutorThread.RegisterActor(NYql::NDq::CreateDqLocalFileSpillingService(
NYql::NDq::TFileSpillingServiceConfig{
.Root = cfg.GetRoot(),
.Root = spillingRoot,
.MaxTotalSize = cfg.GetMaxTotalSize(),
.MaxFileSize = cfg.GetMaxFileSize(),
.MaxFilePartSize = cfg.GetMaxFilePartSize(),
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -777,8 +777,9 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
stageProto.SetIsEffectsStage(hasEffects || hasTxTableSink);

auto paramsType = CollectParameters(stage, ctx);
NDq::TSpillingSettings spillingSettings{Config->GetEnabledSpillingNodes()};
auto programBytecode = NDq::BuildProgram(stage.Program(), *paramsType, *KqlCompiler, TypeEnv, FuncRegistry,
ctx, {});
ctx, {}, spillingSettings);

auto& programProto = *stageProto.MutableProgram();
programProto.SetRuntimeVersion(NYql::NDqProto::ERuntimeVersion::RUNTIME_VERSION_YQL_1_0);
Expand Down
Loading
Loading