diff --git a/ydb/core/kqp/common/simple/temp_tables.cpp b/ydb/core/kqp/common/simple/temp_tables.cpp index 3f7645d2de8a..4beb776d1dc5 100644 --- a/ydb/core/kqp/common/simple/temp_tables.cpp +++ b/ydb/core/kqp/common/simple/temp_tables.cpp @@ -2,4 +2,21 @@ namespace NKikimr::NKqp { +THashMap::const_iterator +TKqpTempTablesState::FindInfo(const std::string_view& path, bool withSessionId) const { + if (!withSessionId) { + return TempTables.find(path); + } + + if (path.size() < SessionId.size()) { + return TempTables.end(); + } + size_t pos = path.size() - SessionId.size(); + if (path.substr(pos) != SessionId) { + return TempTables.end(); + } + + return TempTables.find(path.substr(0, pos)); +} + } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/simple/temp_tables.h b/ydb/core/kqp/common/simple/temp_tables.h index 2f3bb049e2aa..320e9ce82940 100644 --- a/ydb/core/kqp/common/simple/temp_tables.h +++ b/ydb/core/kqp/common/simple/temp_tables.h @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -14,14 +15,15 @@ struct TKqpTempTablesState { struct TTempTableInfo { TString Name; TString WorkingDir; - TString Database; TIntrusiveConstPtr UserToken; - TString Cluster; }; - std::optional SessionId; - THashMap, TTempTableInfo> TempTables; + TString SessionId; + THashMap TempTables; using TConstPtr = std::shared_ptr; + + THashMap::const_iterator + FindInfo(const std::string_view& path, bool withSessionId = false) const; }; } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index 6d6414fca96c..34b3074d9bca 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -567,6 +567,9 @@ class TKqpCompileService : public TActorBootstrapped { Counters->ReportCompileRequestGet(dbCounters); auto compileResult = QueryCache.FindByUid(*request.Uid, request.KeepInCache); + if (HasTempTablesNameClashes(compileResult, request.TempTablesState)) { + compileResult = nullptr; + } if (compileResult) { Y_ENSURE(compileResult->Query); if (compileResult->Query->UserSid == userSid) { @@ -610,6 +613,10 @@ class TKqpCompileService : public TActorBootstrapped { } auto compileResult = QueryCache.FindByQuery(query, request.KeepInCache); + if (HasTempTablesNameClashes(compileResult, request.TempTablesState)) { + compileResult = nullptr; + } + if (compileResult) { Counters->ReportQueryCacheHit(dbCounters, true); @@ -672,7 +679,11 @@ class TKqpCompileService : public TActorBootstrapped { auto dbCounters = request.DbCounters; Counters->ReportRecompileRequestGet(dbCounters); - auto compileResult = QueryCache.FindByUid(request.Uid, false); + TKqpCompileResult::TConstPtr compileResult = QueryCache.FindByUid(request.Uid, false); + if (HasTempTablesNameClashes(compileResult, request.TempTablesState)) { + compileResult = nullptr; + } + if (compileResult || request.Query) { Counters->ReportCompileRequestCompile(dbCounters); @@ -736,19 +747,12 @@ class TKqpCompileService : public TActorBootstrapped { bool keepInCache = compileRequest.KeepInCache && compileResult->AllowCache; + bool hasTempTablesNameClashes = HasTempTablesNameClashes(compileResult, compileRequest.TempTablesState, true); + try { if (compileResult->Status == Ydb::StatusIds::SUCCESS) { - if (QueryCache.FindByUid(compileResult->Uid, false)) { - QueryCache.Replace(compileResult); - } else if (keepInCache) { - if (QueryCache.Insert(compileResult, TableServiceConfig.GetEnableAstCache())) { - Counters->CompileQueryCacheEvicted->Inc(); - } - if (compileResult->Query && compileResult->Query->Settings.IsPrepareQuery) { - if (InsertPreparingQuery(compileResult, compileRequest.KeepInCache)) { - Counters->CompileQueryCacheEvicted->Inc(); - }; - } + if (!hasTempTablesNameClashes) { + UpdateQueryCache(compileResult, keepInCache); } if (ev->Get()->ReplayMessage) { @@ -762,8 +766,10 @@ class TKqpCompileService : public TActorBootstrapped { request.Cookie, std::move(request.Orbit), std::move(request.CompileServiceSpan), (CollectDiagnostics ? ev->Get()->ReplayMessageUserView : std::nullopt)); } } else { - if (QueryCache.FindByUid(compileResult->Uid, false)) { - QueryCache.EraseByUid(compileResult->Uid); + if (!hasTempTablesNameClashes) { + if (QueryCache.FindByUid(compileResult->Uid, false)) { + QueryCache.EraseByUid(compileResult->Uid); + } } } @@ -814,12 +820,43 @@ class TKqpCompileService : public TActorBootstrapped { StartCheckQueriesTtlTimer(); } + bool HasTempTablesNameClashes( + TKqpCompileResult::TConstPtr compileResult, + TKqpTempTablesState::TConstPtr tempTablesState, bool withSessionId = false) { + if (!compileResult) { + return false; + } + if (!compileResult->PreparedQuery) { + return false; + } + + return compileResult->PreparedQuery->HasTempTables(tempTablesState, withSessionId); + } + + void UpdateQueryCache(TKqpCompileResult::TConstPtr compileResult, bool keepInCache) { + if (QueryCache.FindByUid(compileResult->Uid, false)) { + QueryCache.Replace(compileResult); + } else if (keepInCache) { + if (QueryCache.Insert(compileResult, TableServiceConfig.GetEnableAstCache())) { + Counters->CompileQueryCacheEvicted->Inc(); + } + if (compileResult->Query && compileResult->Query->Settings.IsPrepareQuery) { + if (InsertPreparingQuery(compileResult, true)) { + Counters->CompileQueryCacheEvicted->Inc(); + }; + } + } + } + void Handle(TEvKqp::TEvParseResponse::TPtr& ev, const TActorContext& ctx) { auto& parseResult = ev->Get()->AstResult; auto& query = ev->Get()->Query; auto compileRequest = RequestsQueue.FinishActiveRequest(query); if (parseResult && parseResult->Ast->IsOk()) { auto compileResult = QueryCache.FindByAst(query, *parseResult->Ast, compileRequest.KeepInCache); + if (HasTempTablesNameClashes(compileResult, compileRequest.TempTablesState)) { + compileResult = nullptr; + } if (compileResult) { Counters->ReportQueryCacheHit(compileRequest.DbCounters, true); diff --git a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp index 450229c2f7b2..aa5028518bed 100644 --- a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp @@ -113,11 +113,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped { } case NKqpProto::TKqpSchemeOperation::kDropTable: { - auto modifyScheme = schemeOp.GetDropTable(); - if (Temporary) { - auto* dropTable = modifyScheme.MutableDrop(); - dropTable->SetName(dropTable->GetName() + SessionId); - } + const auto& modifyScheme = schemeOp.GetDropTable(); ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme); break; } diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp index c8a7f7038cde..e368f3f40ad9 100644 --- a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp +++ b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp @@ -30,16 +30,16 @@ struct NavigateEntryResult { std::optional QueryName; }; -NavigateEntryResult CreateNavigateEntry(const TString& cluster, const TString& path, - const NYql::IKikimrGateway::TLoadTableMetadataSettings& settings, TKqpTempTablesState::TConstPtr tempTablesState = nullptr) { +NavigateEntryResult CreateNavigateEntry(const TString& path, + const NYql::IKikimrGateway::TLoadTableMetadataSettings& settings, TKqpTempTablesState::TConstPtr tempTablesState = nullptr) { TNavigate::TEntry entry; TString currentPath = path; std::optional queryName = std::nullopt; if (tempTablesState) { - auto tempTablesIt = tempTablesState->TempTables.find(std::make_pair(cluster, currentPath)); - if (tempTablesState->SessionId && tempTablesIt != tempTablesState->TempTables.end()) { + auto tempTablesInfoIt = tempTablesState->FindInfo(currentPath, false); + if (tempTablesInfoIt != tempTablesState->TempTables.end()) { queryName = currentPath; - currentPath = currentPath + *tempTablesState->SessionId; + currentPath = currentPath + tempTablesState->SessionId; } } entry.Path = SplitPath(currentPath); @@ -50,10 +50,8 @@ NavigateEntryResult CreateNavigateEntry(const TString& cluster, const TString& p return {entry, currentPath, queryName}; } -NavigateEntryResult CreateNavigateEntry(const TString& cluster, - const std::pair& pair, +NavigateEntryResult CreateNavigateEntry(const std::pair& pair, const NYql::IKikimrGateway::TLoadTableMetadataSettings& settings, TKqpTempTablesState::TConstPtr tempTablesState = nullptr) { - Y_UNUSED(cluster); Y_UNUSED(tempTablesState); TNavigate::TEntry entry; @@ -701,8 +699,8 @@ NThreading::TFuture TKqpTableMetadataLoader::LoadTableMeta const auto externalEntryItem = CreateNavigateExternalEntry(id, settings.WithExternalDatasources_); Y_ABORT_UNLESS(!settings.WithExternalDatasources_ || externalEntryItem, "External data source must be resolved using path only"); - auto resNavigate = settings.WithExternalDatasources_ ? *externalEntryItem : CreateNavigateEntry(cluster, - id, settings, TempTablesState); + auto resNavigate = settings.WithExternalDatasources_ ? *externalEntryItem : CreateNavigateEntry(id, + settings, TempTablesState); const auto entry = resNavigate.Entry; const auto queryName = resNavigate.QueryName; const auto externalEntry = settings.WithExternalDatasources_ ? std::optional{} : externalEntryItem; diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.cpp b/ydb/core/kqp/provider/yql_kikimr_provider.cpp index b432ce8484df..28965c15b825 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_provider.cpp @@ -121,11 +121,13 @@ struct TKikimrData { const TKikimrTableDescription* TKikimrTablesData::EnsureTableExists(const TString& cluster, const TString& table, TPositionHandle pos, TExprContext& ctx) const { - auto tempTable = TempTables.FindPtr(table); - auto tablePath = table; - if (tempTable) { - tablePath = *tempTable; + if (TempTablesState) { + auto tempTableInfoIt = TempTablesState->FindInfo(table, true); + + if (tempTableInfoIt != TempTablesState->TempTables.end()) { + tablePath = tempTableInfoIt->first; + } } auto desc = Tables.FindPtr(std::make_pair(cluster, tablePath)); @@ -141,11 +143,13 @@ const TKikimrTableDescription* TKikimrTablesData::EnsureTableExists(const TStrin } TKikimrTableDescription& TKikimrTablesData::GetOrAddTable(const TString& cluster, const TString& database, const TString& table, ETableType tableType) { - auto tempTable = TempTables.FindPtr(table); - auto tablePath = table; - if (tempTable) { - tablePath = *tempTable; + if (TempTablesState) { + auto tempTableInfoIt = TempTablesState->FindInfo(table, true); + + if (tempTableInfoIt != TempTablesState->TempTables.end()) { + tablePath = tempTableInfoIt->first; + } } if (!Tables.FindPtr(std::make_pair(cluster, tablePath))) { @@ -165,11 +169,13 @@ TKikimrTableDescription& TKikimrTablesData::GetOrAddTable(const TString& cluster } TKikimrTableDescription& TKikimrTablesData::GetTable(const TString& cluster, const TString& table) { - auto tempTable = TempTables.FindPtr(table); - auto tablePath = table; - if (tempTable) { - tablePath = *tempTable; + if (TempTablesState) { + auto tempTableInfoIt = TempTablesState->FindInfo(table, true); + + if (tempTableInfoIt != TempTablesState->TempTables.end()) { + tablePath = tempTableInfoIt->first; + } } auto desc = Tables.FindPtr(std::make_pair(cluster, tablePath)); @@ -181,12 +187,13 @@ TKikimrTableDescription& TKikimrTablesData::GetTable(const TString& cluster, con const TKikimrTableDescription& TKikimrTablesData::ExistingTable(const TStringBuf& cluster, const TStringBuf& table) const { - auto tempTable = TempTables.FindPtr(table); - auto tablePath = table; + if (TempTablesState) { + auto tempTableInfoIt = TempTablesState->FindInfo(table, true); - if (tempTable) { - tablePath = *tempTable; + if (tempTableInfoIt != TempTablesState->TempTables.end()) { + tablePath = tempTableInfoIt->first; + } } auto desc = Tables.FindPtr(std::make_pair(TString(cluster), TString(tablePath))); diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index 2798b50e5efb..abcabe0c416e 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -200,16 +200,12 @@ class TKikimrTablesData : public TThrRefBase { } void SetTempTables(NKikimr::NKqp::TKqpTempTablesState::TConstPtr tempTablesState) { - if (tempTablesState) { - for (const auto& [path, info] : tempTablesState->TempTables) { - TempTables[path.second + *tempTablesState->SessionId] = path.second; - } - } + TempTablesState = std::move(tempTablesState); } private: THashMap, TKikimrTableDescription> Tables; - THashMap TempTables; + NKikimr::NKqp::TKqpTempTablesState::TConstPtr TempTablesState; }; enum class TYdbOperation : ui32 { @@ -288,11 +284,7 @@ class TKikimrTransactionContextBase : public TThrRefBase { } void SetTempTables(NKikimr::NKqp::TKqpTempTablesState::TConstPtr tempTablesState) { - if (tempTablesState) { - for (const auto& [path, info] : tempTablesState->TempTables) { - TempTables[path.second] = path.second + *tempTablesState->SessionId; - } - } + TempTablesState = std::move(tempTablesState); } template @@ -330,17 +322,16 @@ class TKikimrTransactionContextBase : public TThrRefBase { } for (const auto& op : operations) { - const auto& table = [&]() -> const TString& { - const auto tempTable = TempTables.FindPtr(op.GetTable()); - if (tempTable) { - return *tempTable; - } else { - return op.GetTable(); - } - }(); - const auto newOp = TYdbOperation(op.GetOperation()); + auto table = op.GetTable(); + if (TempTablesState) { + auto tempTableInfoIt = TempTablesState->FindInfo(table, false); + if (tempTableInfoIt != TempTablesState->TempTables.end()) { + table = tempTableInfoIt->first + TempTablesState->SessionId; + } + } + const auto info = tableInfoMap.FindPtr(table); if (!info) { TString message = TStringBuilder() @@ -450,7 +441,7 @@ class TKikimrTransactionContextBase : public TThrRefBase { THashMap TableOperations; THashMap TableByIdMap; TMaybe EffectiveIsolationLevel; - THashMap TempTables; + NKikimr::NKqp::TKqpTempTablesState::TConstPtr TempTablesState; bool Readonly = false; bool Invalidated = false; bool Closed = false; @@ -535,7 +526,7 @@ class TKikimrSessionContext : public TThrRefBase { if (TxCtx) { TxCtx->SetTempTables(tempTablesState); } - TempTablesState = tempTablesState; + TempTablesState = std::move(tempTablesState); } const TIntrusiveConstPtr& GetUserToken() const { diff --git a/ydb/core/kqp/query_data/kqp_prepared_query.cpp b/ydb/core/kqp/query_data/kqp_prepared_query.cpp index 1fe60ca3ec8f..5bc367698160 100644 --- a/ydb/core/kqp/query_data/kqp_prepared_query.cpp +++ b/ydb/core/kqp/query_data/kqp_prepared_query.cpp @@ -1,5 +1,6 @@ #include "kqp_prepared_query.h" +#include #include #include #include @@ -106,6 +107,47 @@ bool TKqpPhyTxHolder::IsLiteralTx() const { return LiteralTx; } +std::optional>> +TKqpPhyTxHolder::GetSchemeOpTempTablePath() const { + if (GetType() != NKqpProto::TKqpPhyTx::TYPE_SCHEME) { + return std::nullopt; + } + auto& schemeOperation = GetSchemeOperation(); + switch (schemeOperation.GetOperationCase()) { + case NKqpProto::TKqpSchemeOperation::kCreateTable: { + const auto& modifyScheme = schemeOperation.GetCreateTable(); + const NKikimrSchemeOp::TTableDescription* tableDesc = nullptr; + switch (modifyScheme.GetOperationType()) { + case NKikimrSchemeOp::ESchemeOpCreateTable: { + tableDesc = &modifyScheme.GetCreateTable(); + break; + } + case NKikimrSchemeOp::ESchemeOpCreateIndexedTable: { + tableDesc = &modifyScheme.GetCreateIndexedTable().GetTableDescription(); + break; + } + default: + return std::nullopt; + } + if (tableDesc->HasTemporary()) { + if (tableDesc->GetTemporary()) { + return {{true, {modifyScheme.GetWorkingDir(), tableDesc->GetName()}}}; + } + } + break; + } + case NKqpProto::TKqpSchemeOperation::kDropTable: { + auto modifyScheme = schemeOperation.GetDropTable(); + auto* dropTable = modifyScheme.MutableDrop(); + + return {{false, {modifyScheme.GetWorkingDir(), dropTable->GetName()}}}; + } + default: + return std::nullopt; + } + return std::nullopt; +} + const NKikimr::NKqp::TStagePredictor& TKqpPhyTxHolder::GetCalculationPredictor(const size_t stageIdx) const { YQL_ENSURE(stageIdx < Predictors.size(), "incorrect stage idx for predictor"); return Predictors[stageIdx]; @@ -226,6 +268,39 @@ void TPreparedQueryHolder::FillTables(const google::protobuf::RepeatedPtrField< } } +bool TPreparedQueryHolder::HasTempTables(TKqpTempTablesState::TConstPtr tempTablesState, bool withSessionId) const { + if (!tempTablesState) { + return false; + } + for (const auto& table: QueryTables) { + auto infoIt = tempTablesState->FindInfo(table, withSessionId); + if (infoIt != tempTablesState->TempTables.end()) { + return true; + } + } + + + if (withSessionId) { + for (const auto& tx: Transactions) { + auto optPath = tx->GetSchemeOpTempTablePath(); + if (!optPath) { + continue; + } else { + const auto& [isCreate, path] = *optPath; + if (isCreate) { + return true; + } else { + auto infoIt = tempTablesState->FindInfo(JoinPath({path.first, path.second}), withSessionId); + if (infoIt != tempTablesState->TempTables.end()) { + return true; + } + } + } + } + } + return false; +} + const TKqpPhyTxHolder::TConstPtr& TPreparedQueryHolder::GetPhyTx(ui32 txId) const { YQL_ENSURE(txId < Transactions.size()); return Transactions[txId]; diff --git a/ydb/core/kqp/query_data/kqp_prepared_query.h b/ydb/core/kqp/query_data/kqp_prepared_query.h index 719c14594eff..e43a49c83af5 100644 --- a/ydb/core/kqp/query_data/kqp_prepared_query.h +++ b/ydb/core/kqp/query_data/kqp_prepared_query.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -115,6 +116,9 @@ class TKqpPhyTxHolder { const std::shared_ptr& alloc, TIntrusivePtr tableConstInfoById); bool IsLiteralTx() const; + + std::optional>> + GetSchemeOpTempTablePath() const; }; class TLlvmSettings { @@ -187,6 +191,8 @@ class TPreparedQueryHolder { void FillTable(const NKqpProto::TKqpPhyTable& phyTable); void FillTables(const google::protobuf::RepeatedPtrField< ::NKqpProto::TKqpPhyStage>& stages); + + bool HasTempTables(TKqpTempTablesState::TConstPtr tempTablesState, bool withSessionId) const; }; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 133634d3a49b..1ff33679fd32 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -180,8 +180,11 @@ class TKqpSessionActor : public TActorBootstrapped { FillSettings.Format = IDataProvider::EResultFormat::Custom; FillSettings.FormatDetails = TString(KikimrMkqlProtoFormat); - TempTablesState.SessionId = TryDecodeYdbSessionId(SessionId); - LOG_D("Create session actor with id " << *TempTablesState.SessionId); + auto optSessionId = TryDecodeYdbSessionId(SessionId); + YQL_ENSURE(optSessionId, "Can't decode ydb session Id"); + + TempTablesState.SessionId = *optSessionId; + LOG_D("Create session actor with id " << TempTablesState.SessionId); } void Bootstrap() { @@ -1063,7 +1066,7 @@ class TKqpSessionActor : public TActorBootstrapped { bool temporary = GetTemporaryTableInfo(tx).has_value(); auto executerActor = CreateKqpSchemeExecuter(tx, QueryState->GetType(), SelfId(), requestType, Settings.Database, userToken, - temporary, *TempTablesState.SessionId, QueryState->UserRequestContext); + temporary, TempTablesState.SessionId, QueryState->UserRequestContext); ExecuterId = RegisterWithSameMailbox(executerActor); } @@ -1160,39 +1163,27 @@ class TKqpSessionActor : public TActorBootstrapped { } } - std::optional GetTemporaryTableInfo(TKqpPhyTxHolder::TConstPtr tx) { + std::optional>> + GetTemporaryTableInfo(TKqpPhyTxHolder::TConstPtr tx) { if (!tx) { return std::nullopt; } - const auto& schemeOperation = tx->GetSchemeOperation(); - switch (schemeOperation.GetOperationCase()) { - case NKqpProto::TKqpSchemeOperation::kCreateTable: { - const auto& modifyScheme = schemeOperation.GetCreateTable(); - const NKikimrSchemeOp::TTableDescription* tableDesc = nullptr; - switch (modifyScheme.GetOperationType()) { - case NKikimrSchemeOp::ESchemeOpCreateTable: { - tableDesc = &modifyScheme.GetCreateTable(); - break; - } - case NKikimrSchemeOp::ESchemeOpCreateIndexedTable: { - tableDesc = &modifyScheme.GetCreateIndexedTable().GetTableDescription(); - break; - } - default: - YQL_ENSURE(false, "Unexpected operation type"); - } - auto userToken = QueryState ? QueryState->UserToken : TIntrusiveConstPtr(); - if (tableDesc->HasTemporary()) { - if (tableDesc->GetTemporary()) { - return {{tableDesc->GetName(), modifyScheme.GetWorkingDir(), Settings.Cluster, userToken, Settings.Database}}; - } - } - break; - } - default: - return std::nullopt; + auto optPath = tx->GetSchemeOpTempTablePath(); + if (!optPath) { + return std::nullopt; } - return std::nullopt; + const auto& [isCreate, path] = *optPath; + if (isCreate) { + auto userToken = QueryState ? QueryState->UserToken : TIntrusiveConstPtr(); + return {{true, {JoinPath({path.first, path.second}), {path.second, path.first, userToken}}}}; + } + + auto it = TempTablesState.FindInfo(JoinPath({path.first, path.second}), true); + if (it == TempTablesState.TempTables.end()) { + return std::nullopt; + } + + return {{false, {it->first, {}}}}; } void UpdateTempTablesState() { @@ -1203,8 +1194,14 @@ class TKqpSessionActor : public TActorBootstrapped { if (!tx) { return; } - if (auto tempTableInfo = GetTemporaryTableInfo(tx)) { - TempTablesState.TempTables[std::make_pair(tempTableInfo->Database, JoinPath({tempTableInfo->WorkingDir, tempTableInfo->Name}))] = std::move(*tempTableInfo); + auto optInfo = GetTemporaryTableInfo(tx); + if (optInfo) { + auto [isCreate, info] = *optInfo; + if (isCreate) { + TempTablesState.TempTables[info.first] = info.second; + } else { + TempTablesState.TempTables.erase(info.first); + } QueryState->UpdateTempTablesState(TempTablesState); } } @@ -1882,7 +1879,8 @@ class TKqpSessionActor : public TActorBootstrapped { Become(&TKqpSessionActor::FinalCleanupState); LOG_D("Cleanup temp tables: " << TempTablesState.TempTables.size()); - auto tempTablesManager = CreateKqpTempTablesManager(std::move(TempTablesState), SelfId()); + auto tempTablesManager = CreateKqpTempTablesManager( + std::move(TempTablesState), SelfId(), Settings.Database); RegisterWithSameMailbox(tempTablesManager); return; } else { diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.h b/ydb/core/kqp/session_actor/kqp_session_actor.h index 77920f816486..68214d534856 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.h +++ b/ydb/core/kqp/session_actor/kqp_session_actor.h @@ -41,6 +41,7 @@ IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId, const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig ); -IActor* CreateKqpTempTablesManager(TKqpTempTablesState tempTablesState, const TActorId& target); +IActor* CreateKqpTempTablesManager( + TKqpTempTablesState tempTablesState, const TActorId& target, const TString& database); } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/session_actor/kqp_temp_tables_manager.cpp b/ydb/core/kqp/session_actor/kqp_temp_tables_manager.cpp index 17e6a78b142b..d7d13ad49828 100644 --- a/ydb/core/kqp/session_actor/kqp_temp_tables_manager.cpp +++ b/ydb/core/kqp/session_actor/kqp_temp_tables_manager.cpp @@ -39,9 +39,10 @@ class TKqpTempTablesManager : public TActorBootstrapped { return NKikimrServices::TActivity::KQP_SESSION_ACTOR; } - TKqpTempTablesManager(TKqpTempTablesState tempTablesState, const TActorId& target) + TKqpTempTablesManager(TKqpTempTablesState tempTablesState, const TActorId& target, const TString& database) : TempTablesState(std::move(tempTablesState)) , Target(target) + , Database(database) {} void Bootstrap() { @@ -51,7 +52,7 @@ class TKqpTempTablesManager : public TActorBootstrapped { auto ev = MakeHolder(); auto& record = ev->Record; - record.SetDatabaseName(info.Database); + record.SetDatabaseName(Database); if (info.UserToken) { record.SetUserToken(info.UserToken->GetSerializedToken()); } @@ -60,9 +61,7 @@ class TKqpTempTablesManager : public TActorBootstrapped { modifyScheme->SetWorkingDir(info.WorkingDir); modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpDropTable); auto* drop = modifyScheme->MutableDrop(); - if (TempTablesState.SessionId) { - drop->SetName(info.Name + *TempTablesState.SessionId); - } + drop->SetName(info.Name + TempTablesState.SessionId); auto promise = NewPromise(); IActor* requestHandler = new TSchemeOpRequestHandler(ev.Release(), promise, true); @@ -107,14 +106,15 @@ class TKqpTempTablesManager : public TActorBootstrapped { private: TKqpTempTablesState TempTablesState; const TActorId Target; + const TString Database; ui32 ResultsCount = 0; }; } // namespace -IActor* CreateKqpTempTablesManager(TKqpTempTablesState tempTablesState, const TActorId& target) +IActor* CreateKqpTempTablesManager(TKqpTempTablesState tempTablesState, const TActorId& target, const TString& database) { - return new TKqpTempTablesManager(tempTablesState, target); + return new TKqpTempTablesManager(tempTablesState, target, database); } } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp index 5f32e98be209..253afa09a87e 100644 --- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp +++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp @@ -2251,6 +2251,224 @@ Y_UNIT_TEST_SUITE(KqpPg) { } } + Y_UNIT_TEST(TempTablesWithCache) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); + auto setting = NKikimrKqp::TKqpSetting(); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetKqpSettings({setting}); + TKikimrRunner kikimr( + serverSettings.SetWithSampleTables(false).SetEnableTempTables(true)); + auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint()); + auto client = kikimr.GetQueryClient(); + + auto settings = NYdb::NQuery::TExecuteQuerySettings() + .Syntax(NYdb::NQuery::ESyntax::Pg) + .StatsMode(NYdb::NQuery::EStatsMode::Basic); + { + auto session = client.GetSession().GetValueSync().GetSession(); + auto id = session.GetId(); + { + const auto query = Q_(R"( + --!syntax_pg + CREATE TABLE PgTemp ( + key int2 PRIMARY KEY, + value int2))"); + + auto result = + session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + } + + { + const auto query = Q_(R"( + --!syntax_pg + DROP TABLE PgTemp; + )"); + + auto result = + session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + } + + { + const auto query = Q_(R"( + --!syntax_pg + CREATE TABLE PgTemp ( + key int2 PRIMARY KEY, + value int2))"); + + auto result = + session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + + auto resultInsert = session.ExecuteQuery(R"( + INSERT INTO PgTemp VALUES(1, 1); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C( + resultInsert.GetStatus(), EStatus::SUCCESS, resultInsert.GetIssues().ToString()); + } + + { + const auto query = Q_(R"( + --!syntax_pg + CREATE TABLE SimpleTable ( + key int2 PRIMARY KEY, + value int2))"); + + auto result = + session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM PgTemp; + )"); + + auto result = session.ExecuteQuery( + query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM SimpleTable; + )"); + + auto result = session.ExecuteQuery( + query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + } + + { + const auto query = Q_(R"( + --!syntax_pg + CREATE TEMP TABLE PgTemp ( + key int2 PRIMARY KEY, + value int2))"); + + auto result = + session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + + auto resultInsert = session.ExecuteQuery(R"( + INSERT INTO PgTemp VALUES(2, 2); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C( + resultInsert.GetStatus(), EStatus::SUCCESS, resultInsert.GetIssues().ToString()); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM PgTemp; + )"); + + auto result = session.ExecuteQuery( + query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + + UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty"); + CompareYson(R"( + [["2";"2"]] + )", FormatResultSetYson(result.GetResultSet(0))); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM SimpleTable; + )"); + + auto result = session.ExecuteQuery( + query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), true); + } + + { + const auto query = Q_(R"( + --!syntax_pg + DROP TABLE PgTemp; + )"); + + auto result = + session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM PgTemp; + )"); + + auto result = session.ExecuteQuery( + query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), true); + + UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty"); + CompareYson(R"( + [["1";"1"]] + )", FormatResultSetYson(result.GetResultSet(0))); + } + + { + const auto query = Q_(R"( + --!syntax_pg + DROP TABLE PgTemp; + )"); + + auto result = + session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + } + + bool allDoneOk = true; + NTestHelpers::CheckDelete(clientConfig, id, Ydb::StatusIds::SUCCESS, allDoneOk); + + UNIT_ASSERT(allDoneOk); + } + + { + const auto querySelect = Q_(R"( + --!syntax_pg + SELECT * FROM PgTemp; + )"); + + auto resultSelect = client.ExecuteQuery( + querySelect, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT(!resultSelect.IsSuccess()); + } + } + Y_UNIT_TEST(ValuesInsert) { TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false)); auto testSingleType = [&kikimr] (const TPgTypeTestSpec& spec) {