Skip to content

Commit d931fd8

Browse files
kitaisreal and adameat
authored and committed
KQP computation pattern cache serialized program (ydb-platform#634)
KQP computation pattern cache serialized program
1 parent 232ce1e commit d931fd8

File tree

4 files changed

+78
-28
lines changed

4 files changed

+78
-28
lines changed

ydb/core/kqp/compile_service/kqp_compile_computation_pattern_service.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ class TKqpCompileComputationPatternService : public TActorBootstrapped<TKqpCompi
8484
return;
8585
}
8686

87-
THashMap<TString, std::shared_ptr<NMiniKQL::TPatternCacheEntry>> patternsToCompile;
87+
THashMap<NMiniKQL::TSerializedProgram, std::shared_ptr<NMiniKQL::TPatternCacheEntry>> patternsToCompile;
8888
patternCache->GetPatternsToCompile(patternsToCompile);
8989

9090
TVector<std::pair<TPatternToCompile, size_t>> patternsToCompileWithAccessTimes;
@@ -115,7 +115,7 @@ class TKqpCompileComputationPatternService : public TActorBootstrapped<TKqpCompi
115115
TIntrusivePtr<TKqpCounters> Counters;
116116

117117
struct TPatternToCompile {
118-
TString SerializedProgram;
118+
NMiniKQL::TSerializedProgram SerializedProgram;
119119
std::shared_ptr<NMiniKQL::TPatternCacheEntry> Entry;
120120
};
121121

ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.cpp

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class TComputationPatternLRUCache::TLRUPatternCacheImpl
3333
return CurrentPatternsCompiledCodeSizeInBytes;
3434
}
3535

36-
std::shared_ptr<TPatternCacheEntry>* Find(const TString& serializedProgram) {
36+
std::shared_ptr<TPatternCacheEntry>* Find(const TSerializedProgram& serializedProgram) {
3737
auto it = SerializedProgramToPatternCacheHolder.find(serializedProgram);
3838
if (it == SerializedProgramToPatternCacheHolder.end()) {
3939
return nullptr;
@@ -44,7 +44,7 @@ class TComputationPatternLRUCache::TLRUPatternCacheImpl
4444
return &it->second.Entry;
4545
}
4646

47-
void Insert(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry>& entry) {
47+
void Insert(const TSerializedProgram& serializedProgram, std::shared_ptr<TPatternCacheEntry>& entry) {
4848
auto [it, inserted] = SerializedProgramToPatternCacheHolder.emplace(std::piecewise_construct,
4949
std::forward_as_tuple(serializedProgram),
5050
std::forward_as_tuple(serializedProgram, entry));
@@ -69,7 +69,7 @@ class TComputationPatternLRUCache::TLRUPatternCacheImpl
6969
ClearIfNeeded();
7070
}
7171

72-
void NotifyPatternCompiled(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry>& entry) {
72+
void NotifyPatternCompiled(const TSerializedProgram& serializedProgram, std::shared_ptr<TPatternCacheEntry>& entry) {
7373
auto it = SerializedProgramToPatternCacheHolder.find(serializedProgram);
7474
if (it == SerializedProgramToPatternCacheHolder.end()) {
7575
return;
@@ -108,7 +108,7 @@ class TComputationPatternLRUCache::TLRUPatternCacheImpl
108108
* Most recently accessed items are in back of the lists, least recently accessed items are in front of the lists.
109109
*/
110110
struct TPatternCacheHolder : public TIntrusiveListItem<TPatternCacheHolder, TPatternLRUListTag>, TIntrusiveListItem<TPatternCacheHolder, TCompiledPatternLRUListTag> {
111-
TPatternCacheHolder(TString serializedProgram, std::shared_ptr<TPatternCacheEntry> entry)
111+
TPatternCacheHolder(TSerializedProgram serializedProgram, std::shared_ptr<TPatternCacheEntry> entry)
112112
: SerializedProgram(std::move(serializedProgram))
113113
, Entry(std::move(entry))
114114
{}
@@ -121,7 +121,7 @@ class TComputationPatternLRUCache::TLRUPatternCacheImpl
121121
return !TIntrusiveListItem<TPatternCacheHolder, TCompiledPatternLRUListTag>::Empty();
122122
}
123123

124-
TString SerializedProgram;
124+
TSerializedProgram SerializedProgram;
125125
std::shared_ptr<TPatternCacheEntry> Entry;
126126
};
127127

@@ -195,7 +195,7 @@ class TComputationPatternLRUCache::TLRUPatternCacheImpl
195195
size_t CurrentCompiledPatternsSize = 0;
196196
size_t CurrentPatternsCompiledCodeSizeInBytes = 0;
197197

198-
THashMap<TString, TPatternCacheHolder> SerializedProgramToPatternCacheHolder;
198+
THashMap<TSerializedProgram, TPatternCacheHolder> SerializedProgramToPatternCacheHolder;
199199
TIntrusiveList<TPatternCacheHolder, TPatternLRUListTag> LRUPatternList;
200200
TIntrusiveList<TPatternCacheHolder, TCompiledPatternLRUListTag> LRUCompiledPatternList;
201201
};
@@ -223,7 +223,7 @@ TComputationPatternLRUCache::~TComputationPatternLRUCache() {
223223
CleanCache();
224224
}
225225

226-
std::shared_ptr<TPatternCacheEntry> TComputationPatternLRUCache::Find(const TString& serializedProgram) {
226+
std::shared_ptr<TPatternCacheEntry> TComputationPatternLRUCache::Find(const TSerializedProgram& serializedProgram) {
227227
std::lock_guard<std::mutex> lock(Mutex);
228228
if (auto it = Cache->Find(serializedProgram)) {
229229
++*Hits;
@@ -238,7 +238,7 @@ std::shared_ptr<TPatternCacheEntry> TComputationPatternLRUCache::Find(const TStr
238238
return {};
239239
}
240240

241-
TComputationPatternLRUCache::TTicket TComputationPatternLRUCache::FindOrSubscribe(const TString& serializedProgram) {
241+
TComputationPatternLRUCache::TTicket TComputationPatternLRUCache::FindOrSubscribe(const TSerializedProgram& serializedProgram) {
242242
std::lock_guard lock(Mutex);
243243
if (auto it = Cache->Find(serializedProgram)) {
244244
++*Hits;
@@ -263,7 +263,7 @@ TComputationPatternLRUCache::TTicket TComputationPatternLRUCache::FindOrSubscrib
263263
return TTicket(serializedProgram, false, promise, nullptr);
264264
}
265265

266-
void TComputationPatternLRUCache::EmplacePattern(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv) {
266+
void TComputationPatternLRUCache::EmplacePattern(const TSerializedProgram& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv) {
267267
Y_DEBUG_ABORT_UNLESS(patternWithEnv && patternWithEnv->Pattern);
268268
TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers;
269269

@@ -290,7 +290,7 @@ void TComputationPatternLRUCache::EmplacePattern(const TString& serializedProgra
290290
}
291291
}
292292

293-
void TComputationPatternLRUCache::NotifyPatternCompiled(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv) {
293+
void TComputationPatternLRUCache::NotifyPatternCompiled(const TSerializedProgram& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv) {
294294
std::lock_guard lock(Mutex);
295295
Cache->NotifyPatternCompiled(serializedProgram, patternWithEnv);
296296
}
@@ -309,7 +309,7 @@ void TComputationPatternLRUCache::CleanCache() {
309309
Cache->Clear();
310310
}
311311

312-
void TComputationPatternLRUCache::AccessPattern(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry) {
312+
void TComputationPatternLRUCache::AccessPattern(const TSerializedProgram & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry) {
313313
if (!Configuration.PatternAccessTimesBeforeTryToCompile || entry->Pattern->IsCompiled()) {
314314
return;
315315
}
@@ -321,11 +321,11 @@ void TComputationPatternLRUCache::AccessPattern(const TString & serializedProgra
321321
}
322322
}
323323

324-
void TComputationPatternLRUCache::NotifyMissing(const TString& serialized) {
324+
void TComputationPatternLRUCache::NotifyMissing(const TSerializedProgram& serializedProgram) {
325325
TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers;
326326
{
327327
std::lock_guard<std::mutex> lock(Mutex);
328-
auto notifyIt = Notify.find(serialized);
328+
auto notifyIt = Notify.find(serializedProgram);
329329
if (notifyIt != Notify.end()) {
330330
subscribers.swap(notifyIt->second);
331331
Notify.erase(notifyIt);

ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22

33
#include "mkql_computation_node.h"
4+
#include "mkql_serialized_program.h"
45

56
#include <ydb/library/yql/minikql/mkql_node.h>
67
#include <library/cpp/threading/future/future.h>
@@ -57,16 +58,16 @@ class TComputationPatternLRUCache {
5758
public:
5859
class TTicket : private TNonCopyable {
5960
public:
60-
TTicket(const TString& serialized, bool isOwned, const NThreading::TFuture<std::shared_ptr<TPatternCacheEntry>>& future, TComputationPatternLRUCache* cache)
61-
: Serialized(serialized)
61+
TTicket(const TSerializedProgram& serializedProgram, bool isOwned, const NThreading::TFuture<std::shared_ptr<TPatternCacheEntry>>& future, TComputationPatternLRUCache* cache)
62+
: SerializedProgram(serializedProgram)
6263
, IsOwned(isOwned)
6364
, Future(future)
6465
, Cache(cache)
6566
{}
6667

6768
~TTicket() {
6869
if (Cache) {
69-
Cache->NotifyMissing(Serialized);
70+
Cache->NotifyMissing(SerializedProgram);
7071
}
7172
}
7273

@@ -84,7 +85,7 @@ class TComputationPatternLRUCache {
8485
}
8586

8687
private:
87-
const TString Serialized;
88+
const TSerializedProgram SerializedProgram;
8889
const bool IsOwned;
8990
const NThreading::TFuture<std::shared_ptr<TPatternCacheEntry>> Future;
9091
TComputationPatternLRUCache* Cache;
@@ -124,13 +125,13 @@ class TComputationPatternLRUCache {
124125
return std::make_shared<TPatternCacheEntry>(useAlloc);
125126
}
126127

127-
std::shared_ptr<TPatternCacheEntry> Find(const TString& serializedProgram);
128+
std::shared_ptr<TPatternCacheEntry> Find(const TSerializedProgram& serializedProgram);
128129

129-
TTicket FindOrSubscribe(const TString& serializedProgram);
130+
TTicket FindOrSubscribe(const TSerializedProgram& serializedProgram);
130131

131-
void EmplacePattern(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv);
132+
void EmplacePattern(const TSerializedProgram& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv);
132133

133-
void NotifyPatternCompiled(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv);
134+
void NotifyPatternCompiled(const TSerializedProgram& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv);
134135

135136
size_t GetSize() const;
136137

@@ -159,27 +160,27 @@ class TComputationPatternLRUCache {
159160
return PatternsToCompile.size();
160161
}
161162

162-
void GetPatternsToCompile(THashMap<TString, std::shared_ptr<TPatternCacheEntry>> & result) {
163+
void GetPatternsToCompile(THashMap<TSerializedProgram, std::shared_ptr<TPatternCacheEntry>> & result) {
163164
std::lock_guard lock(Mutex);
164165
result.swap(PatternsToCompile);
165166
}
166167

167168
private:
168-
void AccessPattern(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry);
169+
void AccessPattern(const TSerializedProgram & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry);
169170

170-
void NotifyMissing(const TString& serialized);
171+
void NotifyMissing(const TSerializedProgram& serializedProgram);
171172

172173
static constexpr size_t CacheMaxElementsSize = 10000;
173174

174175
friend class TTicket;
175176

176177
mutable std::mutex Mutex;
177-
THashMap<TString, TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>>> Notify;
178+
THashMap<TSerializedProgram, TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>>> Notify;
178179

179180
class TLRUPatternCacheImpl;
180181
std::unique_ptr<TLRUPatternCacheImpl> Cache;
181182

182-
THashMap<TString, std::shared_ptr<TPatternCacheEntry>> PatternsToCompile;
183+
THashMap<TSerializedProgram, std::shared_ptr<TPatternCacheEntry>> PatternsToCompile;
183184

184185
const Config Configuration;
185186

ydb/library/yql/minikql/computation/mkql_serialized_program.h

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
#pragma once
2+
3+
#include <util/generic/string.h>
4+
#include <util/generic/hash.h>
5+
6+
namespace NKikimr::NMiniKQL {
7+
8+
/// Serialized program with precomputed hash
9+
class TSerializedProgram
10+
{
11+
public:
12+
TSerializedProgram(TString data)
13+
: Data(std::move(data))
14+
, Hash(THash<TString>()(data))
15+
{}
16+
17+
const TString & GetData() const
18+
{
19+
return Data;
20+
}
21+
22+
uint64_t GetHash() const
23+
{
24+
return Hash;
25+
}
26+
27+
friend bool operator==(const TSerializedProgram & lhs, const TSerializedProgram & rhs)
28+
{
29+
return lhs.Hash == rhs.Hash && lhs.Data == rhs.Data;
30+
}
31+
32+
friend bool operator!=(const TSerializedProgram & lhs, const TSerializedProgram & rhs)
33+
{
34+
return !(lhs == rhs);
35+
}
36+
37+
private:
38+
TString Data;
39+
ui64 Hash;
40+
};
41+
42+
}
43+
44+
template<>
45+
struct THash<NKikimr::NMiniKQL::TSerializedProgram> {
46+
inline ui64 operator()(const NKikimr::NMiniKQL::TSerializedProgram& serializedProgram) const noexcept {
47+
return serializedProgram.GetHash();
48+
}
49+
};

0 commit comments

Comments
 (0)