Skip to content

Commit 2209648

Browse files
committed
Fixes
1 parent df7bfaf commit 2209648

File tree

2 files changed

+80
-62
lines changed

2 files changed

+80
-62
lines changed

ydb/core/kqp/compile_service/kqp_compile_service.cpp

Lines changed: 77 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,11 @@ class TKqpRequestsQueue {
388388
};
389389

390390
class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
391+
enum ECacheType {
392+
ByUid,
393+
ByQuery,
394+
ByAst,
395+
};
391396
public:
392397
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
393398
return NKikimrServices::TActivity::KQP_COMPILE_SERVICE;
@@ -566,26 +571,25 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
566571
if (request.Uid) {
567572
Counters->ReportCompileRequestGet(dbCounters);
568573

569-
if (!request.TempTablesState || request.TempTablesState->TempTables.empty()) {
570-
auto compileResult = QueryCache.FindByUid(*request.Uid, request.KeepInCache);
571-
if (compileResult) {
572-
Y_ENSURE(compileResult->Query);
573-
if (compileResult->Query->UserSid == userSid) {
574-
Counters->ReportQueryCacheHit(dbCounters, true);
575-
576-
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache by uid"
577-
<< ", sender: " << ev->Sender
578-
<< ", queryUid: " << *request.Uid);
579-
580-
ReplyFromCache(ev->Sender, compileResult, ctx, ev->Cookie, std::move(ev->Get()->Orbit), std::move(compileServiceSpan));
581-
return;
582-
} else {
583-
LOG_NOTICE_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Non-matching user sid for query"
584-
<< ", sender: " << ev->Sender
585-
<< ", queryUid: " << *request.Uid
586-
<< ", expected sid: " << compileResult->Query->UserSid
587-
<< ", actual sid: " << userSid);
588-
}
574+
auto compileResult = QueryCache.FindByUid(*request.Uid, request.KeepInCache);
575+
compileResult = WithCache(std::move(compileResult), request.TempTablesState);
576+
if (compileResult) {
577+
Y_ENSURE(compileResult->Query);
578+
if (compileResult->Query->UserSid == userSid) {
579+
Counters->ReportQueryCacheHit(dbCounters, true);
580+
581+
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache by uid"
582+
<< ", sender: " << ev->Sender
583+
<< ", queryUid: " << *request.Uid);
584+
585+
ReplyFromCache(ev->Sender, compileResult, ctx, ev->Cookie, std::move(ev->Get()->Orbit), std::move(compileServiceSpan));
586+
return;
587+
} else {
588+
LOG_NOTICE_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Non-matching user sid for query"
589+
<< ", sender: " << ev->Sender
590+
<< ", queryUid: " << *request.Uid
591+
<< ", expected sid: " << compileResult->Query->UserSid
592+
<< ", actual sid: " << userSid);
589593
}
590594
}
591595

@@ -611,19 +615,18 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
611615
Y_ENSURE(query.UserSid == userSid);
612616
}
613617

618+
auto compileResult = QueryCache.FindByQuery(query, request.KeepInCache);
619+
compileResult = WithCache(std::move(compileResult), request.TempTablesState);
614620

615-
if (!request.TempTablesState || request.TempTablesState->TempTables.empty()) {
616-
auto compileResult = QueryCache.FindByQuery(query, request.KeepInCache);
617-
if (compileResult) {
618-
Counters->ReportQueryCacheHit(dbCounters, true);
621+
if (compileResult) {
622+
Counters->ReportQueryCacheHit(dbCounters, true);
619623

620-
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache from query text"
621-
<< ", sender: " << ev->Sender
622-
<< ", queryUid: " << compileResult->Uid);
624+
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache from query text"
625+
<< ", sender: " << ev->Sender
626+
<< ", queryUid: " << compileResult->Uid);
623627

624-
ReplyFromCache(ev->Sender, compileResult, ctx, ev->Cookie, std::move(ev->Get()->Orbit), std::move(compileServiceSpan));
625-
return;
626-
}
628+
ReplyFromCache(ev->Sender, compileResult, ctx, ev->Cookie, std::move(ev->Get()->Orbit), std::move(compileServiceSpan));
629+
return;
627630
}
628631

629632
CollectDiagnostics = request.CollectDiagnostics;
@@ -677,10 +680,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
677680
auto dbCounters = request.DbCounters;
678681
Counters->ReportRecompileRequestGet(dbCounters);
679682

680-
TKqpCompileResult::TConstPtr compileResult = nullptr;
681-
if (!request.TempTablesState || request.TempTablesState->TempTables.empty()) {
682-
compileResult = QueryCache.FindByUid(request.Uid, false);
683-
}
683+
TKqpCompileResult::TConstPtr compileResult = QueryCache.FindByUid(request.Uid, false);
684+
compileResult = WithCache(std::move(compileResult), request.TempTablesState);
684685

685686
if (compileResult || request.Query) {
686687
Counters->ReportCompileRequestCompile(dbCounters);
@@ -745,27 +746,12 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
745746

746747
bool keepInCache = compileRequest.KeepInCache && compileResult->AllowCache;
747748

748-
bool hasTempTables = compileRequest.TempTablesState
749-
&& (!compileRequest.TempTablesState->TempTables.empty());
750-
if (compileResult->PreparedQuery) {
751-
hasTempTables = compileResult->PreparedQuery->HasTempTables(compileRequest.TempTablesState);
752-
}
749+
bool hasTempTables = WithCache(compileResult, compileRequest.TempTablesState) != nullptr;
753750

754751
try {
755752
if (compileResult->Status == Ydb::StatusIds::SUCCESS) {
756753
if (!hasTempTables) {
757-
if (QueryCache.FindByUid(compileResult->Uid, false)) {
758-
QueryCache.Replace(compileResult);
759-
} else if (keepInCache) {
760-
if (QueryCache.Insert(compileResult, TableServiceConfig.GetEnableAstCache())) {
761-
Counters->CompileQueryCacheEvicted->Inc();
762-
}
763-
if (compileResult->Query && compileResult->Query->Settings.IsPrepareQuery) {
764-
if (InsertPreparingQuery(compileResult, compileRequest.KeepInCache)) {
765-
Counters->CompileQueryCacheEvicted->Inc();
766-
};
767-
}
768-
}
754+
UpdateQueryCache(compileResult, keepInCache);
769755
}
770756

771757
if (ev->Get()->ReplayMessage) {
@@ -833,25 +819,54 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
833819
StartCheckQueriesTtlTimer();
834820
}
835821

822+
TKqpCompileResult::TConstPtr WithCache(
823+
TKqpCompileResult::TConstPtr cacheResult, TKqpTempTablesState::TConstPtr tempTablesState) {
824+
if (!cacheResult) {
825+
return nullptr;
826+
}
827+
if (!cacheResult->PreparedQuery) {
828+
return cacheResult;
829+
}
830+
auto hasTempTables = cacheResult->PreparedQuery->HasTempTables(tempTablesState);
831+
if (hasTempTables) {
832+
return nullptr;
833+
}
834+
return cacheResult;
835+
}
836+
837+
void UpdateQueryCache(TKqpCompileResult::TConstPtr compileResult, bool keepInCache) {
838+
if (QueryCache.FindByUid(compileResult->Uid, false)) {
839+
QueryCache.Replace(compileResult);
840+
} else if (keepInCache) {
841+
if (QueryCache.Insert(compileResult, TableServiceConfig.GetEnableAstCache())) {
842+
Counters->CompileQueryCacheEvicted->Inc();
843+
}
844+
if (compileResult->Query && compileResult->Query->Settings.IsPrepareQuery) {
845+
if (InsertPreparingQuery(compileResult, true)) {
846+
Counters->CompileQueryCacheEvicted->Inc();
847+
};
848+
}
849+
}
850+
}
851+
836852
void Handle(TEvKqp::TEvParseResponse::TPtr& ev, const TActorContext& ctx) {
837853
auto& parseResult = ev->Get()->AstResult;
838854
auto& query = ev->Get()->Query;
839855
auto compileRequest = RequestsQueue.FinishActiveRequest(query);
840856
if (parseResult && parseResult->Ast->IsOk()) {
841-
if (!compileRequest.TempTablesState || compileRequest.TempTablesState->TempTables.empty()) {
842-
auto compileResult = QueryCache.FindByAst(query, *parseResult->Ast, compileRequest.KeepInCache);
843-
if (compileResult) {
844-
Counters->ReportQueryCacheHit(compileRequest.DbCounters, true);
857+
auto compileResult = QueryCache.FindByAst(query, *parseResult->Ast, compileRequest.KeepInCache);
858+
compileResult = WithCache(std::move(compileResult), compileRequest.TempTablesState);
859+
if (compileResult) {
860+
Counters->ReportQueryCacheHit(compileRequest.DbCounters, true);
845861

846-
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache from ast"
847-
<< ", sender: " << compileRequest.Sender
848-
<< ", queryUid: " << compileResult->Uid);
862+
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache from ast"
863+
<< ", sender: " << compileRequest.Sender
864+
<< ", queryUid: " << compileResult->Uid);
849865

850-
compileResult->Ast->PgAutoParamValues = std::move(parseResult->Ast->PgAutoParamValues);
866+
compileResult->Ast->PgAutoParamValues = std::move(parseResult->Ast->PgAutoParamValues);
851867

852-
ReplyFromCache(compileRequest.Sender, compileResult, ctx, compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
853-
return;
854-
}
868+
ReplyFromCache(compileRequest.Sender, compileResult, ctx, compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
869+
return;
855870
}
856871
}
857872
Counters->ReportQueryCacheHit(compileRequest.DbCounters, false);

ydb/core/kqp/query_data/kqp_prepared_query.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,9 @@ void TPreparedQueryHolder::FillTables(const google::protobuf::RepeatedPtrField<
271271
}
272272

273273
bool TPreparedQueryHolder::HasTempTables(TKqpTempTablesState::TConstPtr tempTablesState) const {
274+
if (!tempTablesState) {
275+
return false;
276+
}
274277
auto tempTables = THashSet<TString>();
275278
for (const auto& [path, info] : tempTablesState->TempTables) {
276279
tempTables.insert(path.second + *tempTablesState->SessionId);

0 commit comments

Comments
 (0)