Skip to content

Commit 11059fa

Browse files
abyss7zinal
authored andcommitted
Mitigate double notification of compiled pattern scenario (ydb-platform#10499) (ydb-platform#10530)
1 parent ee6c963 commit 11059fa

File tree

4 files changed

+48
-7
lines changed

4 files changed

+48
-7
lines changed

ydb/core/kqp/compile_service/kqp_compile_computation_pattern_service.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class TKqpCompileComputationPatternService : public TActorBootstrapped<TKqpCompi
6262
timer.Reset();
6363

6464
patternToCompile.Entry->Pattern->Compile({}, nullptr);
65-
patternCache->NotifyPatternCompiled(patternToCompile.SerializedProgram, patternToCompile.Entry);
65+
patternCache->NotifyPatternCompiled(patternToCompile.SerializedProgram);
6666
patternToCompile.Entry = nullptr;
6767

6868
Counters->CompiledComputationPatterns->Inc();

ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,15 +69,20 @@ class TComputationPatternLRUCache::TLRUPatternCacheImpl
6969
ClearIfNeeded();
7070
}
7171

72-
void NotifyPatternCompiled(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry>& entry) {
72+
void NotifyPatternCompiled(const TString & serializedProgram) {
7373
auto it = SerializedProgramToPatternCacheHolder.find(serializedProgram);
7474
if (it == SerializedProgramToPatternCacheHolder.end()) {
7575
return;
7676
}
7777

78-
Y_ASSERT(entry->Pattern->IsCompiled());
78+
const auto& entry = it->second.Entry;
79+
80+
Y_ENSURE(entry->Pattern->IsCompiled());
81+
82+
if (it->second.LinkedInCompiledPatternLRUList()) {
83+
return;
84+
}
7985

80-
Y_ASSERT(!it->second.LinkedInCompiledPatternLRUList());
8186
PromoteEntry(&it->second);
8287

8388
++CurrentCompiledPatternsSize;
@@ -290,9 +295,9 @@ void TComputationPatternLRUCache::EmplacePattern(const TString& serializedProgra
290295
}
291296
}
292297

293-
void TComputationPatternLRUCache::NotifyPatternCompiled(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv) {
298+
void TComputationPatternLRUCache::NotifyPatternCompiled(const TString& serializedProgram) {
294299
std::lock_guard lock(Mutex);
295-
Cache->NotifyPatternCompiled(serializedProgram, patternWithEnv);
300+
Cache->NotifyPatternCompiled(serializedProgram);
296301
}
297302

298303
size_t TComputationPatternLRUCache::GetSize() const {

ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ class TComputationPatternLRUCache {
130130

131131
void EmplacePattern(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv);
132132

133-
void NotifyPatternCompiled(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv);
133+
void NotifyPatternCompiled(const TString& serializedProgram);
134134

135135
size_t GetSize() const;
136136

ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -630,6 +630,42 @@ Y_UNIT_TEST_SUITE(ComputationPatternCache) {
630630
}
631631
}
632632

633+
Y_UNIT_TEST(DoubleNotifyPatternCompiled) {
634+
class TMockComputationPattern final : public IComputationPattern {
635+
public:
636+
explicit TMockComputationPattern(size_t codeSize) : Size_(codeSize) {}
637+
638+
void Compile(TString, IStatsRegistry*) override { Compiled_ = true; }
639+
bool IsCompiled() const override { return Compiled_; }
640+
size_t CompiledCodeSize() const override { return Size_; }
641+
void RemoveCompiledCode() override { Compiled_ = false; }
642+
THolder<IComputationGraph> Clone(const TComputationOptsFull&) override { return {}; }
643+
bool GetSuitableForCache() const override { return true; }
644+
645+
private:
646+
const size_t Size_;
647+
bool Compiled_ = false;
648+
};
649+
650+
const TString key = "program";
651+
const ui32 cacheSize = 2;
652+
TComputationPatternLRUCache cache({cacheSize, cacheSize});
653+
654+
auto entry = std::make_shared<TPatternCacheEntry>();
655+
entry->Pattern = MakeIntrusive<TMockComputationPattern>(1u);
656+
cache.EmplacePattern(key, entry);
657+
658+
for (ui32 i = 0; i < cacheSize + 1; ++i) {
659+
entry->Pattern->Compile("", nullptr);
660+
cache.NotifyPatternCompiled(key);
661+
}
662+
663+
entry = std::make_shared<TPatternCacheEntry>();
664+
entry->Pattern = MakeIntrusive<TMockComputationPattern>(cacheSize + 1);
665+
entry->Pattern->Compile("", nullptr);
666+
cache.EmplacePattern(key, entry);
667+
}
668+
633669
Y_UNIT_TEST(AddPerf) {
634670
TTimer t("all: ");
635671
TScopedAlloc alloc(__LOCATION__);

0 commit comments

Comments
 (0)