Skip to content

Commit 9f0436b

Browse files
authored
Merge 26da7b5 into 4a5e2ba
2 parents 4a5e2ba + 26da7b5 commit 9f0436b

File tree

7 files changed

+168
-24
lines changed

7 files changed

+168
-24
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

+12
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
312312

313313
} catch (const yexception& e) {
314314
InternalError(e.what());
315+
} catch (const TMemoryLimitExceededException&) {
316+
RuntimeError(Ydb::StatusIds::PRECONDITION_FAILED, NYql::TIssues({NYql::TIssue(BuildMemoryLimitExceptionMessage())}));
315317
}
316318
ReportEventElapsedTime();
317319
}
@@ -362,7 +364,11 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
362364
} catch (const yexception& e) {
363365
CancelProposal(0);
364366
InternalError(e.what());
367+
} catch (const TMemoryLimitExceededException& e) {
368+
CancelProposal(0);
369+
RuntimeError(Ydb::StatusIds::PRECONDITION_FAILED, NYql::TIssues({NYql::TIssue(BuildMemoryLimitExceptionMessage())}));
365370
}
371+
366372
ReportEventElapsedTime();
367373
}
368374

@@ -944,6 +950,12 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
944950
}
945951
} catch (const yexception& e) {
946952
InternalError(e.what());
953+
} catch (const TMemoryLimitExceededException) {
954+
if (ReadOnlyTx) {
955+
RuntimeError(Ydb::StatusIds::PRECONDITION_FAILED, NYql::TIssues({NYql::TIssue(BuildMemoryLimitExceptionMessage())}));
956+
} else {
957+
RuntimeError(Ydb::StatusIds::UNDETERMINED, NYql::TIssues({NYql::TIssue(BuildMemoryLimitExceptionMessage())}));
958+
}
947959
}
948960
ReportEventElapsedTime();
949961
}

ydb/core/kqp/executer_actor/kqp_executer_impl.h

+8
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,14 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
163163
return TActorBootstrapped<TDerived>::SelfId();
164164
}
165165

166+
TString BuildMemoryLimitExceptionMessage() const {
167+
if (Request.TxAlloc) {
168+
return TStringBuilder() << "Memory limit exception at " << CurrentStateFuncName()
169+
<< ", current limit is " << Request.TxAlloc->Alloc.GetLimit() << " bytes.";
170+
}
171+
return TStringBuilder() << "Memory limit exception at " << CurrentStateFuncName();
172+
}
173+
166174
void ReportEventElapsedTime() {
167175
if (Stats) {
168176
ui64 elapsedMicros = TlsActivationContext->GetCurrentEventTicksAsSeconds() * 1'000'000;

ydb/core/kqp/executer_actor/kqp_literal_executer.cpp

-18
Original file line numberDiff line numberDiff line change
@@ -146,24 +146,6 @@ class TKqpLiteralExecuter {
146146
return;
147147
}
148148

149-
ui64 mkqlMemoryLimit = Request.MkqlMemoryLimit > 0
150-
? Request.MkqlMemoryLimit
151-
: 1_GB;
152-
153-
auto& alloc = Request.TxAlloc->Alloc;
154-
auto rmConfig = GetKqpResourceManager()->GetConfig();
155-
ui64 mkqlInitialLimit = std::min(mkqlMemoryLimit, rmConfig.GetMkqlLightProgramMemoryLimit());
156-
ui64 mkqlMaxLimit = std::max(mkqlMemoryLimit, rmConfig.GetMkqlLightProgramMemoryLimit());
157-
alloc.SetLimit(mkqlInitialLimit);
158-
159-
// TODO: KIKIMR-15350
160-
alloc.Ref().SetIncreaseMemoryLimitCallback([this, &alloc, mkqlMaxLimit](ui64 currentLimit, ui64 required) {
161-
if (required < mkqlMaxLimit) {
162-
LOG_D("Increase memory limit from " << currentLimit << " to " << required);
163-
alloc.SetLimit(required);
164-
}
165-
});
166-
167149
// task runner settings
168150
ComputeCtx = std::make_unique<NMiniKQL::TKqpComputeContextBase>();
169151
RunnerContext = CreateTaskRunnerContext(ComputeCtx.get(), &Request.TxAlloc->TypeEnv);

ydb/core/kqp/executer_actor/kqp_scan_executer.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
9090

9191
} catch (const yexception& e) {
9292
InternalError(e.what());
93+
} catch (const TMemoryLimitExceededException&) {
94+
RuntimeError(Ydb::StatusIds::PRECONDITION_FAILED, NYql::TIssues({NYql::TIssue(BuildMemoryLimitExceptionMessage())}));
9395
}
9496
ReportEventElapsedTime();
9597
}
@@ -124,6 +126,8 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
124126
}
125127
} catch (const yexception& e) {
126128
InternalError(e.what());
129+
} catch (const TMemoryLimitExceededException&) {
130+
RuntimeError(Ydb::StatusIds::PRECONDITION_FAILED, NYql::TIssues({NYql::TIssue(BuildMemoryLimitExceptionMessage())}));
127131
}
128132
ReportEventElapsedTime();
129133
}

ydb/core/kqp/session_actor/kqp_session_actor.cpp

+49-4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <ydb/core/kqp/rm_service/kqp_snapshot_manager.h>
1919
#include <ydb/core/ydb_convert/ydb_convert.h>
2020
#include <ydb/core/tx/schemeshard/schemeshard.h>
21+
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
2122
#include <ydb/public/lib/operation_id/operation_id.h>
2223

2324
#include <ydb/core/util/ulid.h>
@@ -619,6 +620,23 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
619620
QueryState->TxId = UlidGen.Next();
620621
QueryState->TxCtx = MakeIntrusive<TKqpTransactionContext>(false, AppData()->FunctionRegistry,
621622
AppData()->TimeProvider, AppData()->RandomProvider, Config->EnableKqpImmediateEffects);
623+
624+
auto& alloc = QueryState->TxCtx->TxAlloc;
625+
ui64 mkqlInitialLimit = Settings.MkqlInitialMemoryLimit;
626+
627+
const auto& queryLimitsProto = Settings.TableService.GetQueryLimits();
628+
const auto& phaseLimitsProto = queryLimitsProto.GetPhaseLimits();
629+
ui64 mkqlMaxLimit = phaseLimitsProto.GetComputeNodeMemoryLimitBytes();
630+
mkqlMaxLimit = mkqlMaxLimit ? mkqlMaxLimit : ui64(Settings.MkqlMaxMemoryLimit);
631+
632+
alloc->Alloc.SetLimit(mkqlInitialLimit);
633+
alloc->Alloc.Ref().SetIncreaseMemoryLimitCallback([this, &alloc, mkqlMaxLimit](ui64 currentLimit, ui64 required) {
634+
if (required < mkqlMaxLimit) {
635+
LOG_D("Increase memory limit from " << currentLimit << " to " << required);
636+
alloc->Alloc.SetLimit(required);
637+
}
638+
});
639+
622640
QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc);
623641
QueryState->TxCtx->SetIsolationLevel(settings);
624642
QueryState->TxCtx->OnBeginQuery();
@@ -748,6 +766,8 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
748766
}
749767
} catch(const yexception& ex) {
750768
ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << ex.what();
769+
} catch(const TMemoryLimitExceededException&) {
770+
ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << BuildMemoryLimitExceptionMessage();
751771
}
752772
return true;
753773
}
@@ -2058,6 +2078,9 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
20582078
ReplyQueryError(ex.Status, ex.what(), ex.Issues);
20592079
} catch (const yexception& ex) {
20602080
InternalError(ex.what());
2081+
} catch (const TMemoryLimitExceededException&) {
2082+
ReplyQueryError(Ydb::StatusIds::INTERNAL_ERROR,
2083+
BuildMemoryLimitExceptionMessage());
20612084
}
20622085
}
20632086

@@ -2097,6 +2120,9 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
20972120
ReplyQueryError(ex.Status, ex.what(), ex.Issues);
20982121
} catch (const yexception& ex) {
20992122
InternalError(ex.what());
2123+
} catch (const TMemoryLimitExceededException&) {
2124+
ReplyQueryError(Ydb::StatusIds::UNDETERMINED,
2125+
BuildMemoryLimitExceptionMessage());
21002126
}
21012127
}
21022128

@@ -2132,14 +2158,24 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
21322158
}
21332159
} catch (const yexception& ex) {
21342160
InternalError(ex.what());
2161+
} catch (const TMemoryLimitExceededException&) {
2162+
ReplyQueryError(Ydb::StatusIds::INTERNAL_ERROR,
2163+
BuildMemoryLimitExceptionMessage());
21352164
}
21362165
}
21372166

21382167
STATEFN(FinalCleanupState) {
2139-
switch (ev->GetTypeRewrite()) {
2140-
hFunc(TEvents::TEvGone, HandleFinalCleanup);
2141-
hFunc(TEvents::TEvUndelivered, HandleNoop);
2142-
hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle);
2168+
try {
2169+
switch (ev->GetTypeRewrite()) {
2170+
hFunc(TEvents::TEvGone, HandleFinalCleanup);
2171+
hFunc(TEvents::TEvUndelivered, HandleNoop);
2172+
hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle);
2173+
}
2174+
} catch (const yexception& ex) {
2175+
InternalError(ex.what());
2176+
} catch (const TMemoryLimitExceededException&) {
2177+
ReplyQueryError(Ydb::StatusIds::INTERNAL_ERROR,
2178+
BuildMemoryLimitExceptionMessage());
21432179
}
21442180
}
21452181

@@ -2172,6 +2208,15 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
21722208
}
21732209
}
21742210

2211+
TString BuildMemoryLimitExceptionMessage() const {
2212+
if (QueryState && QueryState->TxCtx) {
2213+
return TStringBuilder() << "Memory limit exception at " << CurrentStateFuncName()
2214+
<< ", current limit is " << QueryState->TxCtx->TxAlloc->Alloc.GetLimit() << " bytes.";
2215+
} else {
2216+
return TStringBuilder() << "Memory limit exception at " << CurrentStateFuncName();
2217+
}
2218+
}
2219+
21752220
void ProcessTopicOps(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
21762221
YQL_ENSURE(ev->Get()->Request);
21772222
if (ev->Get()->Request->Cookie < QueryId) {

ydb/core/kqp/session_actor/kqp_session_actor.h

+14-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <ydb/core/protos/config.pb.h>
88
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
99

10+
#include <ydb/core/control/immediate_control_board_wrapper.h>
1011
#include <ydb/library/actors/core/actorid.h>
1112

1213
namespace NKikimr::NKqp {
@@ -19,17 +20,28 @@ struct TKqpWorkerSettings {
1920
NKikimrConfig::TTableServiceConfig TableService;
2021
NKikimrConfig::TQueryServiceConfig QueryService;
2122

23+
TControlWrapper MkqlInitialMemoryLimit;
24+
TControlWrapper MkqlMaxMemoryLimit;
25+
2226
TKqpDbCountersPtr DbCounters;
2327

24-
TKqpWorkerSettings(const TString& cluster, const TString& database,
28+
explicit TKqpWorkerSettings(const TString& cluster, const TString& database,
2529
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
2630
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
2731
TKqpDbCountersPtr dbCounters)
2832
: Cluster(cluster)
2933
, Database(database)
3034
, TableService(tableServiceConfig)
3135
, QueryService(queryServiceConfig)
32-
, DbCounters(dbCounters) {}
36+
, MkqlInitialMemoryLimit(2097152, 1, Max<i64>())
37+
, MkqlMaxMemoryLimit(1073741824, 1, Max<i64>())
38+
, DbCounters(dbCounters)
39+
{
40+
AppData()->Icb->RegisterSharedControl(
41+
MkqlInitialMemoryLimit, "KqpSession.MkqlInitialMemoryLimit");
42+
AppData()->Icb->RegisterSharedControl(
43+
MkqlMaxMemoryLimit, "KqpSession.MkqlMaxMemoryLimit");
44+
}
3345
};
3446

3547
IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId,

ydb/core/kqp/ut/query/kqp_limits_ut.cpp

+81
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,87 @@ namespace {
1717
}
1818

1919
Y_UNIT_TEST_SUITE(KqpLimits) {
20+
Y_UNIT_TEST(KqpMkqlMemoryLimitException) {
21+
TKikimrRunner kikimr;
22+
CreateLargeTable(kikimr, 10, 10, 1'000'000, 1);
23+
24+
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_SLOW_LOG, NActors::NLog::PRI_ERROR);
25+
26+
TControlWrapper mkqlInitialMemoryLimit;
27+
TControlWrapper mkqlMaxMemoryLimit;
28+
29+
mkqlInitialMemoryLimit = kikimr.GetTestServer().GetRuntime()->GetAppData().Icb->RegisterSharedControl(
30+
mkqlInitialMemoryLimit, "KqpSession.MkqlInitialMemoryLimit");
31+
mkqlMaxMemoryLimit = kikimr.GetTestServer().GetRuntime()->GetAppData().Icb->RegisterSharedControl(
32+
mkqlMaxMemoryLimit, "KqpSession.MkqlMaxMemoryLimit");
33+
34+
mkqlInitialMemoryLimit = 1_KB;
35+
mkqlMaxMemoryLimit = 1_KB;
36+
37+
auto db = kikimr.GetTableClient();
38+
auto session = db.CreateSession().GetValueSync().GetSession();
39+
40+
auto result = session.ExecuteDataQuery(Q1_(R"(
41+
SELECT * FROM `/Root/LargeTable`;
42+
)"), TTxControl::BeginTx().CommitTx()).ExtractValueSync();
43+
result.GetIssues().PrintTo(Cerr);
44+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::PRECONDITION_FAILED);
45+
}
46+
47+
Y_UNIT_TEST(LargeParametersAndMkqlFailure) {
48+
auto app = NKikimrConfig::TAppConfig();
49+
app.MutableTableServiceConfig()->MutableResourceManager()->SetMkqlLightProgramMemoryLimit(1'000'000'000);
50+
51+
TKikimrRunner kikimr(app);
52+
CreateLargeTable(kikimr, 0, 0, 0);
53+
54+
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_SLOW_LOG, NActors::NLog::PRI_ERROR);
55+
56+
auto db = kikimr.GetTableClient();
57+
auto session = db.CreateSession().GetValueSync().GetSession();
58+
59+
TControlWrapper mkqlInitialMemoryLimit;
60+
TControlWrapper mkqlMaxMemoryLimit;
61+
62+
mkqlInitialMemoryLimit = kikimr.GetTestServer().GetRuntime()->GetAppData().Icb->RegisterSharedControl(
63+
mkqlInitialMemoryLimit, "KqpSession.MkqlInitialMemoryLimit");
64+
mkqlMaxMemoryLimit = kikimr.GetTestServer().GetRuntime()->GetAppData().Icb->RegisterSharedControl(
65+
mkqlMaxMemoryLimit, "KqpSession.MkqlMaxMemoryLimit");
66+
67+
68+
mkqlInitialMemoryLimit = 1_KB;
69+
mkqlMaxMemoryLimit = 1_KB;
70+
71+
auto paramsBuilder = db.GetParamsBuilder();
72+
auto& rowsParam = paramsBuilder.AddParam("$rows");
73+
74+
rowsParam.BeginList();
75+
for (ui32 i = 0; i < 100; ++i) {
76+
rowsParam.AddListItem()
77+
.BeginStruct()
78+
.AddMember("Key")
79+
.OptionalUint64(i)
80+
.AddMember("KeyText")
81+
.OptionalString(TString(5000, '0' + i % 10))
82+
.AddMember("Data")
83+
.OptionalInt64(i)
84+
.AddMember("DataText")
85+
.OptionalString(TString(16, '0' + (i + 1) % 10))
86+
.EndStruct();
87+
}
88+
rowsParam.EndList();
89+
rowsParam.Build();
90+
91+
auto result = session.ExecuteDataQuery(Q1_(R"(
92+
DECLARE $rows AS List<Struct<Key: Uint64?, KeyText: String?, Data: Int64?, DataText: String?>>;
93+
94+
UPSERT INTO `/Root/LargeTable`
95+
SELECT * FROM AS_TABLE($rows);
96+
)"), TTxControl::BeginTx().CommitTx(), paramsBuilder.Build()).ExtractValueSync();
97+
result.GetIssues().PrintTo(Cerr);
98+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST);
99+
}
100+
20101
Y_UNIT_TEST(DatashardProgramSize) {
21102
auto app = NKikimrConfig::TAppConfig();
22103
app.MutableTableServiceConfig()->MutableResourceManager()->SetMkqlLightProgramMemoryLimit(1'000'000'000);

0 commit comments

Comments
 (0)