Skip to content

KIKIMR-20740: QueryCache with temp tables #1053

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions ydb/core/kqp/common/simple/temp_tables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,21 @@

namespace NKikimr::NKqp {

THashMap<TString, TKqpTempTablesState::TTempTableInfo>::const_iterator
TKqpTempTablesState::FindInfo(const std::string_view& path, bool withSessionId) const {
if (!withSessionId) {
return TempTables.find(path);
}

if (path.size() < SessionId.size()) {
return TempTables.end();
}
size_t pos = path.size() - SessionId.size();
if (path.substr(pos) != SessionId) {
return TempTables.end();
}

return TempTables.find(path.substr(0, pos));
}

} // namespace NKikimr::NKqp
10 changes: 6 additions & 4 deletions ydb/core/kqp/common/simple/temp_tables.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ydb/library/aclib/aclib.h>

#include <optional>
#include <string_view>

#include <util/generic/fwd.h>
#include <util/generic/hash.h>
Expand All @@ -14,14 +15,15 @@ struct TKqpTempTablesState {
struct TTempTableInfo {
TString Name;
TString WorkingDir;
TString Database;
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
TString Cluster;
};
std::optional<TString> SessionId;
THashMap<std::pair<TString, TString>, TTempTableInfo> TempTables;
TString SessionId;
THashMap<TString, TTempTableInfo> TempTables;

using TConstPtr = std::shared_ptr<const TKqpTempTablesState>;

THashMap<TString, TTempTableInfo>::const_iterator
FindInfo(const std::string_view& path, bool withSessionId = false) const;
};

} // namespace NKikimr::NKqp
65 changes: 51 additions & 14 deletions ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,9 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
Counters->ReportCompileRequestGet(dbCounters);

auto compileResult = QueryCache.FindByUid(*request.Uid, request.KeepInCache);
if (HasTempTablesNameClashes(compileResult, request.TempTablesState)) {
compileResult = nullptr;
}
if (compileResult) {
Y_ENSURE(compileResult->Query);
if (compileResult->Query->UserSid == userSid) {
Expand Down Expand Up @@ -610,6 +613,10 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
}

auto compileResult = QueryCache.FindByQuery(query, request.KeepInCache);
if (HasTempTablesNameClashes(compileResult, request.TempTablesState)) {
compileResult = nullptr;
}

if (compileResult) {
Counters->ReportQueryCacheHit(dbCounters, true);

Expand Down Expand Up @@ -672,7 +679,11 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
auto dbCounters = request.DbCounters;
Counters->ReportRecompileRequestGet(dbCounters);

auto compileResult = QueryCache.FindByUid(request.Uid, false);
TKqpCompileResult::TConstPtr compileResult = QueryCache.FindByUid(request.Uid, false);
if (HasTempTablesNameClashes(compileResult, request.TempTablesState)) {
compileResult = nullptr;
}

if (compileResult || request.Query) {
Counters->ReportCompileRequestCompile(dbCounters);

Expand Down Expand Up @@ -736,19 +747,12 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {

bool keepInCache = compileRequest.KeepInCache && compileResult->AllowCache;

bool hasTempTablesNameClashes = HasTempTablesNameClashes(compileResult, compileRequest.TempTablesState, true);

try {
if (compileResult->Status == Ydb::StatusIds::SUCCESS) {
if (QueryCache.FindByUid(compileResult->Uid, false)) {
QueryCache.Replace(compileResult);
} else if (keepInCache) {
if (QueryCache.Insert(compileResult, TableServiceConfig.GetEnableAstCache())) {
Counters->CompileQueryCacheEvicted->Inc();
}
if (compileResult->Query && compileResult->Query->Settings.IsPrepareQuery) {
if (InsertPreparingQuery(compileResult, compileRequest.KeepInCache)) {
Counters->CompileQueryCacheEvicted->Inc();
};
}
if (!hasTempTablesNameClashes) {
UpdateQueryCache(compileResult, keepInCache);
}

if (ev->Get()->ReplayMessage) {
Expand All @@ -762,8 +766,10 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
request.Cookie, std::move(request.Orbit), std::move(request.CompileServiceSpan), (CollectDiagnostics ? ev->Get()->ReplayMessageUserView : std::nullopt));
}
} else {
if (QueryCache.FindByUid(compileResult->Uid, false)) {
QueryCache.EraseByUid(compileResult->Uid);
if (!hasTempTablesNameClashes) {
if (QueryCache.FindByUid(compileResult->Uid, false)) {
QueryCache.EraseByUid(compileResult->Uid);
}
}
}

Expand Down Expand Up @@ -814,12 +820,43 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
StartCheckQueriesTtlTimer();
}

bool HasTempTablesNameClashes(
TKqpCompileResult::TConstPtr compileResult,
TKqpTempTablesState::TConstPtr tempTablesState, bool withSessionId = false) {
if (!compileResult) {
return false;
}
if (!compileResult->PreparedQuery) {
return false;
}

return compileResult->PreparedQuery->HasTempTables(tempTablesState, withSessionId);
}

void UpdateQueryCache(TKqpCompileResult::TConstPtr compileResult, bool keepInCache) {
if (QueryCache.FindByUid(compileResult->Uid, false)) {
QueryCache.Replace(compileResult);
} else if (keepInCache) {
if (QueryCache.Insert(compileResult, TableServiceConfig.GetEnableAstCache())) {
Counters->CompileQueryCacheEvicted->Inc();
}
if (compileResult->Query && compileResult->Query->Settings.IsPrepareQuery) {
if (InsertPreparingQuery(compileResult, true)) {
Counters->CompileQueryCacheEvicted->Inc();
};
}
}
}

void Handle(TEvKqp::TEvParseResponse::TPtr& ev, const TActorContext& ctx) {
auto& parseResult = ev->Get()->AstResult;
auto& query = ev->Get()->Query;
auto compileRequest = RequestsQueue.FinishActiveRequest(query);
if (parseResult && parseResult->Ast->IsOk()) {
auto compileResult = QueryCache.FindByAst(query, *parseResult->Ast, compileRequest.KeepInCache);
if (HasTempTablesNameClashes(compileResult, compileRequest.TempTablesState)) {
compileResult = nullptr;
}
if (compileResult) {
Counters->ReportQueryCacheHit(compileRequest.DbCounters, true);

Expand Down
6 changes: 1 addition & 5 deletions ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
}

case NKqpProto::TKqpSchemeOperation::kDropTable: {
auto modifyScheme = schemeOp.GetDropTable();
if (Temporary) {
auto* dropTable = modifyScheme.MutableDrop();
dropTable->SetName(dropTable->GetName() + SessionId);
}
const auto& modifyScheme = schemeOp.GetDropTable();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}
Expand Down
18 changes: 8 additions & 10 deletions ydb/core/kqp/gateway/kqp_metadata_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ struct NavigateEntryResult {
std::optional<TString> QueryName;
};

NavigateEntryResult CreateNavigateEntry(const TString& cluster, const TString& path,
const NYql::IKikimrGateway::TLoadTableMetadataSettings& settings, TKqpTempTablesState::TConstPtr tempTablesState = nullptr) {
NavigateEntryResult CreateNavigateEntry(const TString& path,
const NYql::IKikimrGateway::TLoadTableMetadataSettings& settings, TKqpTempTablesState::TConstPtr tempTablesState = nullptr) {
TNavigate::TEntry entry;
TString currentPath = path;
std::optional<TString> queryName = std::nullopt;
if (tempTablesState) {
auto tempTablesIt = tempTablesState->TempTables.find(std::make_pair(cluster, currentPath));
if (tempTablesState->SessionId && tempTablesIt != tempTablesState->TempTables.end()) {
auto tempTablesInfoIt = tempTablesState->FindInfo(currentPath, false);
if (tempTablesInfoIt != tempTablesState->TempTables.end()) {
queryName = currentPath;
currentPath = currentPath + *tempTablesState->SessionId;
currentPath = currentPath + tempTablesState->SessionId;
}
}
entry.Path = SplitPath(currentPath);
Expand All @@ -50,10 +50,8 @@ NavigateEntryResult CreateNavigateEntry(const TString& cluster, const TString& p
return {entry, currentPath, queryName};
}

NavigateEntryResult CreateNavigateEntry(const TString& cluster,
const std::pair<TIndexId, TString>& pair,
NavigateEntryResult CreateNavigateEntry(const std::pair<TIndexId, TString>& pair,
const NYql::IKikimrGateway::TLoadTableMetadataSettings& settings, TKqpTempTablesState::TConstPtr tempTablesState = nullptr) {
Y_UNUSED(cluster);
Y_UNUSED(tempTablesState);

TNavigate::TEntry entry;
Expand Down Expand Up @@ -701,8 +699,8 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta

const auto externalEntryItem = CreateNavigateExternalEntry(id, settings.WithExternalDatasources_);
Y_ABORT_UNLESS(!settings.WithExternalDatasources_ || externalEntryItem, "External data source must be resolved using path only");
auto resNavigate = settings.WithExternalDatasources_ ? *externalEntryItem : CreateNavigateEntry(cluster,
id, settings, TempTablesState);
auto resNavigate = settings.WithExternalDatasources_ ? *externalEntryItem : CreateNavigateEntry(id,
settings, TempTablesState);
const auto entry = resNavigate.Entry;
const auto queryName = resNavigate.QueryName;
const auto externalEntry = settings.WithExternalDatasources_ ? std::optional<NavigateEntryResult>{} : externalEntryItem;
Expand Down
39 changes: 23 additions & 16 deletions ydb/core/kqp/provider/yql_kikimr_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,13 @@ struct TKikimrData {
const TKikimrTableDescription* TKikimrTablesData::EnsureTableExists(const TString& cluster,
const TString& table, TPositionHandle pos, TExprContext& ctx) const
{
auto tempTable = TempTables.FindPtr(table);

auto tablePath = table;
if (tempTable) {
tablePath = *tempTable;
if (TempTablesState) {
auto tempTableInfoIt = TempTablesState->FindInfo(table, true);

if (tempTableInfoIt != TempTablesState->TempTables.end()) {
tablePath = tempTableInfoIt->first;
}
}

auto desc = Tables.FindPtr(std::make_pair(cluster, tablePath));
Expand All @@ -141,11 +143,13 @@ const TKikimrTableDescription* TKikimrTablesData::EnsureTableExists(const TStrin
}

TKikimrTableDescription& TKikimrTablesData::GetOrAddTable(const TString& cluster, const TString& database, const TString& table, ETableType tableType) {
auto tempTable = TempTables.FindPtr(table);

auto tablePath = table;
if (tempTable) {
tablePath = *tempTable;
if (TempTablesState) {
auto tempTableInfoIt = TempTablesState->FindInfo(table, true);

if (tempTableInfoIt != TempTablesState->TempTables.end()) {
tablePath = tempTableInfoIt->first;
}
}

if (!Tables.FindPtr(std::make_pair(cluster, tablePath))) {
Expand All @@ -165,11 +169,13 @@ TKikimrTableDescription& TKikimrTablesData::GetOrAddTable(const TString& cluster
}

TKikimrTableDescription& TKikimrTablesData::GetTable(const TString& cluster, const TString& table) {
auto tempTable = TempTables.FindPtr(table);

auto tablePath = table;
if (tempTable) {
tablePath = *tempTable;
if (TempTablesState) {
auto tempTableInfoIt = TempTablesState->FindInfo(table, true);

if (tempTableInfoIt != TempTablesState->TempTables.end()) {
tablePath = tempTableInfoIt->first;
}
}

auto desc = Tables.FindPtr(std::make_pair(cluster, tablePath));
Expand All @@ -181,12 +187,13 @@ TKikimrTableDescription& TKikimrTablesData::GetTable(const TString& cluster, con
const TKikimrTableDescription& TKikimrTablesData::ExistingTable(const TStringBuf& cluster,
const TStringBuf& table) const
{
auto tempTable = TempTables.FindPtr(table);

auto tablePath = table;
if (TempTablesState) {
auto tempTableInfoIt = TempTablesState->FindInfo(table, true);

if (tempTable) {
tablePath = *tempTable;
if (tempTableInfoIt != TempTablesState->TempTables.end()) {
tablePath = tempTableInfoIt->first;
}
}

auto desc = Tables.FindPtr(std::make_pair(TString(cluster), TString(tablePath)));
Expand Down
35 changes: 13 additions & 22 deletions ydb/core/kqp/provider/yql_kikimr_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,16 +200,12 @@ class TKikimrTablesData : public TThrRefBase {
}

void SetTempTables(NKikimr::NKqp::TKqpTempTablesState::TConstPtr tempTablesState) {
if (tempTablesState) {
for (const auto& [path, info] : tempTablesState->TempTables) {
TempTables[path.second + *tempTablesState->SessionId] = path.second;
}
}
TempTablesState = std::move(tempTablesState);
}

private:
THashMap<std::pair<TString, TString>, TKikimrTableDescription> Tables;
THashMap<TString, TString> TempTables;
NKikimr::NKqp::TKqpTempTablesState::TConstPtr TempTablesState;
};

enum class TYdbOperation : ui32 {
Expand Down Expand Up @@ -288,11 +284,7 @@ class TKikimrTransactionContextBase : public TThrRefBase {
}

void SetTempTables(NKikimr::NKqp::TKqpTempTablesState::TConstPtr tempTablesState) {
if (tempTablesState) {
for (const auto& [path, info] : tempTablesState->TempTables) {
TempTables[path.second] = path.second + *tempTablesState->SessionId;
}
}
TempTablesState = std::move(tempTablesState);
}

template<class IterableKqpTableOps, class IterableKqpTableInfos>
Expand Down Expand Up @@ -330,17 +322,16 @@ class TKikimrTransactionContextBase : public TThrRefBase {
}

for (const auto& op : operations) {
const auto& table = [&]() -> const TString& {
const auto tempTable = TempTables.FindPtr(op.GetTable());
if (tempTable) {
return *tempTable;
} else {
return op.GetTable();
}
}();

const auto newOp = TYdbOperation(op.GetOperation());

auto table = op.GetTable();
if (TempTablesState) {
auto tempTableInfoIt = TempTablesState->FindInfo(table, false);
if (tempTableInfoIt != TempTablesState->TempTables.end()) {
table = tempTableInfoIt->first + TempTablesState->SessionId;
}
}

const auto info = tableInfoMap.FindPtr(table);
if (!info) {
TString message = TStringBuilder()
Expand Down Expand Up @@ -450,7 +441,7 @@ class TKikimrTransactionContextBase : public TThrRefBase {
THashMap<TString, TYdbOperations> TableOperations;
THashMap<TKikimrPathId, TString> TableByIdMap;
TMaybe<NKikimrKqp::EIsolationLevel> EffectiveIsolationLevel;
THashMap<TString, TString> TempTables;
NKikimr::NKqp::TKqpTempTablesState::TConstPtr TempTablesState;
bool Readonly = false;
bool Invalidated = false;
bool Closed = false;
Expand Down Expand Up @@ -535,7 +526,7 @@ class TKikimrSessionContext : public TThrRefBase {
if (TxCtx) {
TxCtx->SetTempTables(tempTablesState);
}
TempTablesState = tempTablesState;
TempTablesState = std::move(tempTablesState);
}

const TIntrusiveConstPtr<NACLib::TUserToken>& GetUserToken() const {
Expand Down
Loading