|
18 | 18 | #include <ydb/core/kqp/rm_service/kqp_snapshot_manager.h>
|
19 | 19 | #include <ydb/core/ydb_convert/ydb_convert.h>
|
20 | 20 | #include <ydb/core/tx/schemeshard/schemeshard.h>
|
| 21 | +#include <ydb/core/kqp/rm_service/kqp_rm_service.h> |
21 | 22 | #include <ydb/public/lib/operation_id/operation_id.h>
|
22 | 23 |
|
23 | 24 | #include <ydb/core/util/ulid.h>
|
@@ -619,6 +620,23 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
|
619 | 620 | QueryState->TxId = UlidGen.Next();
|
620 | 621 | QueryState->TxCtx = MakeIntrusive<TKqpTransactionContext>(false, AppData()->FunctionRegistry,
|
621 | 622 | AppData()->TimeProvider, AppData()->RandomProvider, Config->EnableKqpImmediateEffects);
|
| 623 | + |
| 624 | + auto& alloc = QueryState->TxCtx->TxAlloc; |
| 625 | + ui64 mkqlInitialLimit = Settings.MkqlInitialMemoryLimit; |
| 626 | + |
| 627 | + const auto& queryLimitsProto = Settings.TableService.GetQueryLimits(); |
| 628 | + const auto& phaseLimitsProto = queryLimitsProto.GetPhaseLimits(); |
| 629 | + ui64 mkqlMaxLimit = phaseLimitsProto.GetComputeNodeMemoryLimitBytes(); |
| 630 | + mkqlMaxLimit = mkqlMaxLimit ? mkqlMaxLimit : ui64(Settings.MkqlMaxMemoryLimit); |
| 631 | + |
| 632 | + alloc->Alloc.SetLimit(mkqlInitialLimit); |
| 633 | + alloc->Alloc.Ref().SetIncreaseMemoryLimitCallback([this, &alloc, mkqlMaxLimit](ui64 currentLimit, ui64 required) { |
| 634 | + if (required < mkqlMaxLimit) { |
| 635 | + LOG_D("Increase memory limit from " << currentLimit << " to " << required); |
| 636 | + alloc->Alloc.SetLimit(required); |
| 637 | + } |
| 638 | + }); |
| 639 | + |
622 | 640 | QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc);
|
623 | 641 | QueryState->TxCtx->SetIsolationLevel(settings);
|
624 | 642 | QueryState->TxCtx->OnBeginQuery();
|
@@ -748,6 +766,8 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
|
748 | 766 | }
|
749 | 767 | } catch(const yexception& ex) {
|
750 | 768 | ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << ex.what();
|
| 769 | + } catch(const TMemoryLimitExceededException&) { |
| 770 | + ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << BuildMemoryLimitExceptionMessage(); |
751 | 771 | }
|
752 | 772 | return true;
|
753 | 773 | }
|
@@ -2051,6 +2071,9 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
|
2051 | 2071 | ReplyQueryError(ex.Status, ex.what(), ex.Issues);
|
2052 | 2072 | } catch (const yexception& ex) {
|
2053 | 2073 | InternalError(ex.what());
|
| 2074 | + } catch (const TMemoryLimitExceededException&) { |
| 2075 | + ReplyQueryError(Ydb::StatusIds::INTERNAL_ERROR, |
| 2076 | + BuildMemoryLimitExceptionMessage()); |
2054 | 2077 | }
|
2055 | 2078 | }
|
2056 | 2079 |
|
@@ -2090,6 +2113,9 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
|
2090 | 2113 | ReplyQueryError(ex.Status, ex.what(), ex.Issues);
|
2091 | 2114 | } catch (const yexception& ex) {
|
2092 | 2115 | InternalError(ex.what());
|
| 2116 | + } catch (const TMemoryLimitExceededException&) { |
| 2117 | + ReplyQueryError(Ydb::StatusIds::UNDETERMINED, |
| 2118 | + BuildMemoryLimitExceptionMessage()); |
2093 | 2119 | }
|
2094 | 2120 | }
|
2095 | 2121 |
|
@@ -2125,14 +2151,24 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
|
2125 | 2151 | }
|
2126 | 2152 | } catch (const yexception& ex) {
|
2127 | 2153 | InternalError(ex.what());
|
| 2154 | + } catch (const TMemoryLimitExceededException&) { |
| 2155 | + ReplyQueryError(Ydb::StatusIds::INTERNAL_ERROR, |
| 2156 | + BuildMemoryLimitExceptionMessage()); |
2128 | 2157 | }
|
2129 | 2158 | }
|
2130 | 2159 |
|
2131 | 2160 | STATEFN(FinalCleanupState) {
|
2132 |
| - switch (ev->GetTypeRewrite()) { |
2133 |
| - hFunc(TEvents::TEvGone, HandleFinalCleanup); |
2134 |
| - hFunc(TEvents::TEvUndelivered, HandleNoop); |
2135 |
| - hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle); |
| 2161 | + try { |
| 2162 | + switch (ev->GetTypeRewrite()) { |
| 2163 | + hFunc(TEvents::TEvGone, HandleFinalCleanup); |
| 2164 | + hFunc(TEvents::TEvUndelivered, HandleNoop); |
| 2165 | + hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle); |
| 2166 | + } |
| 2167 | + } catch (const yexception& ex) { |
| 2168 | + InternalError(ex.what()); |
| 2169 | + } catch (const TMemoryLimitExceededException&) { |
| 2170 | + ReplyQueryError(Ydb::StatusIds::INTERNAL_ERROR, |
| 2171 | + BuildMemoryLimitExceptionMessage()); |
2136 | 2172 | }
|
2137 | 2173 | }
|
2138 | 2174 |
|
@@ -2165,6 +2201,15 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
|
2165 | 2201 | }
|
2166 | 2202 | }
|
2167 | 2203 |
|
| 2204 | + TString BuildMemoryLimitExceptionMessage() const { |
| 2205 | + if (QueryState && QueryState->TxCtx) { |
| 2206 | + return TStringBuilder() << "Memory limit exception at " << CurrentStateFuncName() |
| 2207 | + << ", current limit is " << QueryState->TxCtx->TxAlloc->Alloc.GetLimit() << " bytes."; |
| 2208 | + } else { |
| 2209 | + return TStringBuilder() << "Memory limit exception at " << CurrentStateFuncName(); |
| 2210 | + } |
| 2211 | + } |
| 2212 | + |
2168 | 2213 | void ProcessTopicOps(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
|
2169 | 2214 | YQL_ENSURE(ev->Get()->Request);
|
2170 | 2215 | if (ev->Get()->Request->Cookie < QueryId) {
|
|
0 commit comments