diff --git a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp index aa5028518bed..2892f0f97df3 100644 --- a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp @@ -197,6 +197,36 @@ class TKqpSchemeExecuter : public TActorBootstrapped { break; } + case NKqpProto::TKqpSchemeOperation::kCreateColumnTable: { + const auto& modifyScheme = schemeOp.GetCreateColumnTable(); + ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme); + break; + } + + case NKqpProto::TKqpSchemeOperation::kAlterColumnTable: { + const auto& modifyScheme = schemeOp.GetAlterColumnTable(); + ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme); + break; + } + + case NKqpProto::TKqpSchemeOperation::kCreateTableStore: { + const auto& modifyScheme = schemeOp.GetCreateTableStore(); + ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme); + break; + } + + case NKqpProto::TKqpSchemeOperation::kAlterTableStore: { + const auto& modifyScheme = schemeOp.GetAlterTableStore(); + ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme); + break; + } + + case NKqpProto::TKqpSchemeOperation::kDropTableStore: { + const auto& modifyScheme = schemeOp.GetDropTableStore(); + ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme); + break; + } + default: InternalError(TStringBuilder() << "Unexpected scheme operation: " << (ui32) schemeOp.GetOperationCase()); diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index 72b65ebeacd9..763470536966 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -877,51 +877,12 @@ class TKikimrIcGateway : public IKqpGateway { return tablePromise.GetFuture(); } - TFuture CreateColumnTable(NYql::TKikimrTableMetadataPtr metadata, bool createDir) override { - using TRequest = TEvTxUserProxy::TEvProposeTransaction; - - try { - if (!CheckCluster(metadata->Cluster)) { - return InvalidCluster(metadata->Cluster); - } - - std::pair pathPair; - { - TString error; - if (!GetPathPair(metadata->Name, pathPair, error, createDir)) { - return MakeFuture(ResultFromError(error)); - } - } - - auto ev = MakeHolder(); - ev->Record.SetDatabaseName(Database); - if (UserToken) { - ev->Record.SetUserToken(UserToken->GetSerializedToken()); - } - auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme(); - schemeTx.SetWorkingDir(pathPair.first); - - Ydb::StatusIds::StatusCode code; - TString error; - - schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateColumnTable); - NKikimrSchemeOp::TColumnTableDescription* tableDesc = schemeTx.MutableCreateColumnTable(); - - tableDesc->SetName(pathPair.second); - FillColumnTableSchema(*tableDesc->MutableSchema(), *metadata); - - if (!FillCreateColumnTableDesc(metadata, *tableDesc, code, error)) { - IKqpGateway::TGenericResult errResult; - errResult.AddIssue(NYql::TIssue(error)); - errResult.SetStatus(NYql::YqlStatusFromYdbStatus(code)); - return MakeFuture(std::move(errResult)); - } - - return SendSchemeRequest(ev.Release()); - } - catch (yexception& e) { - return MakeFuture(ResultFromException(e)); - } + TFuture CreateColumnTable(NYql::TKikimrTableMetadataPtr metadata, + bool createDir, bool existingOk) override { + Y_UNUSED(metadata); + Y_UNUSED(createDir); + Y_UNUSED(existingOk); + return NotImplemented(); } TFuture AlterTable(const TString&, Ydb::Table::AlterTableRequest&&, const TMaybe&, ui64, NKikimrIndexBuilder::TIndexBuildSettings&&) override @@ -1035,150 +996,33 @@ class TKikimrIcGateway : public IKqpGateway { TFuture AlterColumnTable(const TString& cluster, const NYql::TAlterColumnTableSettings& settings) override { - using TRequest = TEvTxUserProxy::TEvProposeTransaction; - - try { - if (!CheckCluster(cluster)) { - return InvalidCluster(cluster); - } - - std::pair pathPair; - { - TString error; - if (!GetPathPair(settings.Table, pathPair, error, false)) { - return MakeFuture(ResultFromError(error)); - } - } - - auto ev = MakeHolder(); - ev->Record.SetDatabaseName(Database); - if (UserToken) { - ev->Record.SetUserToken(UserToken->GetSerializedToken()); - } - auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme(); - schemeTx.SetWorkingDir(pathPair.first); - - schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterColumnTable); - NKikimrSchemeOp::TAlterColumnTable* alter = schemeTx.MutableAlterColumnTable(); - alter->SetName(settings.Table); - - return SendSchemeRequest(ev.Release()); - } - catch (yexception& e) { - return MakeFuture(ResultFromException(e)); - } + Y_UNUSED(cluster); + Y_UNUSED(settings); + return NotImplemented(); } TFuture CreateTableStore(const TString& cluster, - const NYql::TCreateTableStoreSettings& settings) override { - using TRequest = TEvTxUserProxy::TEvProposeTransaction; - - try { - if (!CheckCluster(cluster)) { - return InvalidCluster(cluster); - } - - std::pair pathPair; - { - TString error; - if (!GetPathPair(settings.TableStore, pathPair, error, false)) { - return MakeFuture(ResultFromError(error)); - } - } - - auto ev = MakeHolder(); - ev->Record.SetDatabaseName(Database); - if (UserToken) { - ev->Record.SetUserToken(UserToken->GetSerializedToken()); - } - auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme(); - schemeTx.SetWorkingDir(pathPair.first); - - schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateColumnStore); - NKikimrSchemeOp::TColumnStoreDescription* storeDesc = schemeTx.MutableCreateColumnStore(); - storeDesc->SetName(pathPair.second); - storeDesc->SetColumnShardCount(settings.ShardsCount); - - NKikimrSchemeOp::TColumnTableSchemaPreset* schemaPreset = storeDesc->AddSchemaPresets(); - schemaPreset->SetName("default"); - FillColumnTableSchema(*schemaPreset->MutableSchema(), settings); - - return SendSchemeRequest(ev.Release()); - } - catch (yexception& e) { - return MakeFuture(ResultFromException(e)); - } + const NYql::TCreateTableStoreSettings& settings, + bool existingOk) override { + Y_UNUSED(cluster); + Y_UNUSED(settings); + Y_UNUSED(existingOk); + return NotImplemented(); } TFuture AlterTableStore(const TString& cluster, const NYql::TAlterTableStoreSettings& settings) override { - using TRequest = TEvTxUserProxy::TEvProposeTransaction; - - try { - if (!CheckCluster(cluster)) { - return InvalidCluster(cluster); - } - - std::pair pathPair; - { - TString error; - if (!GetPathPair(settings.TableStore, pathPair, error, false)) { - return MakeFuture(ResultFromError(error)); - } - } - - auto ev = MakeHolder(); - ev->Record.SetDatabaseName(Database); - if (UserToken) { - ev->Record.SetUserToken(UserToken->GetSerializedToken()); - } - auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme(); - schemeTx.SetWorkingDir(pathPair.first); - - schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterColumnStore); - NKikimrSchemeOp::TAlterColumnStore* alter = schemeTx.MutableAlterColumnStore(); - alter->SetName(pathPair.second); - - return SendSchemeRequest(ev.Release()); - } - catch (yexception& e) { - return MakeFuture(ResultFromException(e)); - } + Y_UNUSED(cluster); + Y_UNUSED(settings); + return NotImplemented(); } TFuture DropTableStore(const TString& cluster, - const NYql::TDropTableStoreSettings& settings) override { - using TRequest = TEvTxUserProxy::TEvProposeTransaction; - - try { - if (!CheckCluster(cluster)) { - return InvalidCluster(cluster); - } - - std::pair pathPair; - { - TString error; - if (!GetPathPair(settings.TableStore, pathPair, error, false)) { - return MakeFuture(ResultFromError(error)); - } - } - - auto ev = MakeHolder(); - ev->Record.SetDatabaseName(Database); - if (UserToken) { - ev->Record.SetUserToken(UserToken->GetSerializedToken()); - } - auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme(); - schemeTx.SetWorkingDir(pathPair.first); - - schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropColumnStore); - NKikimrSchemeOp::TDrop* drop = schemeTx.MutableDrop(); - drop->SetName(pathPair.second); - return SendSchemeRequest(ev.Release()); - } - catch (yexception& e) { - return MakeFuture(ResultFromException(e)); - } + const NYql::TDropTableStoreSettings& settings, bool missingOk) override { + Y_UNUSED(cluster); + Y_UNUSED(settings); + Y_UNUSED(missingOk); + return NotImplemented(); } TFuture CreateExternalTable(const TString& cluster, @@ -2328,27 +2172,6 @@ class TKikimrIcGateway : public IKqpGateway { return result; } - template - static void FillColumnTableSchema(NKikimrSchemeOp::TColumnTableSchema& schema, const T& metadata) - { - Y_ENSURE(metadata.ColumnOrder.size() == metadata.Columns.size()); - for (const auto& name : metadata.ColumnOrder) { - auto columnIt = metadata.Columns.find(name); - Y_ENSURE(columnIt != metadata.Columns.end()); - - TOlapColumnDescription& columnDesc = *schema.AddColumns(); - columnDesc.SetName(columnIt->second.Name); - columnDesc.SetType(columnIt->second.Type); - columnDesc.SetNotNull(columnIt->second.NotNull); - } - - for (const auto& keyColumn : metadata.KeyColumnNames) { - schema.AddKeyColumnNames(keyColumn); - } - - schema.SetEngine(NKikimrSchemeOp::EColumnTableEngine::COLUMN_ENGINE_REPLACING_TIMESERIES); - } - static void FillParameters(TQueryData::TPtr params, ::google::protobuf::Map, Ydb::TypedValue>* output) { if (!params) { return; @@ -2358,59 +2181,6 @@ class TKikimrIcGateway : public IKqpGateway { output->insert(paramsMap.begin(), paramsMap.end()); } - static bool FillCreateColumnTableDesc(NYql::TKikimrTableMetadataPtr metadata, - NKikimrSchemeOp::TColumnTableDescription& tableDesc, Ydb::StatusIds::StatusCode& code, TString& error) - { - if (metadata->Columns.empty()) { - tableDesc.SetSchemaPresetName("default"); - } - - auto& hashSharding = *tableDesc.MutableSharding()->MutableHashSharding(); - - for (const TString& column : metadata->TableSettings.PartitionBy) { - if (!metadata->Columns.count(column)) { - code = Ydb::StatusIds::BAD_REQUEST; - error = TStringBuilder() << "Unknown column '" << column << "' in partition by key"; - return false; - } - - hashSharding.AddColumns(column); - } - - if (metadata->TableSettings.PartitionByHashFunction) { - if (to_lower(metadata->TableSettings.PartitionByHashFunction.GetRef()) == "cloud_logs") { - hashSharding.SetFunction(NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_CLOUD_LOGS); - } else if (to_lower(metadata->TableSettings.PartitionByHashFunction.GetRef()) == "consistency_hash_64") { - hashSharding.SetFunction(NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_CONSISTENCY_64); - } else if (to_lower(metadata->TableSettings.PartitionByHashFunction.GetRef()) == "modulo_n") { - hashSharding.SetFunction(NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_MODULO_N); - } else { - code = Ydb::StatusIds::BAD_REQUEST; - error = TStringBuilder() << "Unknown hash function '" - << metadata->TableSettings.PartitionByHashFunction.GetRef() << "' to partition by"; - return false; - } - } else { - hashSharding.SetFunction(NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_CONSISTENCY_64); - } - - if (metadata->TableSettings.MinPartitions) { - tableDesc.SetColumnShardCount(*metadata->TableSettings.MinPartitions); - } - - if (metadata->TableSettings.TtlSettings.Defined() && metadata->TableSettings.TtlSettings.IsSet()) { - const auto& inputSettings = metadata->TableSettings.TtlSettings.GetValueSet(); - auto& resultSettings = *tableDesc.MutableTtlSettings(); - resultSettings.MutableEnabled()->SetColumnName(inputSettings.ColumnName); - resultSettings.MutableEnabled()->SetExpireAfterSeconds(inputSettings.ExpireAfter.Seconds()); - if (inputSettings.ColumnUnit) { - resultSettings.MutableEnabled()->SetColumnUnit(static_cast(*inputSettings.ColumnUnit)); - } - } - - return true; - } - private: TString Cluster; const NKikimrKqp::EQueryType QueryType; diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp index c153f18b27a1..c545e7b62e23 100644 --- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp +++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp @@ -354,6 +354,80 @@ bool FillCreateTableDesc(NYql::TKikimrTableMetadataPtr metadata, NKikimrSchemeOp return true; } +template +void FillColumnTableSchema(NKikimrSchemeOp::TColumnTableSchema& schema, const T& metadata) +{ + Y_ENSURE(metadata.ColumnOrder.size() == metadata.Columns.size()); + for (const auto& name : metadata.ColumnOrder) { + auto columnIt = metadata.Columns.find(name); + Y_ENSURE(columnIt != metadata.Columns.end()); + + NKikimrSchemeOp::TOlapColumnDescription& columnDesc = *schema.AddColumns(); + columnDesc.SetName(columnIt->second.Name); + columnDesc.SetType(columnIt->second.Type); + columnDesc.SetNotNull(columnIt->second.NotNull); + } + + for (const auto& keyColumn : metadata.KeyColumnNames) { + schema.AddKeyColumnNames(keyColumn); + } + + schema.SetEngine(NKikimrSchemeOp::EColumnTableEngine::COLUMN_ENGINE_REPLACING_TIMESERIES); +} + +bool FillCreateColumnTableDesc(NYql::TKikimrTableMetadataPtr metadata, + NKikimrSchemeOp::TColumnTableDescription& tableDesc, Ydb::StatusIds::StatusCode& code, TString& error) +{ + if (metadata->Columns.empty()) { + tableDesc.SetSchemaPresetName("default"); + } + + auto& hashSharding = *tableDesc.MutableSharding()->MutableHashSharding(); + + for (const TString& column : metadata->TableSettings.PartitionBy) { + if (!metadata->Columns.count(column)) { + code = Ydb::StatusIds::BAD_REQUEST; + error = TStringBuilder() << "Unknown column '" << column << "' in partition by key"; + return false; + } + + hashSharding.AddColumns(column); + } + + if (metadata->TableSettings.PartitionByHashFunction) { + if (to_lower(metadata->TableSettings.PartitionByHashFunction.GetRef()) == "cloud_logs") { + hashSharding.SetFunction(NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_CLOUD_LOGS); + } else if (to_lower(metadata->TableSettings.PartitionByHashFunction.GetRef()) == "consistency_hash_64") { + hashSharding.SetFunction(NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_CONSISTENCY_64); + } else if (to_lower(metadata->TableSettings.PartitionByHashFunction.GetRef()) == "modulo_n") { + hashSharding.SetFunction(NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_MODULO_N); + } else { + code = Ydb::StatusIds::BAD_REQUEST; + error = TStringBuilder() << "Unknown hash function '" + << metadata->TableSettings.PartitionByHashFunction.GetRef() << "' to partition by"; + return false; + } + } else { + hashSharding.SetFunction(NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_CONSISTENCY_64); + } + + if (metadata->TableSettings.MinPartitions) { + tableDesc.SetColumnShardCount(*metadata->TableSettings.MinPartitions); + } + + if (metadata->TableSettings.TtlSettings.Defined() && metadata->TableSettings.TtlSettings.IsSet()) { + const auto& inputSettings = metadata->TableSettings.TtlSettings.GetValueSet(); + auto& resultSettings = *tableDesc.MutableTtlSettings(); + resultSettings.MutableEnabled()->SetColumnName(inputSettings.ColumnName); + resultSettings.MutableEnabled()->SetExpireAfterSeconds(inputSettings.ExpireAfter.Seconds()); + if (inputSettings.ColumnUnit) { + resultSettings.MutableEnabled()->SetColumnUnit(static_cast(*inputSettings.ColumnUnit)); + } + } + + return true; +} + template static TFuture PrepareUnsupported(const char* name) { TResult result; @@ -1215,32 +1289,241 @@ class TKqpGatewayProxy : public IKikimrGateway { } } - TFuture CreateColumnTable(TKikimrTableMetadataPtr metadata, bool createDir) override { - FORWARD_ENSURE_NO_PREPARE(CreateColumnTable, metadata, createDir); + TFuture CreateColumnTable(TKikimrTableMetadataPtr metadata, + bool createDir, bool existingOk) override { + CHECK_PREPARED_DDL(CreateColumnTable); + + try { + const auto& cluster = metadata->Cluster; + + if (cluster != SessionCtx->GetCluster()) { + return MakeFuture(ResultFromError("Invalid cluster: " + cluster)); + } + + std::pair pathPair; + { + TString error; + if (!NSchemeHelpers::SplitTablePath(metadata->Name, GetDatabase(), pathPair, error, createDir)) { + return MakeFuture(ResultFromError(error)); + } + } + + NKikimrSchemeOp::TModifyScheme schemeTx; + schemeTx.SetWorkingDir(pathPair.first); + + Ydb::StatusIds::StatusCode code; + TString error; + + schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateColumnTable); + schemeTx.SetFailedOnAlreadyExists(!existingOk); + + NKikimrSchemeOp::TColumnTableDescription* tableDesc = schemeTx.MutableCreateColumnTable(); + + tableDesc->SetName(pathPair.second); + FillColumnTableSchema(*tableDesc->MutableSchema(), *metadata); + + if (!FillCreateColumnTableDesc(metadata, *tableDesc, code, error)) { + IKqpGateway::TGenericResult errResult; + errResult.AddIssue(NYql::TIssue(error)); + errResult.SetStatus(NYql::YqlStatusFromYdbStatus(code)); + return MakeFuture(std::move(errResult)); + } + + if (IsPrepare()) { + auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); + auto& phyTx = *phyQuery.AddTransactions(); + phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); + phyTx.MutableSchemeOperation()->MutableCreateTable()->Swap(&schemeTx); + + TGenericResult result; + result.SetSuccess(); + return MakeFuture(result); + } else { + return Gateway->ModifyScheme(std::move(schemeTx)); + } + } + catch (yexception& e) { + return MakeFuture(ResultFromException(e)); + } } TFuture AlterColumnTable(const TString& cluster, const TAlterColumnTableSettings& settings) override { - FORWARD_ENSURE_NO_PREPARE(AlterColumnTable, cluster, settings); + CHECK_PREPARED_DDL(AlterColumnTable); + + try { + if (cluster != SessionCtx->GetCluster()) { + return MakeFuture(ResultFromError("Invalid cluster: " + cluster)); + } + + std::pair pathPair; + { + TString error; + if (!NSchemeHelpers::SplitTablePath(settings.Table, GetDatabase(), pathPair, error, false)) { + return MakeFuture(ResultFromError(error)); + } + } + + NKikimrSchemeOp::TModifyScheme schemeTx; + schemeTx.SetWorkingDir(pathPair.first); + + schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterColumnTable); + NKikimrSchemeOp::TAlterColumnTable* alter = schemeTx.MutableAlterColumnTable(); + alter->SetName(settings.Table); + + if (IsPrepare()) { + auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); + auto& phyTx = *phyQuery.AddTransactions(); + phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); + phyTx.MutableSchemeOperation()->MutableAlterColumnTable()->Swap(&schemeTx); + + TGenericResult result; + result.SetSuccess(); + return MakeFuture(result); + } else { + return Gateway->ModifyScheme(std::move(schemeTx)); + } + } + catch (yexception& e) { + return MakeFuture(ResultFromException(e)); + } } TFuture CreateTableStore(const TString& cluster, - const TCreateTableStoreSettings& settings) override + const TCreateTableStoreSettings& settings, bool existingOk) override { - FORWARD_ENSURE_NO_PREPARE(CreateTableStore, cluster, settings); + CHECK_PREPARED_DDL(CreateTableStore); + + try { + if (cluster != SessionCtx->GetCluster()) { + return MakeFuture(ResultFromError("Invalid cluster: " + cluster)); + } + + std::pair pathPair; + { + TString error; + if (!NSchemeHelpers::SplitTablePath(settings.TableStore, GetDatabase(), pathPair, error, false)) { + return MakeFuture(ResultFromError(error)); + } + } + + NKikimrSchemeOp::TModifyScheme schemeTx; + schemeTx.SetWorkingDir(pathPair.first); + + schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateColumnStore); + schemeTx.SetFailedOnAlreadyExists(!existingOk); + + NKikimrSchemeOp::TColumnStoreDescription* storeDesc = schemeTx.MutableCreateColumnStore(); + storeDesc->SetName(pathPair.second); + storeDesc->SetColumnShardCount(settings.ShardsCount); + + NKikimrSchemeOp::TColumnTableSchemaPreset* schemaPreset = storeDesc->AddSchemaPresets(); + schemaPreset->SetName("default"); + FillColumnTableSchema(*schemaPreset->MutableSchema(), settings); + + if (IsPrepare()) { + auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); + auto& phyTx = *phyQuery.AddTransactions(); + phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); + phyTx.MutableSchemeOperation()->MutableCreateTableStore()->Swap(&schemeTx); + + TGenericResult result; + result.SetSuccess(); + return MakeFuture(result); + } else { + return Gateway->ModifyScheme(std::move(schemeTx)); + } + } + catch (yexception& e) { + return MakeFuture(ResultFromException(e)); + } } TFuture AlterTableStore(const TString& cluster, const TAlterTableStoreSettings& settings) override { - FORWARD_ENSURE_NO_PREPARE(AlterTableStore, cluster, settings); + CHECK_PREPARED_DDL(AlterTableStore); + + try { + if (cluster != SessionCtx->GetCluster()) { + return MakeFuture(ResultFromError("Invalid cluster: " + cluster)); + } + + std::pair pathPair; + { + TString error; + if (!NSchemeHelpers::SplitTablePath(settings.TableStore, GetDatabase(), pathPair, error, false)) { + return MakeFuture(ResultFromError(error)); + } + } + + NKikimrSchemeOp::TModifyScheme schemeTx; + schemeTx.SetWorkingDir(pathPair.first); + + schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterColumnStore); + NKikimrSchemeOp::TAlterColumnStore* alter = schemeTx.MutableAlterColumnStore(); + alter->SetName(pathPair.second); + + if (IsPrepare()) { + auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); + auto& phyTx = *phyQuery.AddTransactions(); + phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); + phyTx.MutableSchemeOperation()->MutableAlterTableStore()->Swap(&schemeTx); + + TGenericResult result; + result.SetSuccess(); + return MakeFuture(result); + } else { + return Gateway->ModifyScheme(std::move(schemeTx)); + } + } + catch (yexception& e) { + return MakeFuture(ResultFromException(e)); + } } TFuture DropTableStore(const TString& cluster, - const TDropTableStoreSettings& settings) override + const TDropTableStoreSettings& settings, bool missingOk) override { - FORWARD_ENSURE_NO_PREPARE(DropTableStore, cluster, settings); + CHECK_PREPARED_DDL(DropTableStore); + + try { + if (cluster != SessionCtx->GetCluster()) { + return MakeFuture(ResultFromError("Invalid cluster: " + cluster)); + } + + std::pair pathPair; + { + TString error; + if (!NSchemeHelpers::SplitTablePath(settings.TableStore, GetDatabase(), pathPair, error, false)) { + return MakeFuture(ResultFromError(error)); + } + } + + NKikimrSchemeOp::TModifyScheme schemeTx; + schemeTx.SetWorkingDir(pathPair.first); + schemeTx.SetSuccessOnNotExist(missingOk); + schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropColumnStore); + NKikimrSchemeOp::TDrop* drop = schemeTx.MutableDrop(); + drop->SetName(pathPair.second); + + if (IsPrepare()) { + auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); + auto& phyTx = *phyQuery.AddTransactions(); + phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); + phyTx.MutableSchemeOperation()->MutableDropTableStore()->Swap(&schemeTx); + + TGenericResult result; + result.SetSuccess(); + return MakeFuture(result); + } else { + return Gateway->ModifyScheme(std::move(schemeTx)); + } + } + catch (yexception& e) { + return MakeFuture(ResultFromException(e)); + } } TFuture CreateExternalTable(const TString& cluster, const TCreateExternalTableSettings& settings, diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index 6940dc181627..cfb96efa16ab 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -948,12 +948,13 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformerCreateTableStore(cluster, - ParseCreateTableStoreSettings(maybeCreate.Cast(), table.Metadata->TableSettings)); + ParseCreateTableStoreSettings(maybeCreate.Cast(), table.Metadata->TableSettings), existingOk); break; } case ETableType::Table: case ETableType::Unknown: { - future = isColumn ? Gateway->CreateColumnTable(table.Metadata, true) : Gateway->CreateTable(table.Metadata, true, existingOk); + future = isColumn ? Gateway->CreateColumnTable(table.Metadata, true, existingOk) + : Gateway->CreateTable(table.Metadata, true, existingOk); break; } } @@ -1017,7 +1018,7 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformerDropTableStore(cluster, ParseDropTableStoreSettings(maybeDrop.Cast())); + future = Gateway->DropTableStore(cluster, ParseDropTableStoreSettings(maybeDrop.Cast()), missingOk); break; case ETableType::ExternalTable: future = Gateway->DropExternalTable(cluster, ParseDropExternalTableSettings(maybeDrop.Cast()), missingOk); diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h index 8d79563400f0..ddd2d76e80d1 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway.h +++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h @@ -833,15 +833,18 @@ class IKikimrGateway : public TThrRefBase { virtual NThreading::TFuture DropGroup(const TString& cluster, const TDropGroupSettings& settings) = 0; - virtual NThreading::TFuture CreateColumnTable(TKikimrTableMetadataPtr metadata, bool createDir) = 0; + virtual NThreading::TFuture CreateColumnTable( + TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk = false) = 0; virtual NThreading::TFuture AlterColumnTable(const TString& cluster, const TAlterColumnTableSettings& settings) = 0; - virtual NThreading::TFuture CreateTableStore(const TString& cluster, const TCreateTableStoreSettings& settings) = 0; + virtual NThreading::TFuture CreateTableStore(const TString& cluster, + const TCreateTableStoreSettings& settings, bool existingOk = false) = 0; virtual NThreading::TFuture AlterTableStore(const TString& cluster, const TAlterTableStoreSettings& settings) = 0; - virtual NThreading::TFuture DropTableStore(const TString& cluster, const TDropTableStoreSettings& settings) = 0; + virtual NThreading::TFuture DropTableStore(const TString& cluster, + const TDropTableStoreSettings& settings, bool missingOk) = 0; virtual NThreading::TFuture CreateExternalTable(const TString& cluster, const TCreateExternalTableSettings& settings, bool createDir, bool existingOk, bool replaceIfExists) = 0; diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index 460fa094bef0..32181f5da43c 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -1,5 +1,7 @@ #include #include +#include +#include #include #include #include @@ -671,6 +673,146 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { checkDrop(true, EEx::IfExists, 1); } + Y_UNIT_TEST(DdlColumnTable) { + const TVector schema = { + TTestHelper::TColumnSchema().SetName("Key").SetType(NScheme::NTypeIds::Uint64).SetNullable(false), + TTestHelper::TColumnSchema().SetName("Value").SetType(NScheme::NTypeIds::String) + }; + + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); + auto setting = NKikimrKqp::TKqpSetting(); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetKqpSettings({setting}); + + TTestHelper testHelper(serverSettings); + auto& kikimr = testHelper.GetKikimr(); + + auto db = kikimr.GetQueryClient(); + + enum EEx { + Empty, + IfExists, + IfNotExists, + }; + + auto checkCreate = [&](bool expectSuccess, EEx exMode, const TString& objPath, bool isStore) { + UNIT_ASSERT_UNEQUAL(exMode, EEx::IfExists); + const TString ifNotExistsStatement = exMode == EEx::IfNotExists ? "IF NOT EXISTS" : ""; + const TString objType = isStore ? "TABLESTORE" : "TABLE"; + const TString hash = !isStore ? " PARTITION BY HASH(Key) " : ""; + auto sql = TStringBuilder() << R"( + --!syntax_v1 + CREATE )" << objType << " " << ifNotExistsStatement << " `" << objPath << R"(` ( + Key Uint64 NOT NULL, + Value String, + PRIMARY KEY (Key) + ))" << hash << R"( + WITH ( + STORE = COLUMN, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10 + );)"; + + auto result = db.ExecuteQuery(sql, TTxControl::NoTx()).ExtractValueSync(); + if (expectSuccess) { + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } else { + UNIT_ASSERT_VALUES_UNEQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + UNIT_ASSERT(result.GetResultSets().empty()); + }; + + auto checkAlter = [&](const TString& objPath, bool isStore) { + const TString objType = isStore ? "TABLESTORE" : "TABLE"; + { + auto sql = TStringBuilder() << R"( + --!syntax_v1 + ALTER )" << objType << " `" << objPath << R"(` + ADD COLUMN NewColumn Uint64; + ;)"; + + auto result = db.ExecuteQuery(sql, TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + auto sql = TStringBuilder() << R"( + --!syntax_v1 + ALTER )" << objType << " `" << objPath << R"(` + DROP COLUMN NewColumn; + ;)"; + + auto result = db.ExecuteQuery(sql, TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + }; + + auto checkDrop = [&](bool expectSuccess, EEx exMode, + const TString& objPath, bool isStore) { + UNIT_ASSERT_UNEQUAL(exMode, EEx::IfNotExists); + const TString ifExistsStatement = exMode == EEx::IfExists ? "IF EXISTS" : ""; + const TString objType = isStore ? "TABLESTORE" : "TABLE"; + auto sql = TStringBuilder() << R"( + --!syntax_v1 + DROP )" << objType << " " << ifExistsStatement << " `" << objPath << R"(`;)"; + + auto result = db.ExecuteQuery(sql, TTxControl::NoTx()).ExtractValueSync(); + if (expectSuccess) { + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } else { + UNIT_ASSERT_VALUES_UNEQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + UNIT_ASSERT(result.GetResultSets().empty()); + }; + + auto checkAddRow = [&](const TString& objPath) { + const size_t inserted_rows = 5; + TTestHelper::TColumnTable testTable; + testTable.SetName(objPath) + .SetPrimaryKey({"Key"}) + .SetSharding({"Key"}) + .SetSchema(schema); + { + TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); + for (size_t i = 0; i < inserted_rows; i++) { + tableInserter.AddRow().Add(i).Add("test_res_" + std::to_string(i)); + } + testHelper.BulkUpsert(testTable, tableInserter); + } + + Sleep(TDuration::Seconds(100)); + + auto sql = TStringBuilder() << R"( + SELECT Value FROM `)" << objPath << R"(` WHERE Key=1)"; + + testHelper.ReadData(sql, "[[[\"test_res_1\"]]]"); + }; + + checkCreate(true, EEx::Empty, "/Root/TableStoreTest", true); + checkCreate(false, EEx::Empty, "/Root/TableStoreTest", true); + checkCreate(true, EEx::IfNotExists, "/Root/TableStoreTest", true); + checkDrop(true, EEx::Empty, "/Root/TableStoreTest", true); + checkDrop(true, EEx::IfExists, "/Root/TableStoreTest", true); + checkDrop(false, EEx::Empty, "/Root/TableStoreTest", true); + + checkCreate(true, EEx::IfNotExists, "/Root/TableStoreTest", true); + checkCreate(false, EEx::Empty, "/Root/TableStoreTest", true); + checkDrop(true, EEx::IfExists, "/Root/TableStoreTest", true); + checkDrop(false, EEx::Empty, "/Root/TableStoreTest", true); + + checkCreate(true, EEx::IfNotExists, "/Root/ColumnTable", false); + checkAlter("/Root/ColumnTable", false); + checkDrop(true, EEx::IfExists, "/Root/ColumnTable", false); + + checkCreate(true, EEx::Empty, "/Root/ColumnTable", false); + checkCreate(false, EEx::Empty, "/Root/ColumnTable", false); + checkCreate(true, EEx::IfNotExists, "/Root/ColumnTable", false); + checkAddRow("/Root/ColumnTable"); + checkDrop(true, EEx::IfExists, "/Root/ColumnTable", false); + checkDrop(false, EEx::Empty, "/Root/ColumnTable", false); + checkDrop(true, EEx::IfExists, "/Root/ColumnTable", false); + } + Y_UNIT_TEST(DdlUser) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index 53da436f29e6..7be29de58da6 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -430,6 +430,11 @@ message TKqpSchemeOperation { TKqpPhyMetadataOperation UpsertObject = 24; TKqpPhyMetadataOperation AlterObject = 25; TKqpPhyMetadataOperation DropObject = 26; + NKikimrSchemeOp.TModifyScheme CreateColumnTable = 27; + NKikimrSchemeOp.TModifyScheme AlterColumnTable = 28; + NKikimrSchemeOp.TModifyScheme CreateTableStore = 29; + NKikimrSchemeOp.TModifyScheme DropTableStore = 30; + NKikimrSchemeOp.TModifyScheme AlterTableStore = 31; } }