Skip to content

YQ WM fix for sls feature flag #7057

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions ydb/core/kqp/workload_service/actors/actors.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ namespace NKikimr::NKqp::NWorkload {
NActors::IActor* CreatePoolHandlerActor(const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters);

// Fetch pool and create default pool if needed
NActors::IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists, bool enableOnServerless);
NActors::IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists);

// Fetch and create pool in scheme shard
NActors::IActor* CreatePoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless);
NActors::IActor* CreatePoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken);
NActors::IActor* CreatePoolCreatorActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr<NACLib::TUserToken> userToken, NACLibProto::TDiffACL diffAcl);

// Fetch database info from scheme cache and report whether the database is serverless
NActors::IActor* CreateDatabaseFetcherActor(const NActors::TActorId& replyActorId, const TString& database);

// Cpu load fetcher actor
NActors::IActor* CreateCpuLoadFetcherActor(const NActors::TActorId& replyActorId);

Expand Down
118 changes: 102 additions & 16 deletions ydb/core/kqp/workload_service/actors/scheme_actors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ using namespace NActors;

class TPoolResolverActor : public TActorBootstrapped<TPoolResolverActor> {
public:
TPoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists, bool enableOnServerless)
TPoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists)
: Event(std::move(event))
, EnableOnServerless(enableOnServerless)
{
if (!Event->Get()->PoolId) {
Event->Get()->PoolId = NResourcePool::DEFAULT_POOL_ID;
Expand All @@ -39,7 +38,7 @@ class TPoolResolverActor : public TActorBootstrapped<TPoolResolverActor> {

void StartPoolFetchRequest() const {
LOG_D("Start pool fetching");
Register(CreatePoolFetcherActor(SelfId(), Event->Get()->Database, Event->Get()->PoolId, Event->Get()->UserToken, EnableOnServerless));
Register(CreatePoolFetcherActor(SelfId(), Event->Get()->Database, Event->Get()->PoolId, Event->Get()->UserToken));
}

void Handle(TEvPrivate::TEvFetchPoolResponse::TPtr& ev) {
Expand Down Expand Up @@ -116,20 +115,18 @@ class TPoolResolverActor : public TActorBootstrapped<TPoolResolverActor> {

private:
TEvPlaceRequestIntoPool::TPtr Event;
const bool EnableOnServerless;
bool CanCreatePool = false;
bool DefaultPoolCreated = false;
};


class TPoolFetcherActor : public TSchemeActorBase<TPoolFetcherActor> {
public:
TPoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless)
TPoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken)
: ReplyActorId(replyActorId)
, Database(database)
, PoolId(poolId)
, UserToken(userToken)
, EnableOnServerless(enableOnServerless)
{}

void DoBootstrap() {
Expand All @@ -144,11 +141,6 @@ class TPoolFetcherActor : public TSchemeActorBase<TPoolFetcherActor> {
}

const auto& result = results[0];
if (!EnableOnServerless && result.DomainInfo && result.DomainInfo->IsServerless()) {
Reply(Ydb::StatusIds::UNSUPPORTED, "Resource pools are disabled for serverless domains. Please contact your system administrator to enable it");
return;
}

switch (result.Status) {
case EStatus::Unknown:
case EStatus::PathNotTable:
Expand Down Expand Up @@ -238,7 +230,6 @@ class TPoolFetcherActor : public TSchemeActorBase<TPoolFetcherActor> {
const TString Database;
const TString PoolId;
const TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
const bool EnableOnServerless;

NResourcePool::TPoolSettings PoolConfig;
NKikimrProto::TPathID PathId;
Expand Down Expand Up @@ -451,18 +442,113 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
TActorId SchemePipeActorId;
};


// Scheme actor that navigates the scheme cache to the path of `Database` and
// answers `ReplyActorId` with a TEvPrivate::TEvFetchDatabaseResponse carrying
// the resulting status, the database name, the serverless flag and the issues.
// The actor terminates itself right after sending the response.
class TDatabaseFetcherActor : public TSchemeActorBase<TDatabaseFetcherActor> {
public:
    TDatabaseFetcherActor(const TActorId& replyActorId, const TString& database)
        : ReplyActorId(replyActorId)
        , Database(database)
    {}

    // Switches the actor into its only state.
    void DoBootstrap() {
        Become(&TDatabaseFetcherActor::StateFunc);
    }

    // Translates the scheme cache navigation result into a reply (or a retry).
    void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
        const auto& resultSet = ev->Get()->Request->ResultSet;
        if (resultSet.size() != 1) {
            // Exactly one entry was requested, anything else is unexpected.
            Reply(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected scheme cache response");
            return;
        }

        const auto& entry = resultSet[0];
        switch (entry.Status) {
            // Happy path: remember the serverless flag and report success.
            case EStatus::Ok:
                Serverless = entry.DomainInfo && entry.DomainInfo->IsServerless();
                Reply(Ydb::StatusIds::SUCCESS);
                break;

            // Errors that are retried; give up only when ScheduleRetry refuses.
            case EStatus::LookupError:
            case EStatus::TableCreationNotComplete:
                if (!ScheduleRetry(TStringBuilder() << "Retry error " << entry.Status)) {
                    Reply(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Retry limit exceeded on scheme error: " << entry.Status);
                }
                break;

            // Everything else is reported as "not found or no access".
            case EStatus::Unknown:
            case EStatus::PathNotTable:
            case EStatus::PathNotPath:
            case EStatus::RedirectLookupError:
            case EStatus::AccessDenied:
            case EStatus::RootUnknown:
            case EStatus::PathErrorUnknown:
                Reply(Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Database " << Database << " not found or you don't have access permissions");
                break;
        }
    }

    STFUNC(StateFunc) {
        switch (ev->GetTypeRewrite()) {
            hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
            default:
                // Events not handled here are delegated to the base state function.
                StateFuncBase(ev);
        }
    }

protected:
    // Sends a single OpPath navigate request for the database to the scheme cache.
    void StartRequest() override {
        LOG_D("Start database fetching");
        auto navigateRequest = NTableCreator::BuildSchemeCacheNavigateRequest({{}}, Database, nullptr);
        navigateRequest->ResultSet[0].Operation = NSchemeCache::TSchemeCacheNavigate::OpPath;
        Send(
            MakeSchemeCacheID(),
            new TEvTxProxySchemeCache::TEvNavigateKeySet(navigateRequest.Release()),
            IEventHandle::FlagTrackDelivery
        );
    }

    // Fatal errors are forwarded to the requester as a regular reply.
    void OnFatalError(Ydb::StatusIds::StatusCode status, NYql::TIssue issue) override {
        Reply(status, {std::move(issue)});
    }

    TString LogPrefix() const override {
        return TStringBuilder() << "[TDatabaseFetcherActor] ActorId: " << SelfId() << ", Database: " << Database << ", ";
    }

private:
    // Convenience overload: wraps a plain message into a single issue.
    void Reply(Ydb::StatusIds::StatusCode status, const TString& message) {
        Reply(status, {NYql::TIssue(message)});
    }

    // Logs the outcome, sends the response to the requester and stops the actor.
    void Reply(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) {
        if (status != Ydb::StatusIds::SUCCESS) {
            LOG_W("Failed to fetch database info, " << status << ", issues: " << issues.ToOneLineString());
        } else {
            LOG_D("Database info successfully fetched");
        }

        Issues.AddIssues(std::move(issues));
        Send(ReplyActorId, new TEvPrivate::TEvFetchDatabaseResponse(status, Database, Serverless, std::move(Issues)));
        PassAway();
    }

    const TActorId ReplyActorId;
    const TString Database;

    // Set only on EStatus::Ok; stays false for every error reply.
    bool Serverless = false;
};

} // anonymous namespace

IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists, bool enableOnServerless) {
return new TPoolResolverActor(std::move(event), defaultPoolExists, enableOnServerless);
IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists) {
return new TPoolResolverActor(std::move(event), defaultPoolExists);
}

IActor* CreatePoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless) {
return new TPoolFetcherActor(replyActorId, database, poolId, userToken, enableOnServerless);
IActor* CreatePoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken) {
return new TPoolFetcherActor(replyActorId, database, poolId, userToken);
}

// Factory for TPoolCreatorActor: forwards all arguments unchanged to the actor
// constructor and returns the newly allocated actor as an IActor pointer.
IActor* CreatePoolCreatorActor(const TActorId& replyActorId, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr<NACLib::TUserToken> userToken, NACLibProto::TDiffACL diffAcl) {
return new TPoolCreatorActor(replyActorId, database, poolId, poolConfig, userToken, diffAcl);
}

// Factory for TDatabaseFetcherActor. The created actor replies to `replyActorId`
// with TEvPrivate::TEvFetchDatabaseResponse for the given `database`.
IActor* CreateDatabaseFetcherActor(const TActorId& replyActorId, const TString& database) {
return new TDatabaseFetcherActor(replyActorId, database);
}

} // NKikimr::NKqp::NWorkload
15 changes: 15 additions & 0 deletions ydb/core/kqp/workload_service/common/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ struct TEvPrivate {
EvRefreshPoolState = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
EvResolvePoolResponse,
EvFetchPoolResponse,
EvFetchDatabaseResponse,
EvCreatePoolResponse,
EvPrepareTablesRequest,
EvPlaceRequestIntoPoolResponse,
Expand Down Expand Up @@ -85,6 +86,20 @@ struct TEvPrivate {
const NYql::TIssues Issues;
};

// Local event carrying the outcome of a database fetch: the status code, the
// database name the request was made for, the serverless flag and the issues.
// All fields are immutable after construction.
struct TEvFetchDatabaseResponse : public NActors::TEventLocal<TEvFetchDatabaseResponse, EvFetchDatabaseResponse> {
TEvFetchDatabaseResponse(Ydb::StatusIds::StatusCode status, const TString& database, bool serverless, NYql::TIssues issues)
: Status(status)
, Database(database)
, Serverless(serverless)
, Issues(std::move(issues))
{}

// Result of the fetch.
const Ydb::StatusIds::StatusCode Status;
// Database name, copied from the constructor argument.
const TString Database;
// Whether the database was detected as serverless.
const bool Serverless;
// Issues accumulated while fetching.
const NYql::TIssues Issues;
};

struct TEvCreatePoolResponse : public NActors::TEventLocal<TEvCreatePoolResponse, EvCreatePoolResponse> {
TEvCreatePoolResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
: Status(status)
Expand Down
60 changes: 41 additions & 19 deletions ydb/core/kqp/workload_service/kqp_workload_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,23 @@ using namespace NActors;


class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
// Bundle of monitoring counters owned by the workload service.
// `Counters` is the root counter group; `ActivePools` is looked up from it
// once, at construction time.
struct TCounters {
    // Root counter group; also used to derive subgroups and per-pool counters.
    const NMonitoring::TDynamicCounterPtr Counters;

    // Incremented when a pool handler is created, decremented when a pool state is erased.
    NMonitoring::TDynamicCounters::TCounterPtr ActivePools;

    // `explicit` prevents accidental implicit conversion from a raw counter
    // pointer; the by-value parameter is moved to avoid an extra ref-count bump.
    explicit TCounters(NMonitoring::TDynamicCounterPtr counters)
        : Counters(std::move(counters))
    {
        Register();
    }

private:
    // Resolves individual counters from the root group.
    void Register() {
        ActivePools = Counters->GetCounter("ActivePools", false);
    }
};

enum class ETablesCreationStatus {
Cleanup,
NotStarted,
Expand All @@ -43,9 +60,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
public:
explicit TKqpWorkloadService(NMonitoring::TDynamicCounterPtr counters)
: Counters(counters)
{
RegisterCounters();
}
{}

void Bootstrap() {
Become(&TKqpWorkloadService::MainState);
Expand All @@ -55,7 +70,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
(ui32)NKikimrConsole::TConfigItem::FeatureFlagsItem
}), IEventHandle::FlagTrackDelivery);

CpuQuotaManager = std::make_unique<TCpuQuotaManagerState>(ActorContext(), Counters->GetSubgroup("subcomponent", "CpuQuotaManager"));
CpuQuotaManager = std::make_unique<TCpuQuotaManagerState>(ActorContext(), Counters.Counters->GetSubgroup("subcomponent", "CpuQuotaManager"));

EnabledResourcePools = AppData()->FeatureFlags.GetEnableResourcePools();
EnabledResourcePoolsOnServerless = AppData()->FeatureFlags.GetEnableResourcePoolsOnServerless();
Expand Down Expand Up @@ -132,9 +147,9 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
return;
}

LOG_D("Recieved new request from " << workerActorId << ", Database: " << ev->Get()->Database << ", PoolId: " << ev->Get()->PoolId << ", SessionId: " << ev->Get()->SessionId);
bool hasDefaultPool = DatabasesWithDefaultPool.contains(CanonizePath(ev->Get()->Database));
Register(CreatePoolResolverActor(std::move(ev), hasDefaultPool, EnabledResourcePoolsOnServerless));
const TString& database = ev->Get()->Database;
LOG_D("Recieved new request from " << workerActorId << ", Database: " << database << ", PoolId: " << ev->Get()->PoolId << ", SessionId: " << ev->Get()->SessionId);
GetOrCreateDatabaseState(database)->DoPlaceRequest(std::move(ev));
}

void Handle(TEvCleanupRequest::TPtr& ev) {
Expand Down Expand Up @@ -177,6 +192,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
hFunc(TEvCleanupRequest, Handle);
hFunc(TEvents::TEvWakeup, Handle);

hFunc(TEvPrivate::TEvFetchDatabaseResponse, Handle);
hFunc(TEvPrivate::TEvResolvePoolResponse, Handle);
hFunc(TEvPrivate::TEvPlaceRequestIntoPoolResponse, Handle);
hFunc(TEvPrivate::TEvNodesInfoRequest, Handle);
Expand All @@ -191,11 +207,15 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
)

private:
// Routes a database fetch response to the state object of the database named
// in the event (the state is created on demand if it does not exist yet).
void Handle(TEvPrivate::TEvFetchDatabaseResponse::TPtr& ev) {
    TDatabaseState* const databaseState = GetOrCreateDatabaseState(ev->Get()->Database);
    databaseState->UpdateDatabaseInfo(ev);
}

void Handle(TEvPrivate::TEvResolvePoolResponse::TPtr& ev) {
const auto& event = ev->Get()->Event;
const TString& database = event->Get()->Database;
if (ev->Get()->DefaultPoolCreated) {
DatabasesWithDefaultPool.insert(CanonizePath(database));
GetOrCreateDatabaseState(database)->HasDefaultPool = true;
}

const TString& poolId = event->Get()->PoolId;
Expand All @@ -211,10 +231,10 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
TString poolKey = GetPoolKey(database, poolId);
LOG_I("Creating new handler for pool " << poolKey);

auto poolHandler = Register(CreatePoolHandlerActor(database, poolId, ev->Get()->PoolConfig, Counters));
auto poolHandler = Register(CreatePoolHandlerActor(database, poolId, ev->Get()->PoolConfig, Counters.Counters));
poolState = &PoolIdToState.insert({poolKey, TPoolState{.PoolHandler = poolHandler, .ActorContext = ActorContext()}}).first->second;

ActivePools->Inc();
Counters.ActivePools->Inc();
ScheduleIdleCheck();
}

Expand Down Expand Up @@ -409,7 +429,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
}
for (const auto& poolKey : poolsToDelete) {
PoolIdToState.erase(poolKey);
ActivePools->Dec();
Counters.ActivePools->Dec();
}

if (!PoolIdToState.empty()) {
Expand Down Expand Up @@ -472,6 +492,14 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
Send(replyActorId, new TEvCleanupResponse(status, {NYql::TIssue(message)}));
}

// Returns the state registered for `database`, inserting a fresh one
// (bound to the current actor context and the serverless feature flag value)
// when no state exists yet. The key is used exactly as passed in.
TDatabaseState* GetOrCreateDatabaseState(const TString& database) {
    if (auto existing = DatabaseToState.find(database); existing != DatabaseToState.end()) {
        return &existing->second;
    }

    auto insertion = DatabaseToState.insert({
        database,
        TDatabaseState{.ActorContext = ActorContext(), .EnabledResourcePoolsOnServerless = EnabledResourcePoolsOnServerless}
    });
    return &insertion.first->second;
}

TPoolState* GetPoolState(const TString& database, const TString& poolId) {
return GetPoolState(GetPoolKey(database, poolId));
}
Expand All @@ -492,12 +520,8 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
return "[Service] ";
}

void RegisterCounters() {
ActivePools = Counters->GetCounter("ActivePools", false);
}

private:
NMonitoring::TDynamicCounterPtr Counters;
TCounters Counters;

bool EnabledResourcePools = false;
bool EnabledResourcePoolsOnServerless = false;
Expand All @@ -506,12 +530,10 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
ETablesCreationStatus TablesCreationStatus = ETablesCreationStatus::Cleanup;
std::unordered_set<TString> PendingHandlers;

std::unordered_set<TString> DatabasesWithDefaultPool;
std::unordered_map<TString, TDatabaseState> DatabaseToState;
std::unordered_map<TString, TPoolState> PoolIdToState;
std::unique_ptr<TCpuQuotaManagerState> CpuQuotaManager;
ui32 NodeCount = 0;

NMonitoring::TDynamicCounters::TCounterPtr ActivePools;
};

} // anonymous namespace
Expand Down
Loading
Loading