diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index 3e4c6859913f..5471bef3c1be 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include diff --git a/ydb/core/kqp/ut/scheme/ya.make b/ydb/core/kqp/ut/scheme/ya.make index 4cd03e32b9d9..f1ab58f089eb 100644 --- a/ydb/core/kqp/ut/scheme/ya.make +++ b/ydb/core/kqp/ut/scheme/ya.make @@ -22,6 +22,7 @@ PEERDIR( library/cpp/threading/local_executor ydb/core/kqp ydb/core/kqp/ut/common + ydb/core/kqp/workload_service/ut/common ydb/core/tx/columnshard/hooks/testing ydb/library/yql/sql/pg ydb/library/yql/parser/pg_wrapper diff --git a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp index b624c57bdd94..15d525a67630 100644 --- a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp +++ b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp @@ -230,6 +230,7 @@ class TWorkloadServiceYdbSetup : public IYdbSetup { TAppConfig GetAppConfig() const { TAppConfig appConfig; appConfig.MutableFeatureFlags()->SetEnableResourcePools(Settings_.EnableResourcePools_); + appConfig.MutableFeatureFlags()->SetEnableMetadataObjectsOnServerless(Settings_.EnableMetadataObjectsOnServerless_); return appConfig; } @@ -237,7 +238,7 @@ class TWorkloadServiceYdbSetup : public IYdbSetup { void SetLoggerSettings(TServerSettings& serverSettings) const { auto loggerInitializer = [](TTestActorRuntime& runtime) { runtime.SetLogPriority(NKikimrServices::KQP_WORKLOAD_SERVICE, NLog::EPriority::PRI_TRACE); - runtime.SetLogPriority(NKikimrServices::KQP_SESSION, NLog::EPriority::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::KQP_SESSION, NLog::EPriority::PRI_TRACE); }; serverSettings.SetLoggerInitializer(loggerInitializer); @@ -254,16 +255,50 @@ class TWorkloadServiceYdbSetup : public IYdbSetup { .SetAppConfig(appConfig) .SetFeatureFlags(appConfig.GetFeatureFlags()); + if (Settings_.CreateSampleTenants_) { + serverSettings + .SetDynamicNodeCount(2) + .AddStoragePoolType(Settings_.GetDedicatedTenantName()) + .AddStoragePoolType(Settings_.GetSharedTenantName()); + } + SetLoggerSettings(serverSettings); return serverSettings; } + void SetupResourcesTenant(Ydb::Cms::CreateDatabaseRequest& request, Ydb::Cms::StorageUnits* storage, const TString& name) { + request.set_path(name); + storage->set_unit_kind(name); + storage->set_count(1); + } + + void CreateTenants() { + { // Dedicated + Ydb::Cms::CreateDatabaseRequest request; + SetupResourcesTenant(request, request.mutable_resources()->add_storage_units(), Settings_.GetDedicatedTenantName()); + Tenants_->CreateTenant(std::move(request)); + } + + { // Shared + Ydb::Cms::CreateDatabaseRequest request; + SetupResourcesTenant(request, request.mutable_shared_resources()->add_storage_units(), Settings_.GetSharedTenantName()); + Tenants_->CreateTenant(std::move(request)); + } + + { // Serverless + Ydb::Cms::CreateDatabaseRequest request; + request.set_path(Settings_.GetServerlessTenantName()); + request.mutable_serverless_resources()->set_shared_database_path(Settings_.GetSharedTenantName()); + Tenants_->CreateTenant(std::move(request)); + } + } + void InitializeServer() { ui32 grpcPort = PortManager_.GetPort(); TServerSettings serverSettings = GetServerSettings(grpcPort); - Server_ = std::make_unique(serverSettings); + Server_ = MakeIntrusive(serverSettings); Server_->EnableGRpc(grpcPort); GetRuntime()->SetDispatchTimeout(FUTURE_WAIT_TIMEOUT); @@ -276,10 +311,15 @@ class TWorkloadServiceYdbSetup : public IYdbSetup { TableClient_ = std::make_unique(*YdbDriver_, NYdb::NTable::TClientSettings().AuthToken("user@" BUILTIN_SYSTEM_DOMAIN)); TableClientSession_ = std::make_unique(TableClient_->CreateSession().GetValueSync().GetSession()); + + Tenants_ = std::make_unique(Server_); + if (Settings_.CreateSampleTenants_) { + CreateTenants(); + } } void CreateSamplePool() const { - if (!Settings_.EnableResourcePools_) { + if (!Settings_.EnableResourcePools_ || Settings_.CreateSampleTenants_) { return; } @@ -529,9 +569,10 @@ class TWorkloadServiceYdbSetup : public IYdbSetup { const TYdbSetupSettings Settings_; TPortManager PortManager_; - std::unique_ptr Server_; + TServer::TPtr Server_; std::unique_ptr Client_; std::unique_ptr YdbDriver_; + std::unique_ptr Tenants_; std::unique_ptr TableClient_; std::unique_ptr TableClientSession_; @@ -580,6 +621,18 @@ TIntrusivePtr TYdbSetupSettings::Create() const { return MakeIntrusive(*this); } +TString TYdbSetupSettings::GetDedicatedTenantName() const { + return TStringBuilder() << CanonizePath(DomainName_) << "/test-dedicated"; +} + +TString TYdbSetupSettings::GetSharedTenantName() const { + return TStringBuilder() << CanonizePath(DomainName_) << "/test-shared"; +} + +TString TYdbSetupSettings::GetServerlessTenantName() const { + return TStringBuilder() << CanonizePath(DomainName_) << "/test-serverless"; +} + //// IYdbSetup void IYdbSetup::WaitFor(TDuration timeout, TString description, std::function callback) { diff --git a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h index 35f1a1693140..24f56c3214bc 100644 --- a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h +++ b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h @@ -26,6 +26,7 @@ struct TQueryRunnerSettings { FLUENT_SETTING_DEFAULT(ui32, NodeIndex, 0); FLUENT_SETTING_DEFAULT(TString, PoolId, ""); FLUENT_SETTING_DEFAULT(TString, UserSID, "user@" BUILTIN_SYSTEM_DOMAIN); + FLUENT_SETTING_DEFAULT(TString, Database, ""); // Runner settings FLUENT_SETTING_DEFAULT(bool, HangUpDuringExecution, false); @@ -66,7 +67,9 @@ struct TYdbSetupSettings { // Cluster settings FLUENT_SETTING_DEFAULT(ui32, NodeCount, 1); FLUENT_SETTING_DEFAULT(TString, DomainName, "Root"); + FLUENT_SETTING_DEFAULT(bool, CreateSampleTenants, false); FLUENT_SETTING_DEFAULT(bool, EnableResourcePools, true); + FLUENT_SETTING_DEFAULT(bool, EnableMetadataObjectsOnServerless, true); // Default pool settings FLUENT_SETTING_DEFAULT(TString, PoolId, "sample_pool_id"); @@ -77,6 +80,10 @@ struct TYdbSetupSettings { FLUENT_SETTING_DEFAULT(double, DatabaseLoadCpuThreshold, -1); TIntrusivePtr Create() const; + + TString GetDedicatedTenantName() const; + TString GetSharedTenantName() const; + TString GetServerlessTenantName() const; }; class IYdbSetup : public TThrRefBase { @@ -132,6 +139,12 @@ struct TSampleQueries { UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Request timeout exceeded, cancelling after"); } + template + static void CheckNotFound(const TResult& result, const TString& poolId) { + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::NOT_FOUND, result.GetIssues().ToString()); + UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Resource pool " << poolId << " not found or you don't have access permissions"); + } + struct TSelect42 { static constexpr char Query[] = "SELECT 42;"; diff --git a/ydb/core/protos/feature_flags.proto b/ydb/core/protos/feature_flags.proto index f8e1e679e1a5..c93d08ab39a5 100644 --- a/ydb/core/protos/feature_flags.proto +++ b/ydb/core/protos/feature_flags.proto @@ -149,4 +149,5 @@ message TFeatureFlags { optional bool EnableResourcePoolsCounters = 135 [default = false]; optional bool EnableOptionalColumnsInColumnShard = 136 [default = false]; optional bool EnablePgSyntax = 139 [default = false]; + optional bool EnableMetadataObjectsOnServerless = 141 [default = true]; } diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h index 094365c01b41..333cc96d597e 100644 --- a/ydb/core/testlib/basics/feature_flags.h +++ b/ydb/core/testlib/basics/feature_flags.h @@ -63,6 +63,7 @@ class TTestFeatureFlagsHolder { FEATURE_FLAG_SETTER(EnableResourcePools) FEATURE_FLAG_SETTER(EnableChangefeedsOnIndexTables) FEATURE_FLAG_SETTER(EnablePgSyntax) + FEATURE_FLAG_SETTER(EnableMetadataObjectsOnServerless) #undef FEATURE_FLAG_SETTER }; diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index e42524caa38f..8c8d35d0228e 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -2694,6 +2694,50 @@ namespace Tests { return Server->DynamicNodes(); } + void TTenants::CreateTenant(Ydb::Cms::CreateDatabaseRequest request, ui32 nodes, TDuration timeout) { + const TString path = request.path(); + const bool serverless = request.has_serverless_resources(); + + // Create new tenant + auto& runtime = *Server->GetRuntime(); + const auto result = NKikimr::NRpcService::DoLocalRpc>( + std::move(request), "", "", runtime.GetActorSystem(0), true + ).ExtractValueSync(); + + if (result.operation().status() != Ydb::StatusIds::SUCCESS) { + NYql::TIssues issues; + NYql::IssuesFromMessage(result.operation().issues(), issues); + ythrow yexception() << "Failed to create tenant " << path << ", " << result.operation().status() << ", reason:\n" << issues.ToString(); + } + + // Run new tenant + if (!serverless) { + Run(path, nodes); + } + + // Wait tenant is up + Ydb::Cms::GetDatabaseStatusResult getTenantResult; + const TActorId edgeActor = runtime.AllocateEdgeActor(); + const TInstant start = TInstant::Now(); + while (TInstant::Now() - start <= timeout) { + auto getTenantRequest = std::make_unique(); + getTenantRequest->Record.MutableRequest()->set_path(path); + runtime.SendToPipe(MakeConsoleID(), edgeActor, getTenantRequest.release(), 0, GetPipeConfigWithRetries()); + + auto response = runtime.GrabEdgeEvent(edgeActor, timeout); + if (!response) { + ythrow yexception() << "Waiting CMS get tenant response timeout. Last tenant description:\n" << getTenantResult.DebugString(); + } + response->Get()->Record.GetResponse().operation().result().UnpackTo(&getTenantResult); + if (getTenantResult.state() == Ydb::Cms::GetDatabaseStatusResult::RUNNING) { + return; + } + + Sleep(TDuration::MilliSeconds(100)); + } + ythrow yexception() << "Waiting tenant status RUNNING timeout. Spent time " << TInstant::Now() - start << " exceeds limit " << timeout << ". Last tenant description:\n" << getTenantResult.DebugString(); + } + TVector &TTenants::Nodes(const TString &name) { return Tenants[name]; } diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h index 795491279c1d..6723164f58f1 100644 --- a/ydb/core/testlib/test_client.h +++ b/ydb/core/testlib/test_client.h @@ -630,6 +630,8 @@ namespace Tests { ui32 Availabe() const; ui32 Capacity() const; + void CreateTenant(Ydb::Cms::CreateDatabaseRequest request, ui32 nodes = 1, TDuration timeout = TDuration::Seconds(30)); + private: TVector& Nodes(const TString &name); void StopNode(const TString /*name*/, ui32 nodeIdx); diff --git a/ydb/services/metadata/manager/alter_impl.h b/ydb/services/metadata/manager/alter_impl.h index 69fe03a72de3..9d8f984ec41b 100644 --- a/ydb/services/metadata/manager/alter_impl.h +++ b/ydb/services/metadata/manager/alter_impl.h @@ -1,5 +1,6 @@ #pragma once #include "abstract.h" +#include "fetch_database.h" #include "modification_controller.h" #include "preparation_controller.h" #include "restore.h" @@ -111,6 +112,7 @@ class TModificationActorImpl: public NActors::TActorBootstrappedFeatureFlags.GetEnableMetadataObjectsOnServerless() && Context.GetActivityType() != IOperationsManager::EActivityType::Drop) { + TBase::Register(CreateDatabaseFetcherActor(Context.GetExternalData().GetDatabase())); + } else { + CreateSession(); + } + } + + void Handle(TEvFetchDatabaseResponse::TPtr& ev) { + TString errorMessage; + if (const auto& errorString = ev->Get()->GetErrorString()) { + errorMessage = TStringBuilder() << "Cannot fetch database '" << Context.GetExternalData().GetDatabase() << "': " << *errorString; + } else if (ev->Get()->GetServerless()) { + errorMessage = TStringBuilder() << "Objects " << TObject::GetTypeId() << " are disabled for serverless domains. Please contact your system administrator to enable it"; + } + + if (errorMessage) { + auto g = TBase::PassAwayGuard(); + ExternalController->OnAlteringProblem(errorMessage); + } else { + CreateSession(); + } + } + + void CreateSession() const { TBase::Register(new NRequest::TYDBCallbackRequest( NRequest::TDialogCreateSession::TRequest(), UserToken, TBase::SelfId())); } diff --git a/ydb/services/metadata/manager/common.h b/ydb/services/metadata/manager/common.h index 3b6a3a21b836..5bf79e690564 100644 --- a/ydb/services/metadata/manager/common.h +++ b/ydb/services/metadata/manager/common.h @@ -35,6 +35,7 @@ enum EEvents { EvAlterProblem, EvAlterPreparationFinished, EvAlterPreparationProblem, + EvFetchDatabaseResponse, EvEnd }; static_assert(EEvents::EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_MANAGER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_MANAGER)"); diff --git a/ydb/services/metadata/manager/fetch_database.cpp b/ydb/services/metadata/manager/fetch_database.cpp new file mode 100644 index 000000000000..cb7ebfcc52f5 --- /dev/null +++ b/ydb/services/metadata/manager/fetch_database.cpp @@ -0,0 +1,112 @@ +#include "fetch_database.h" + +#include + +#include + +#include + + +namespace NKikimr::NMetadata::NModifications { + +namespace { + +class TDatabaseFetcherActor : public TActorBootstrapped { + using TBase = TActorBootstrapped; + using TRetryPolicy = IRetryPolicy<>; + +public: + explicit TDatabaseFetcherActor(const TString& database) + : Database(database) + {} + + void Registered(TActorSystem* sys, const TActorId& owner) override { + TBase::Registered(sys, owner); + Owner = owner; + } + + void Bootstrap() { + StartRequest(); + Become(&TDatabaseFetcherActor::StateFunc); + } + + void Handle(TEvents::TEvUndelivered::TPtr& ev) { + if (ev->Get()->Reason == NActors::TEvents::TEvUndelivered::ReasonActorUnknown && ScheduleRetry()) { + return; + } + + Reply("Scheme cache is unavailable"); + } + + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + const auto& results = ev->Get()->Request->ResultSet; + Y_ABORT_UNLESS(results.size() == 1); + + const auto& result = results[0]; + if (result.DomainInfo) { + Serverless = result.DomainInfo->IsServerless(); + Reply(); + return; + } + + if (result.Status == NSchemeCache::TSchemeCacheNavigate::EStatus::LookupError && ScheduleRetry()) { + return; + } + + Reply(TStringBuilder() << "Failed to fetch database info: " << result.Status); + } + + STRICT_STFUNC(StateFunc, + sFunc(TEvents::TEvWakeup, StartRequest); + hFunc(TEvents::TEvUndelivered, Handle); + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); + ) + +private: + void StartRequest() { + auto event = NTableCreator::BuildSchemeCacheNavigateRequest( + {{}}, + Database ? Database : AppData()->TenantName, + MakeIntrusive(BUILTIN_ACL_METADATA, TVector{}) + ); + event->ResultSet[0].Operation = NSchemeCache::TSchemeCacheNavigate::OpPath; + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(event.Release()), IEventHandle::FlagTrackDelivery); + } + + bool ScheduleRetry() { + if (!RetryState) { + RetryState = TRetryPolicy::GetFixedIntervalPolicy( + [](){return ERetryErrorClass::ShortRetry;} + , TDuration::MilliSeconds(100) + , TDuration::MilliSeconds(500) + , 100 + )->CreateRetryState();; + } + + if (const auto delay = RetryState->GetNextRetryDelay()) { + this->Schedule(*delay, new TEvents::TEvWakeup()); + return true; + } + + return false; + } + + void Reply(const std::optional& errorMessage = std::nullopt) { + Send(Owner, new TEvFetchDatabaseResponse(Serverless, errorMessage)); + PassAway(); + } + +private: + const TString Database; + TActorId Owner; + TRetryPolicy::IRetryState::TPtr RetryState; + bool Serverless = false; +}; + +} // anonymous namespace + +IActor* CreateDatabaseFetcherActor(const TString& database) { + return new TDatabaseFetcherActor(database); +} + +} // NKikimr::NMetadata::NModifications diff --git a/ydb/services/metadata/manager/fetch_database.h b/ydb/services/metadata/manager/fetch_database.h new file mode 100644 index 000000000000..819fd5d4503f --- /dev/null +++ b/ydb/services/metadata/manager/fetch_database.h @@ -0,0 +1,24 @@ +#pragma once + +#include "common.h" + +#include + + +namespace NKikimr::NMetadata::NModifications { + +class TEvFetchDatabaseResponse : public TEventLocal { +private: + YDB_READONLY_DEF(bool, Serverless); + YDB_READONLY_DEF(std::optional, ErrorString); + +public: + TEvFetchDatabaseResponse(bool serverless, const std::optional& errorString) + : Serverless(serverless) + , ErrorString(errorString) + {} +}; + +IActor* CreateDatabaseFetcherActor(const TString& database); + +} // NKikimr::NMetadata::NModifications diff --git a/ydb/services/metadata/manager/ya.make b/ydb/services/metadata/manager/ya.make index 2c29f61e9601..79beeb811c81 100644 --- a/ydb/services/metadata/manager/ya.make +++ b/ydb/services/metadata/manager/ya.make @@ -14,11 +14,13 @@ SRCS( ydb_value_operator.cpp modification_controller.cpp object.cpp + fetch_database.cpp ) PEERDIR( ydb/library/accessor ydb/library/actors/core + ydb/library/table_creator ydb/public/api/protos ydb/core/protos ydb/services/bg_tasks/abstract