Skip to content

graph backend KIKIMR-18277 #745

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ __pycache__/
*_pb2.py
*_pb2_grpc.py
*_pb2.pyi
*.pb.h
*.pb.cc

# MacOS specific
.DS_Store
Expand Down
1 change: 1 addition & 0 deletions ydb/core/base/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ struct TKikimrEvents : TEvents {
ES_DB_METADATA_CACHE,
ES_TABLE_CREATOR,
ES_PQ_PARTITION_CHOOSER,
ES_GRAPH,
};
};

Expand Down
7 changes: 7 additions & 0 deletions ydb/core/base/pool_stats_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/helpers/pool_stats_collector.h>

#include <ydb/core/graph/api/service.h>
#include <ydb/core/graph/api/events.h>

namespace NKikimr {

// Periodically collects stats from executor threads and exposes them as mon counters
Expand Down Expand Up @@ -44,11 +47,15 @@ class TStatsCollectingActor : public NActors::TStatsCollectingActor {
MiniKQLPoolStats.Update();

TVector<std::tuple<TString, double, ui32>> pools;
double cpuUsage = 0;
for (const auto& pool : PoolCounters) {
pools.emplace_back(pool.Name, pool.Usage, pool.Threads);
cpuUsage += pool.Usage;
}

ctx.Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(ctx.SelfID.NodeId()), new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateUpdate(pools));

ctx.Send(NGraph::MakeGraphServiceId(), new NGraph::TEvGraph::TEvSendMetrics("cpu_usage", cpuUsage));
}

private:
Expand Down
1 change: 1 addition & 0 deletions ydb/core/base/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ PEERDIR(
ydb/core/base/services
ydb/core/debug
ydb/core/erasure
ydb/core/graph/api
ydb/core/protos
ydb/core/protos/out
ydb/core/scheme
Expand Down
1 change: 1 addition & 0 deletions ydb/core/cms/console/console_tenants_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ class TSubDomainManip : public TActorBootstrapped<TSubDomainManip> {
subdomain.SetName(Subdomain.second);
if (Tenant->IsExternalSubdomain) {
subdomain.SetExternalSchemeShard(true);
subdomain.SetGraphShard(true);
if (Tenant->IsExternalHive) {
subdomain.SetExternalHive(true);
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/driver_lib/run/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ union TBasicKikimrServicesMask {
// next 64 flags

bool EnableDatabaseMetadataCache:1;
bool EnableGraphService:1;
};

struct {
Expand Down
16 changes: 15 additions & 1 deletion ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@
#include <ydb/library/actors/util/affinity.h>
#include <ydb/library/actors/wilson/wilson_uploader.h>

#include <ydb/core/graph/api/service.h>
#include <ydb/core/graph/api/shard.h>

#include <library/cpp/logger/global/global.h>
#include <library/cpp/logger/log.h>

Expand Down Expand Up @@ -1022,7 +1025,7 @@ void TLocalServiceInitializer::InitializeServices(
addToLocalConfig(TTabletTypes::ReplicationController, &NReplication::CreateController, TMailboxType::ReadAsFilled, appData->UserPoolId);
addToLocalConfig(TTabletTypes::BlobDepot, &NBlobDepot::CreateBlobDepot, TMailboxType::ReadAsFilled, appData->UserPoolId);
addToLocalConfig(TTabletTypes::StatisticsAggregator, &NStat::CreateStatisticsAggregator, TMailboxType::ReadAsFilled, appData->UserPoolId);

addToLocalConfig(TTabletTypes::GraphShard, &NGraph::CreateGraphShard, TMailboxType::ReadAsFilled, appData->UserPoolId);

TTenantPoolConfig::TPtr tenantPoolConfig = new TTenantPoolConfig(Config.GetTenantPoolConfig(), localConfig);
if (!tenantPoolConfig->IsEnabled && !tenantPoolConfig->StaticSlots.empty())
Expand Down Expand Up @@ -2688,5 +2691,16 @@ void TDatabaseMetadataCacheInitializer::InitializeServices(NActors::TActorSystem
TActorSetupCmd(CreateDatabaseMetadataCache(appData->TenantName), TMailboxType::HTSwap, appData->UserPoolId));
}

TGraphServiceInitializer::TGraphServiceInitializer(const TKikimrRunConfig& runConfig)
: IKikimrServicesInitializer(runConfig)
{
}

void TGraphServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
setup->LocalServices.emplace_back(
NGraph::MakeGraphServiceId(),
TActorSetupCmd(NGraph::CreateGraphService(appData->TenantName), TMailboxType::HTSwap, appData->UserPoolId));
}

} // namespace NKikimrServicesInitializers
} // namespace NKikimr
8 changes: 8 additions & 0 deletions ydb/core/driver_lib/run/kikimr_services_initializers.h
Original file line number Diff line number Diff line change
Expand Up @@ -612,5 +612,13 @@ class TDatabaseMetadataCacheInitializer : public IKikimrServicesInitializer {

void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
};

class TGraphServiceInitializer : public IKikimrServicesInitializer {
public:
TGraphServiceInitializer(const TKikimrRunConfig& runConfig);

void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
};

} // namespace NKikimrServicesInitializers
} // namespace NKikimr
4 changes: 4 additions & 0 deletions ydb/core/driver_lib/run/run.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1617,6 +1617,10 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
sil->AddServiceInitializer(new TDatabaseMetadataCacheInitializer(runConfig));
}

if (serviceMask.EnableGraphService) {
sil->AddServiceInitializer(new TGraphServiceInitializer(runConfig));
}

return sil;
}

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/driver_lib/run/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ PEERDIR(
ydb/core/formats
ydb/core/fq/libs/init
ydb/core/fq/libs/logs
ydb/core/graph/service
ydb/core/graph/shard
ydb/core/grpc_services
ydb/core/grpc_services/base
ydb/core/grpc_services/auth_processor
Expand Down
48 changes: 48 additions & 0 deletions ydb/core/graph/api/events.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#pragma once

#include <ydb/core/base/events.h>
#include <ydb/core/graph/protos/graph.pb.h>

namespace NKikimr {
namespace NGraph {

struct TEvGraph {
enum EEv {
// requests
EvSendMetrics = EventSpaceBegin(TKikimrEvents::ES_GRAPH),
EvGetMetrics,
EvMetricsResult,
EvEnd
};

static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_GRAPH), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_GRAPH)");

struct TEvSendMetrics : TEventPB<TEvSendMetrics, NKikimrGraph::TEvSendMetrics, EvSendMetrics> {
TEvSendMetrics() = default;

TEvSendMetrics(const TString& name, double value) {
NKikimrGraph::TMetric* metric = Record.AddMetrics();
metric->SetName(name);
metric->SetValue(value);
}
};

struct TEvGetMetrics : TEventPB<TEvGetMetrics, NKikimrGraph::TEvGetMetrics, EvGetMetrics> {
TEvGetMetrics() = default;

TEvGetMetrics(const NKikimrGraph::TEvGetMetrics& request)
: TEventPB<TEvGetMetrics, NKikimrGraph::TEvGetMetrics, EvGetMetrics>(request)
{}
};

struct TEvMetricsResult : TEventPB<TEvMetricsResult, NKikimrGraph::TEvMetricsResult, EvMetricsResult> {
TEvMetricsResult() = default;

TEvMetricsResult(NKikimrGraph::TEvMetricsResult&& result)
: TEventPB<TEvMetricsResult, NKikimrGraph::TEvMetricsResult, EvMetricsResult>(std::move(result))
{}
};
};

} // NGraph
} // NKikimr
18 changes: 18 additions & 0 deletions ydb/core/graph/api/service.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#pragma once

#include <ydb/library/actors/core/actor.h>

namespace NKikimr {
namespace NGraph {

using namespace NActors;

inline TActorId MakeGraphServiceId(ui32 node = 0) {
char x[12] = {'g','r','a','p','h','s', 'v', 'c'};
return TActorId(node, TStringBuf(x, 12));
}

IActor* CreateGraphService(const TString& database);

} // NGraph
} // NKikimr
14 changes: 14 additions & 0 deletions ydb/core/graph/api/shard.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#pragma once

#include <ydb/library/actors/core/actor.h>
#include <ydb/core/base/blobstorage.h>

namespace NKikimr {
namespace NGraph {

using namespace NActors;

IActor* CreateGraphShard(const TActorId& tablet, TTabletStorageInfo* info);

} // NGraph
} // NKikimr
18 changes: 18 additions & 0 deletions ydb/core/graph/api/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
LIBRARY()

OWNER(
xenoxeno
g:kikimr
)

SRCS(
events.h
service.h
shard.h
)

PEERDIR(
ydb/core/graph/protos
)

END()
31 changes: 31 additions & 0 deletions ydb/core/graph/protos/graph.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
syntax = "proto3";

package NKikimrGraph;

option java_package = "ru.yandex.kikimr.proto";

message TMetric {
string Name = 1;
double Value = 2;
}

message TEvSendMetrics {
repeated TMetric Metrics = 1;
}

message TEvGetMetrics {
optional uint64 TimeFrom = 1;
optional uint64 TimeTo = 2;
repeated string Metrics = 3;
optional uint32 MaxPoints = 4;
}

message TMetricData {
repeated double Values = 1 [packed = true];
}

message TEvMetricsResult {
repeated uint64 Time = 1 [packed = true];
repeated TMetricData Data = 2;
string Error = 3;
}
14 changes: 14 additions & 0 deletions ydb/core/graph/protos/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
PROTO_LIBRARY()

OWNER(
xenoxeno
g:kikimr
)

SRCS(
graph.proto
)

EXCLUDE_TAGS(GO_PROTO)

END()
25 changes: 25 additions & 0 deletions ydb/core/graph/service/log.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once

#if defined BLOG_D || defined BLOG_I || defined BLOG_ERROR || defined BLOG_TRACE
#error log macro definition clash
#endif

#include <util/generic/string.h>
#include <ydb/library/actors/core/log.h>

namespace NKikimr {
namespace NGraph {

TString GetLogPrefix();

}
}

#define BLOG_D(stream) ALOG_DEBUG(NKikimrServices::GRAPH, GetLogPrefix() << stream)
#define BLOG_I(stream) ALOG_INFO(NKikimrServices::GRAPH, GetLogPrefix() << stream)
#define BLOG_W(stream) ALOG_WARN(NKikimrServices::GRAPH, GetLogPrefix() << stream)
#define BLOG_NOTICE(stream) ALOG_NOTICE(NKikimrServices::GRAPH, GetLogPrefix() << stream)
#define BLOG_ERROR(stream) ALOG_ERROR(NKikimrServices::GRAPH, GetLogPrefix() << stream)
#define BLOG_CRIT(stream) ALOG_CRIT(NKikimrServices::GRAPH, GetLogPrefix() << stream)
#define BLOG_TRACE(stream) ALOG_TRACE(NKikimrServices::GRAPH, GetLogPrefix() << stream)
#define Y_ENSURE_LOG(cond, stream) if (!(cond)) { BLOG_ERROR("Failed condition \"" << #cond << "\" " << stream); }
Loading