Skip to content

Commit 541cba7

Browse files
authored
YQL: Add metrics for YT binary cache (#11953)
1 parent 571cfe1 commit 541cba7

File tree

8 files changed

+58
-8
lines changed

8 files changed

+58
-8
lines changed

ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp

+33-1
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,29 @@ std::pair<TString, NYT::TTransactionId> TTransactionCache::TEntry::GetBinarySnap
327327
return std::make_pair(snapshotPath, snapshotTx->GetId());
328328
}
329329

330+
void TTransactionCache::TEntry::UpdateCacheMetrics(const TString& fileName, ECacheStatus status) {
331+
static const TString cacheHitMrjob = "CacheHitMrjob";
332+
static const TString cacheMissMrjob = "CacheMissMrjob";
333+
static const TString cacheOtherMrjob = "CacheOtherMrjob";
334+
static const TString cacheHitUdf = "CacheHitUdf";
335+
static const TString cacheMissUdf = "CacheMissUdf";
336+
static const TString cacheOtherUdf = "CacheOtherUdf";
337+
338+
if (Metrics) {
339+
bool isMrJob = fileName == "mrjob";
340+
switch(status) {
341+
case ECacheStatus::Hit:
342+
isMrJob ? Metrics->IncCounter(cacheHitMrjob, Server) : Metrics->IncCounter(cacheHitUdf, Server);
343+
break;
344+
case ECacheStatus::Miss:
345+
isMrJob ? Metrics->IncCounter(cacheMissMrjob, Server) : Metrics->IncCounter(cacheMissUdf, Server);
346+
break;
347+
default:
348+
isMrJob ? Metrics->IncCounter(cacheOtherMrjob, Server) : Metrics->IncCounter(cacheOtherUdf, Server);
349+
}
350+
}
351+
};
352+
330353
TMaybe<std::pair<TString, NYT::TTransactionId>> TTransactionCache::TEntry::GetBinarySnapshotFromCache(TString binaryCacheFolder, const TString& md5, const TString& fileName) {
331354
if (binaryCacheFolder.StartsWith(NYT::TConfig::Get()->Prefix)) {
332355
binaryCacheFolder = binaryCacheFolder.substr(NYT::TConfig::Get()->Prefix.size());
@@ -341,6 +364,7 @@ TMaybe<std::pair<TString, NYT::TTransactionId>> TTransactionCache::TEntry::GetBi
341364
}
342365
snapshotTx = BinarySnapshotTx;
343366
if (auto p = BinarySnapshots.FindPtr(remotePath)) {
367+
UpdateCacheMetrics(fileName, ECacheStatus::Hit);
344368
return std::make_pair(*p, snapshotTx->GetId());
345369
}
346370
}
@@ -350,6 +374,11 @@ TMaybe<std::pair<TString, NYT::TTransactionId>> TTransactionCache::TEntry::GetBi
350374
snapshotPath = TStringBuilder() << '#' << GetGuidAsString(fileLock->GetLockedNodeId());
351375
} catch (const TErrorResponse& e) {
352376
YQL_CLOG(WARN, ProviderYt) << "Can't load binary for \"" << fileName << "\" from BinaryCacheFolder: " << e.what();
377+
if (e.IsResolveError()) {
378+
UpdateCacheMetrics(fileName, ECacheStatus::Miss);
379+
} else {
380+
UpdateCacheMetrics(fileName, ECacheStatus::Other);
381+
}
353382
return Nothing();
354383
}
355384
with_lock(Lock_) {
@@ -358,6 +387,8 @@ TMaybe<std::pair<TString, NYT::TTransactionId>> TTransactionCache::TEntry::GetBi
358387
YQL_CLOG(DEBUG, ProviderYt) << "Snapshot \""
359388
<< fileName << "\" -> \"" << remotePath << "\" -> "
360389
<< snapshotPath << ", tx=" << GetGuidAsString(snapshotTx->GetId());
390+
UpdateCacheMetrics(fileName, ECacheStatus::Hit);
391+
361392
return std::make_pair(snapshotPath, snapshotTx->GetId());
362393
}
363394

@@ -389,7 +420,7 @@ TTransactionCache::TEntry::TPtr TTransactionCache::TryGetEntry(const TString& se
389420
}
390421

391422
TTransactionCache::TEntry::TPtr TTransactionCache::GetOrCreateEntry(const TString& server, const TString& token,
392-
const TMaybe<TString>& impersonationUser, const TSpecProvider& specProvider, const TYtSettings::TConstPtr& config)
423+
const TMaybe<TString>& impersonationUser, const TSpecProvider& specProvider, const TYtSettings::TConstPtr& config, IMetricsRegistryPtr metrics)
393424
{
394425
TEntry::TPtr createdEntry = nullptr;
395426
NYT::TTransactionId externalTx = config->ExternalTx.Get().GetOrElse(TGUID());
@@ -430,6 +461,7 @@ TTransactionCache::TEntry::TPtr TTransactionCache::GetOrCreateEntry(const TStrin
430461
}
431462
createdEntry->InflightTempTablesLimit = config->InflightTempTablesLimit.Get().GetOrElse(Max<ui32>());
432463
createdEntry->KeepTables = GetReleaseTempDataMode(*config) == EReleaseTempDataMode::Never;
464+
createdEntry->Metrics = metrics;
433465

434466
TxMap_.emplace(server, createdEntry);
435467
}

ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h

+10-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <ydb/library/yql/providers/yt/common/yql_yt_settings.h>
44

55
#include <yql/essentials/core/file_storage/storage.h>
6+
#include <yql/essentials/providers/common/metrics/metrics_registry.h>
67

78
#include <yt/cpp/mapreduce/interface/client.h>
89
#include <yt/cpp/mapreduce/interface/fwd.h>
@@ -60,6 +61,7 @@ class TTransactionCache {
6061
THashMap<TString, TFileLinkPtr> FolderFilePtrCache;
6162

6263
TMutex Lock_;
64+
IMetricsRegistryPtr Metrics;
6365

6466
inline void DeleteAtFinalize(const TString& table) {
6567
with_lock(Lock_) {
@@ -121,6 +123,13 @@ class TTransactionCache {
121123
std::pair<TString, NYT::TTransactionId> GetBinarySnapshot(TString remoteTmpFolder, const TString& md5, const TString& localPath, TDuration expirationInterval);
122124
TMaybe<std::pair<TString, NYT::TTransactionId>> GetBinarySnapshotFromCache(TString binaryCacheFolder, const TString& md5, const TString& fileName);
123125

126+
enum class ECacheStatus {
127+
Hit,
128+
Miss,
129+
Other
130+
};
131+
void UpdateCacheMetrics(const TString& fileName, ECacheStatus status);
132+
124133
void CreateDefaultTmpFolder();
125134

126135
using TPtr = TIntrusivePtr<TEntry>;
@@ -143,7 +152,7 @@ class TTransactionCache {
143152
TTransactionCache(const TString& userName);
144153

145154
TEntry::TPtr GetEntry(const TString& server);
146-
TEntry::TPtr GetOrCreateEntry(const TString& server, const TString& token, const TMaybe<TString>& impersonationUser, const TSpecProvider& specProvider, const TYtSettings::TConstPtr& config);
155+
TEntry::TPtr GetOrCreateEntry(const TString& server, const TString& token, const TMaybe<TString>& impersonationUser, const TSpecProvider& specProvider, const TYtSettings::TConstPtr& config, IMetricsRegistryPtr metrics);
147156
TEntry::TPtr TryGetEntry(const TString& server);
148157

149158
void Commit(const TString& server);

ydb/library/yql/providers/yt/gateway/lib/ya.make

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ PEERDIR(
2929
yql/essentials/core/type_ann
3030
yql/essentials/providers/common/codec
3131
yql/essentials/providers/common/gateway
32+
yql/essentials/providers/common/metrics
3233
ydb/library/yql/providers/yt/provider
3334
ydb/library/yql/providers/yt/common
3435
ydb/library/yql/providers/yt/lib/hash

ydb/library/yql/providers/yt/gateway/native/ya.make

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ PEERDIR(
3434
yql/essentials/core/issue
3535
yql/essentials/providers/common/codec
3636
yql/essentials/providers/common/comp_nodes
37+
yql/essentials/providers/common/metrics
3738
yql/essentials/providers/common/mkql
3839
yql/essentials/providers/common/proto
3940
yql/essentials/providers/common/provider

ydb/library/yql/providers/yt/gateway/native/yql_yt_exec_ctx.cpp

+4-2
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ TExecContextBase::TExecContextBase(const TYtNativeServices& services,
4040
const TIntrusivePtr<NCommon::TMkqlCommonCallableCompiler>& mkqlCompiler,
4141
const TSession::TPtr& session,
4242
const TString& cluster,
43-
const TYtUrlMapper& urlMapper)
43+
const TYtUrlMapper& urlMapper,
44+
IMetricsRegistryPtr metrics)
4445
: FunctionRegistry_(services.FunctionRegistry)
4546
, FileStorage_(services.FileStorage)
4647
, Config_(services.Config)
@@ -51,6 +52,7 @@ TExecContextBase::TExecContextBase(const TYtNativeServices& services,
5152
, UrlMapper_(urlMapper)
5253
, DisableAnonymousClusterAccess_(services.DisableAnonymousClusterAccess)
5354
, Hidden(session->SessionId_.EndsWith("_hidden"))
55+
, Metrics(std::move(metrics))
5456
{
5557
YtServer_ = Clusters_->GetServer(Cluster_);
5658
LogCtx_ = NYql::NLog::CurrentLogContextPath();
@@ -368,7 +370,7 @@ TTransactionCache::TEntry::TPtr TExecContextBase::GetOrCreateEntry(const TYtSett
368370
throw yexception() << "Accessing YT cluster " << Cluster_ << " without OAuth token is not allowed";
369371
}
370372

371-
return Session_->TxCache_.GetOrCreateEntry(YtServer_, token, impersonationUser, [s = Session_]() { return s->CreateSpecWithDesc(); }, settings);
373+
return Session_->TxCache_.GetOrCreateEntry(YtServer_, token, impersonationUser, [s = Session_]() { return s->CreateSpecWithDesc(); }, settings, Metrics);
372374
}
373375

374376
TExpressionResorceUsage TExecContextBase::ScanExtraResourceUsageImpl(const TExprNode& node, const TYtSettings::TConstPtr& config, bool withInput) {

ydb/library/yql/providers/yt/gateway/native/yql_yt_exec_ctx.h

+6-3
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ class TExecContextBase: public TThrRefBase {
9898
const TIntrusivePtr<NCommon::TMkqlCommonCallableCompiler>& mkqlCompiler,
9999
const TSession::TPtr& session,
100100
const TString& cluster,
101-
const TYtUrlMapper& urlMapper);
101+
const TYtUrlMapper& urlMapper,
102+
IMetricsRegistryPtr metrics);
102103

103104
public:
104105
TString GetInputSpec(bool ensureOldTypesOnly, ui64 nativeTypeCompatibilityFlags, bool intermediateInput) const;
@@ -165,6 +166,7 @@ class TExecContextBase: public TThrRefBase {
165166
const TYtUrlMapper& UrlMapper_;
166167
bool DisableAnonymousClusterAccess_;
167168
bool Hidden = false;
169+
IMetricsRegistryPtr Metrics;
168170
};
169171

170172

@@ -180,8 +182,9 @@ class TExecContext: public TExecContextBase {
180182
TOptions&& options,
181183
const TSession::TPtr& session,
182184
const TString& cluster,
183-
const TYtUrlMapper& urlMapper)
184-
: TExecContextBase(services, clusters, mkqlCompiler, session, cluster, urlMapper)
185+
const TYtUrlMapper& urlMapper,
186+
IMetricsRegistryPtr metrics)
187+
: TExecContextBase(services, clusters, mkqlCompiler, session, cluster, urlMapper, std::move(metrics))
185188
, Options_(std::move(options))
186189
{
187190
}

ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -5407,7 +5407,7 @@ class TYtNativeGateway : public IYtGateway {
54075407
const TExprNode* root,
54085408
TExprContext* exprCtx) const
54095409
{
5410-
auto ctx = MakeIntrusive<TExecContext<TOptions>>(Services_, Clusters_, MkqlCompiler_, std::move(options), session, cluster, UrlMapper_);
5410+
auto ctx = MakeIntrusive<TExecContext<TOptions>>(Services_, Clusters_, MkqlCompiler_, std::move(options), session, cluster, UrlMapper_, Services_.Metrics);
54115411
if (root) {
54125412
YQL_ENSURE(exprCtx);
54135413
if (TYtTransientOpBase::Match(root)) {

ydb/library/yql/providers/yt/gateway/native/yql_yt_native.h

+2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
#include <yql/essentials/core/file_storage/file_storage.h>
66
#include <yql/essentials/minikql/mkql_function_registry.h>
7+
#include <yql/essentials/providers/common/metrics/metrics_registry.h>
78

89
#include <util/generic/ptr.h>
910

@@ -19,6 +20,7 @@ struct TYtNativeServices {
1920
TYtGatewayConfigPtr Config;
2021
// allow anonymous access for tests
2122
bool DisableAnonymousClusterAccess = false;
23+
IMetricsRegistryPtr Metrics;
2224
};
2325

2426
IYtGateway::TPtr CreateYtNativeGateway(const TYtNativeServices& services);

0 commit comments

Comments
 (0)