Skip to content

Commit 8b1abbe

Browse files
authored
Merge 02f6af3 into ef309ff
2 parents ef309ff + 02f6af3 commit 8b1abbe

File tree

7 files changed

+506
-267
lines changed

7 files changed

+506
-267
lines changed

ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp

+30
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,36 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
203203
break;
204204
}
205205

206+
case NKqpProto::TKqpSchemeOperation::kCreateColumnTable: {
207+
const auto& modifyScheme = schemeOp.GetCreateColumnTable();
208+
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
209+
break;
210+
}
211+
212+
case NKqpProto::TKqpSchemeOperation::kAlterColumnTable: {
213+
const auto& modifyScheme = schemeOp.GetAlterColumnTable();
214+
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
215+
break;
216+
}
217+
218+
case NKqpProto::TKqpSchemeOperation::kCreateTableStore: {
219+
const auto& modifyScheme = schemeOp.GetCreateTableStore();
220+
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
221+
break;
222+
}
223+
224+
case NKqpProto::TKqpSchemeOperation::kAlterTableStore: {
225+
const auto& modifyScheme = schemeOp.GetAlterTableStore();
226+
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
227+
break;
228+
}
229+
230+
case NKqpProto::TKqpSchemeOperation::kDropTableStore: {
231+
const auto& modifyScheme = schemeOp.GetDropTableStore();
232+
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
233+
break;
234+
}
235+
206236
default:
207237
InternalError(TStringBuilder() << "Unexpected scheme operation: "
208238
<< (ui32) schemeOp.GetOperationCase());

ydb/core/kqp/gateway/kqp_ic_gateway.cpp

+23-253
Original file line numberDiff line numberDiff line change
@@ -877,51 +877,12 @@ class TKikimrIcGateway : public IKqpGateway {
877877
return tablePromise.GetFuture();
878878
}
879879

880-
TFuture<TGenericResult> CreateColumnTable(NYql::TKikimrTableMetadataPtr metadata, bool createDir) override {
881-
using TRequest = TEvTxUserProxy::TEvProposeTransaction;
882-
883-
try {
884-
if (!CheckCluster(metadata->Cluster)) {
885-
return InvalidCluster<TGenericResult>(metadata->Cluster);
886-
}
887-
888-
std::pair<TString, TString> pathPair;
889-
{
890-
TString error;
891-
if (!GetPathPair(metadata->Name, pathPair, error, createDir)) {
892-
return MakeFuture(ResultFromError<TGenericResult>(error));
893-
}
894-
}
895-
896-
auto ev = MakeHolder<TRequest>();
897-
ev->Record.SetDatabaseName(Database);
898-
if (UserToken) {
899-
ev->Record.SetUserToken(UserToken->GetSerializedToken());
900-
}
901-
auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme();
902-
schemeTx.SetWorkingDir(pathPair.first);
903-
904-
Ydb::StatusIds::StatusCode code;
905-
TString error;
906-
907-
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateColumnTable);
908-
NKikimrSchemeOp::TColumnTableDescription* tableDesc = schemeTx.MutableCreateColumnTable();
909-
910-
tableDesc->SetName(pathPair.second);
911-
FillColumnTableSchema(*tableDesc->MutableSchema(), *metadata);
912-
913-
if (!FillCreateColumnTableDesc(metadata, *tableDesc, code, error)) {
914-
IKqpGateway::TGenericResult errResult;
915-
errResult.AddIssue(NYql::TIssue(error));
916-
errResult.SetStatus(NYql::YqlStatusFromYdbStatus(code));
917-
return MakeFuture(std::move(errResult));
918-
}
919-
920-
return SendSchemeRequest(ev.Release());
921-
}
922-
catch (yexception& e) {
923-
return MakeFuture(ResultFromException<TGenericResult>(e));
924-
}
880+
TFuture<TGenericResult> CreateColumnTable(NYql::TKikimrTableMetadataPtr metadata,
881+
bool createDir, bool existingOk) override {
882+
Y_UNUSED(metadata);
883+
Y_UNUSED(createDir);
884+
Y_UNUSED(existingOk);
885+
return NotImplemented<TGenericResult>();
925886
}
926887

927888
TFuture<TGenericResult> AlterTable(const TString&, Ydb::Table::AlterTableRequest&&, const TMaybe<TString>&, ui64, NKikimrIndexBuilder::TIndexBuildSettings&&) override
@@ -1035,150 +996,33 @@ class TKikimrIcGateway : public IKqpGateway {
1035996

1036997
TFuture<TGenericResult> AlterColumnTable(const TString& cluster,
1037998
const NYql::TAlterColumnTableSettings& settings) override {
1038-
using TRequest = TEvTxUserProxy::TEvProposeTransaction;
1039-
1040-
try {
1041-
if (!CheckCluster(cluster)) {
1042-
return InvalidCluster<TGenericResult>(cluster);
1043-
}
1044-
1045-
std::pair<TString, TString> pathPair;
1046-
{
1047-
TString error;
1048-
if (!GetPathPair(settings.Table, pathPair, error, false)) {
1049-
return MakeFuture(ResultFromError<TGenericResult>(error));
1050-
}
1051-
}
1052-
1053-
auto ev = MakeHolder<TRequest>();
1054-
ev->Record.SetDatabaseName(Database);
1055-
if (UserToken) {
1056-
ev->Record.SetUserToken(UserToken->GetSerializedToken());
1057-
}
1058-
auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme();
1059-
schemeTx.SetWorkingDir(pathPair.first);
1060-
1061-
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterColumnTable);
1062-
NKikimrSchemeOp::TAlterColumnTable* alter = schemeTx.MutableAlterColumnTable();
1063-
alter->SetName(settings.Table);
1064-
1065-
return SendSchemeRequest(ev.Release());
1066-
}
1067-
catch (yexception& e) {
1068-
return MakeFuture(ResultFromException<TGenericResult>(e));
1069-
}
999+
Y_UNUSED(cluster);
1000+
Y_UNUSED(settings);
1001+
return NotImplemented<TGenericResult>();
10701002
}
10711003

10721004
TFuture<TGenericResult> CreateTableStore(const TString& cluster,
1073-
const NYql::TCreateTableStoreSettings& settings) override {
1074-
using TRequest = TEvTxUserProxy::TEvProposeTransaction;
1075-
1076-
try {
1077-
if (!CheckCluster(cluster)) {
1078-
return InvalidCluster<TGenericResult>(cluster);
1079-
}
1080-
1081-
std::pair<TString, TString> pathPair;
1082-
{
1083-
TString error;
1084-
if (!GetPathPair(settings.TableStore, pathPair, error, false)) {
1085-
return MakeFuture(ResultFromError<TGenericResult>(error));
1086-
}
1087-
}
1088-
1089-
auto ev = MakeHolder<TRequest>();
1090-
ev->Record.SetDatabaseName(Database);
1091-
if (UserToken) {
1092-
ev->Record.SetUserToken(UserToken->GetSerializedToken());
1093-
}
1094-
auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme();
1095-
schemeTx.SetWorkingDir(pathPair.first);
1096-
1097-
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateColumnStore);
1098-
NKikimrSchemeOp::TColumnStoreDescription* storeDesc = schemeTx.MutableCreateColumnStore();
1099-
storeDesc->SetName(pathPair.second);
1100-
storeDesc->SetColumnShardCount(settings.ShardsCount);
1101-
1102-
NKikimrSchemeOp::TColumnTableSchemaPreset* schemaPreset = storeDesc->AddSchemaPresets();
1103-
schemaPreset->SetName("default");
1104-
FillColumnTableSchema(*schemaPreset->MutableSchema(), settings);
1105-
1106-
return SendSchemeRequest(ev.Release());
1107-
}
1108-
catch (yexception& e) {
1109-
return MakeFuture(ResultFromException<TGenericResult>(e));
1110-
}
1005+
const NYql::TCreateTableStoreSettings& settings,
1006+
bool existingOk) override {
1007+
Y_UNUSED(cluster);
1008+
Y_UNUSED(settings);
1009+
Y_UNUSED(existingOk);
1010+
return NotImplemented<TGenericResult>();
11111011
}
11121012

11131013
TFuture<TGenericResult> AlterTableStore(const TString& cluster,
11141014
const NYql::TAlterTableStoreSettings& settings) override {
1115-
using TRequest = TEvTxUserProxy::TEvProposeTransaction;
1116-
1117-
try {
1118-
if (!CheckCluster(cluster)) {
1119-
return InvalidCluster<TGenericResult>(cluster);
1120-
}
1121-
1122-
std::pair<TString, TString> pathPair;
1123-
{
1124-
TString error;
1125-
if (!GetPathPair(settings.TableStore, pathPair, error, false)) {
1126-
return MakeFuture(ResultFromError<TGenericResult>(error));
1127-
}
1128-
}
1129-
1130-
auto ev = MakeHolder<TRequest>();
1131-
ev->Record.SetDatabaseName(Database);
1132-
if (UserToken) {
1133-
ev->Record.SetUserToken(UserToken->GetSerializedToken());
1134-
}
1135-
auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme();
1136-
schemeTx.SetWorkingDir(pathPair.first);
1137-
1138-
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterColumnStore);
1139-
NKikimrSchemeOp::TAlterColumnStore* alter = schemeTx.MutableAlterColumnStore();
1140-
alter->SetName(pathPair.second);
1141-
1142-
return SendSchemeRequest(ev.Release());
1143-
}
1144-
catch (yexception& e) {
1145-
return MakeFuture(ResultFromException<TGenericResult>(e));
1146-
}
1015+
Y_UNUSED(cluster);
1016+
Y_UNUSED(settings);
1017+
return NotImplemented<TGenericResult>();
11471018
}
11481019

11491020
TFuture<TGenericResult> DropTableStore(const TString& cluster,
1150-
const NYql::TDropTableStoreSettings& settings) override {
1151-
using TRequest = TEvTxUserProxy::TEvProposeTransaction;
1152-
1153-
try {
1154-
if (!CheckCluster(cluster)) {
1155-
return InvalidCluster<TGenericResult>(cluster);
1156-
}
1157-
1158-
std::pair<TString, TString> pathPair;
1159-
{
1160-
TString error;
1161-
if (!GetPathPair(settings.TableStore, pathPair, error, false)) {
1162-
return MakeFuture(ResultFromError<TGenericResult>(error));
1163-
}
1164-
}
1165-
1166-
auto ev = MakeHolder<TRequest>();
1167-
ev->Record.SetDatabaseName(Database);
1168-
if (UserToken) {
1169-
ev->Record.SetUserToken(UserToken->GetSerializedToken());
1170-
}
1171-
auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme();
1172-
schemeTx.SetWorkingDir(pathPair.first);
1173-
1174-
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropColumnStore);
1175-
NKikimrSchemeOp::TDrop* drop = schemeTx.MutableDrop();
1176-
drop->SetName(pathPair.second);
1177-
return SendSchemeRequest(ev.Release());
1178-
}
1179-
catch (yexception& e) {
1180-
return MakeFuture(ResultFromException<TGenericResult>(e));
1181-
}
1021+
const NYql::TDropTableStoreSettings& settings, bool missingOk) override {
1022+
Y_UNUSED(cluster);
1023+
Y_UNUSED(settings);
1024+
Y_UNUSED(missingOk);
1025+
return NotImplemented<TGenericResult>();
11821026
}
11831027

11841028
TFuture<TGenericResult> CreateExternalTable(const TString& cluster,
@@ -2328,27 +2172,6 @@ class TKikimrIcGateway : public IKqpGateway {
23282172
return result;
23292173
}
23302174

2331-
template <typename T>
2332-
static void FillColumnTableSchema(NKikimrSchemeOp::TColumnTableSchema& schema, const T& metadata)
2333-
{
2334-
Y_ENSURE(metadata.ColumnOrder.size() == metadata.Columns.size());
2335-
for (const auto& name : metadata.ColumnOrder) {
2336-
auto columnIt = metadata.Columns.find(name);
2337-
Y_ENSURE(columnIt != metadata.Columns.end());
2338-
2339-
TOlapColumnDescription& columnDesc = *schema.AddColumns();
2340-
columnDesc.SetName(columnIt->second.Name);
2341-
columnDesc.SetType(columnIt->second.Type);
2342-
columnDesc.SetNotNull(columnIt->second.NotNull);
2343-
}
2344-
2345-
for (const auto& keyColumn : metadata.KeyColumnNames) {
2346-
schema.AddKeyColumnNames(keyColumn);
2347-
}
2348-
2349-
schema.SetEngine(NKikimrSchemeOp::EColumnTableEngine::COLUMN_ENGINE_REPLACING_TIMESERIES);
2350-
}
2351-
23522175
static void FillParameters(TQueryData::TPtr params, ::google::protobuf::Map<TBasicString<char>, Ydb::TypedValue>* output) {
23532176
if (!params) {
23542177
return;
@@ -2358,59 +2181,6 @@ class TKikimrIcGateway : public IKqpGateway {
23582181
output->insert(paramsMap.begin(), paramsMap.end());
23592182
}
23602183

2361-
static bool FillCreateColumnTableDesc(NYql::TKikimrTableMetadataPtr metadata,
2362-
NKikimrSchemeOp::TColumnTableDescription& tableDesc, Ydb::StatusIds::StatusCode& code, TString& error)
2363-
{
2364-
if (metadata->Columns.empty()) {
2365-
tableDesc.SetSchemaPresetName("default");
2366-
}
2367-
2368-
auto& hashSharding = *tableDesc.MutableSharding()->MutableHashSharding();
2369-
2370-
for (const TString& column : metadata->TableSettings.PartitionBy) {
2371-
if (!metadata->Columns.count(column)) {
2372-
code = Ydb::StatusIds::BAD_REQUEST;
2373-
error = TStringBuilder() << "Unknown column '" << column << "' in partition by key";
2374-
return false;
2375-
}
2376-
2377-
hashSharding.AddColumns(column);
2378-
}
2379-
2380-
if (metadata->TableSettings.PartitionByHashFunction) {
2381-
if (to_lower(metadata->TableSettings.PartitionByHashFunction.GetRef()) == "cloud_logs") {
2382-
hashSharding.SetFunction(NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_CLOUD_LOGS);
2383-
} else if (to_lower(metadata->TableSettings.PartitionByHashFunction.GetRef()) == "consistency_hash_64") {
2384-
hashSharding.SetFunction(NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_CONSISTENCY_64);
2385-
} else if (to_lower(metadata->TableSettings.PartitionByHashFunction.GetRef()) == "modulo_n") {
2386-
hashSharding.SetFunction(NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_MODULO_N);
2387-
} else {
2388-
code = Ydb::StatusIds::BAD_REQUEST;
2389-
error = TStringBuilder() << "Unknown hash function '"
2390-
<< metadata->TableSettings.PartitionByHashFunction.GetRef() << "' to partition by";
2391-
return false;
2392-
}
2393-
} else {
2394-
hashSharding.SetFunction(NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_CONSISTENCY_64);
2395-
}
2396-
2397-
if (metadata->TableSettings.MinPartitions) {
2398-
tableDesc.SetColumnShardCount(*metadata->TableSettings.MinPartitions);
2399-
}
2400-
2401-
if (metadata->TableSettings.TtlSettings.Defined() && metadata->TableSettings.TtlSettings.IsSet()) {
2402-
const auto& inputSettings = metadata->TableSettings.TtlSettings.GetValueSet();
2403-
auto& resultSettings = *tableDesc.MutableTtlSettings();
2404-
resultSettings.MutableEnabled()->SetColumnName(inputSettings.ColumnName);
2405-
resultSettings.MutableEnabled()->SetExpireAfterSeconds(inputSettings.ExpireAfter.Seconds());
2406-
if (inputSettings.ColumnUnit) {
2407-
resultSettings.MutableEnabled()->SetColumnUnit(static_cast<NKikimrSchemeOp::TTTLSettings::EUnit>(*inputSettings.ColumnUnit));
2408-
}
2409-
}
2410-
2411-
return true;
2412-
}
2413-
24142184
private:
24152185
TString Cluster;
24162186
const NKikimrKqp::EQueryType QueryType;

0 commit comments

Comments
 (0)