Skip to content

Commit 6c02062

Browse files
authored
Merge 2da7452 into 8472b26
2 parents 8472b26 + 2da7452 commit 6c02062

File tree

7 files changed

+481
-267
lines changed

7 files changed

+481
-267
lines changed

ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp

+30
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,36 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
197197
break;
198198
}
199199

200+
case NKqpProto::TKqpSchemeOperation::kCreateColumnTable: {
201+
const auto& modifyScheme = schemeOp.GetCreateColumnTable();
202+
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
203+
break;
204+
}
205+
206+
case NKqpProto::TKqpSchemeOperation::kAlterColumnTable: {
207+
const auto& modifyScheme = schemeOp.GetAlterColumnTable();
208+
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
209+
break;
210+
}
211+
212+
case NKqpProto::TKqpSchemeOperation::kCreateTableStore: {
213+
const auto& modifyScheme = schemeOp.GetCreateTableStore();
214+
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
215+
break;
216+
}
217+
218+
case NKqpProto::TKqpSchemeOperation::kAlterTableStore: {
219+
const auto& modifyScheme = schemeOp.GetAlterTableStore();
220+
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
221+
break;
222+
}
223+
224+
case NKqpProto::TKqpSchemeOperation::kDropTableStore: {
225+
const auto& modifyScheme = schemeOp.GetDropTableStore();
226+
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
227+
break;
228+
}
229+
200230
default:
201231
InternalError(TStringBuilder() << "Unexpected scheme operation: "
202232
<< (ui32) schemeOp.GetOperationCase());

ydb/core/kqp/gateway/kqp_ic_gateway.cpp

+23-253
Original file line numberDiff line numberDiff line change
@@ -876,51 +876,12 @@ class TKikimrIcGateway : public IKqpGateway {
876876
return tablePromise.GetFuture();
877877
}
878878

879-
TFuture<TGenericResult> CreateColumnTable(NYql::TKikimrTableMetadataPtr metadata, bool createDir) override {
880-
using TRequest = TEvTxUserProxy::TEvProposeTransaction;
881-
882-
try {
883-
if (!CheckCluster(metadata->Cluster)) {
884-
return InvalidCluster<TGenericResult>(metadata->Cluster);
885-
}
886-
887-
std::pair<TString, TString> pathPair;
888-
{
889-
TString error;
890-
if (!GetPathPair(metadata->Name, pathPair, error, createDir)) {
891-
return MakeFuture(ResultFromError<TGenericResult>(error));
892-
}
893-
}
894-
895-
auto ev = MakeHolder<TRequest>();
896-
ev->Record.SetDatabaseName(Database);
897-
if (UserToken) {
898-
ev->Record.SetUserToken(UserToken->GetSerializedToken());
899-
}
900-
auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme();
901-
schemeTx.SetWorkingDir(pathPair.first);
902-
903-
Ydb::StatusIds::StatusCode code;
904-
TString error;
905-
906-
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateColumnTable);
907-
NKikimrSchemeOp::TColumnTableDescription* tableDesc = schemeTx.MutableCreateColumnTable();
908-
909-
tableDesc->SetName(pathPair.second);
910-
FillColumnTableSchema(*tableDesc->MutableSchema(), *metadata);
911-
912-
if (!FillCreateColumnTableDesc(metadata, *tableDesc, code, error)) {
913-
IKqpGateway::TGenericResult errResult;
914-
errResult.AddIssue(NYql::TIssue(error));
915-
errResult.SetStatus(NYql::YqlStatusFromYdbStatus(code));
916-
return MakeFuture(std::move(errResult));
917-
}
918-
919-
return SendSchemeRequest(ev.Release());
920-
}
921-
catch (yexception& e) {
922-
return MakeFuture(ResultFromException<TGenericResult>(e));
923-
}
879+
TFuture<TGenericResult> CreateColumnTable(NYql::TKikimrTableMetadataPtr metadata,
880+
bool createDir, bool existingOk) override {
881+
Y_UNUSED(metadata);
882+
Y_UNUSED(createDir);
883+
Y_UNUSED(existingOk);
884+
return NotImplemented<TGenericResult>();
924885
}
925886

926887
TFuture<TGenericResult> AlterTable(const TString&, Ydb::Table::AlterTableRequest&&, const TMaybe<TString>&, ui64, NKikimrIndexBuilder::TIndexBuildSettings&&) override
@@ -1034,150 +995,33 @@ class TKikimrIcGateway : public IKqpGateway {
1034995

1035996
TFuture<TGenericResult> AlterColumnTable(const TString& cluster,
1036997
const NYql::TAlterColumnTableSettings& settings) override {
1037-
using TRequest = TEvTxUserProxy::TEvProposeTransaction;
1038-
1039-
try {
1040-
if (!CheckCluster(cluster)) {
1041-
return InvalidCluster<TGenericResult>(cluster);
1042-
}
1043-
1044-
std::pair<TString, TString> pathPair;
1045-
{
1046-
TString error;
1047-
if (!GetPathPair(settings.Table, pathPair, error, false)) {
1048-
return MakeFuture(ResultFromError<TGenericResult>(error));
1049-
}
1050-
}
1051-
1052-
auto ev = MakeHolder<TRequest>();
1053-
ev->Record.SetDatabaseName(Database);
1054-
if (UserToken) {
1055-
ev->Record.SetUserToken(UserToken->GetSerializedToken());
1056-
}
1057-
auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme();
1058-
schemeTx.SetWorkingDir(pathPair.first);
1059-
1060-
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterColumnTable);
1061-
NKikimrSchemeOp::TAlterColumnTable* alter = schemeTx.MutableAlterColumnTable();
1062-
alter->SetName(settings.Table);
1063-
1064-
return SendSchemeRequest(ev.Release());
1065-
}
1066-
catch (yexception& e) {
1067-
return MakeFuture(ResultFromException<TGenericResult>(e));
1068-
}
998+
Y_UNUSED(cluster);
999+
Y_UNUSED(settings);
1000+
return NotImplemented<TGenericResult>();
10691001
}
10701002

10711003
TFuture<TGenericResult> CreateTableStore(const TString& cluster,
1072-
const NYql::TCreateTableStoreSettings& settings) override {
1073-
using TRequest = TEvTxUserProxy::TEvProposeTransaction;
1074-
1075-
try {
1076-
if (!CheckCluster(cluster)) {
1077-
return InvalidCluster<TGenericResult>(cluster);
1078-
}
1079-
1080-
std::pair<TString, TString> pathPair;
1081-
{
1082-
TString error;
1083-
if (!GetPathPair(settings.TableStore, pathPair, error, false)) {
1084-
return MakeFuture(ResultFromError<TGenericResult>(error));
1085-
}
1086-
}
1087-
1088-
auto ev = MakeHolder<TRequest>();
1089-
ev->Record.SetDatabaseName(Database);
1090-
if (UserToken) {
1091-
ev->Record.SetUserToken(UserToken->GetSerializedToken());
1092-
}
1093-
auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme();
1094-
schemeTx.SetWorkingDir(pathPair.first);
1095-
1096-
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateColumnStore);
1097-
NKikimrSchemeOp::TColumnStoreDescription* storeDesc = schemeTx.MutableCreateColumnStore();
1098-
storeDesc->SetName(pathPair.second);
1099-
storeDesc->SetColumnShardCount(settings.ShardsCount);
1100-
1101-
NKikimrSchemeOp::TColumnTableSchemaPreset* schemaPreset = storeDesc->AddSchemaPresets();
1102-
schemaPreset->SetName("default");
1103-
FillColumnTableSchema(*schemaPreset->MutableSchema(), settings);
1104-
1105-
return SendSchemeRequest(ev.Release());
1106-
}
1107-
catch (yexception& e) {
1108-
return MakeFuture(ResultFromException<TGenericResult>(e));
1109-
}
1004+
const NYql::TCreateTableStoreSettings& settings,
1005+
bool existingOk) override {
1006+
Y_UNUSED(cluster);
1007+
Y_UNUSED(settings);
1008+
Y_UNUSED(existingOk);
1009+
return NotImplemented<TGenericResult>();
11101010
}
11111011

11121012
TFuture<TGenericResult> AlterTableStore(const TString& cluster,
11131013
const NYql::TAlterTableStoreSettings& settings) override {
1114-
using TRequest = TEvTxUserProxy::TEvProposeTransaction;
1115-
1116-
try {
1117-
if (!CheckCluster(cluster)) {
1118-
return InvalidCluster<TGenericResult>(cluster);
1119-
}
1120-
1121-
std::pair<TString, TString> pathPair;
1122-
{
1123-
TString error;
1124-
if (!GetPathPair(settings.TableStore, pathPair, error, false)) {
1125-
return MakeFuture(ResultFromError<TGenericResult>(error));
1126-
}
1127-
}
1128-
1129-
auto ev = MakeHolder<TRequest>();
1130-
ev->Record.SetDatabaseName(Database);
1131-
if (UserToken) {
1132-
ev->Record.SetUserToken(UserToken->GetSerializedToken());
1133-
}
1134-
auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme();
1135-
schemeTx.SetWorkingDir(pathPair.first);
1136-
1137-
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterColumnStore);
1138-
NKikimrSchemeOp::TAlterColumnStore* alter = schemeTx.MutableAlterColumnStore();
1139-
alter->SetName(pathPair.second);
1140-
1141-
return SendSchemeRequest(ev.Release());
1142-
}
1143-
catch (yexception& e) {
1144-
return MakeFuture(ResultFromException<TGenericResult>(e));
1145-
}
1014+
Y_UNUSED(cluster);
1015+
Y_UNUSED(settings);
1016+
return NotImplemented<TGenericResult>();
11461017
}
11471018

11481019
TFuture<TGenericResult> DropTableStore(const TString& cluster,
1149-
const NYql::TDropTableStoreSettings& settings) override {
1150-
using TRequest = TEvTxUserProxy::TEvProposeTransaction;
1151-
1152-
try {
1153-
if (!CheckCluster(cluster)) {
1154-
return InvalidCluster<TGenericResult>(cluster);
1155-
}
1156-
1157-
std::pair<TString, TString> pathPair;
1158-
{
1159-
TString error;
1160-
if (!GetPathPair(settings.TableStore, pathPair, error, false)) {
1161-
return MakeFuture(ResultFromError<TGenericResult>(error));
1162-
}
1163-
}
1164-
1165-
auto ev = MakeHolder<TRequest>();
1166-
ev->Record.SetDatabaseName(Database);
1167-
if (UserToken) {
1168-
ev->Record.SetUserToken(UserToken->GetSerializedToken());
1169-
}
1170-
auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme();
1171-
schemeTx.SetWorkingDir(pathPair.first);
1172-
1173-
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropColumnStore);
1174-
NKikimrSchemeOp::TDrop* drop = schemeTx.MutableDrop();
1175-
drop->SetName(pathPair.second);
1176-
return SendSchemeRequest(ev.Release());
1177-
}
1178-
catch (yexception& e) {
1179-
return MakeFuture(ResultFromException<TGenericResult>(e));
1180-
}
1020+
const NYql::TDropTableStoreSettings& settings, bool missingOk) override {
1021+
Y_UNUSED(cluster);
1022+
Y_UNUSED(settings);
1023+
Y_UNUSED(missingOk);
1024+
return NotImplemented<TGenericResult>();
11811025
}
11821026

11831027
TFuture<TGenericResult> CreateExternalTable(const TString& cluster,
@@ -2327,27 +2171,6 @@ class TKikimrIcGateway : public IKqpGateway {
23272171
return result;
23282172
}
23292173

2330-
template <typename T>
2331-
static void FillColumnTableSchema(NKikimrSchemeOp::TColumnTableSchema& schema, const T& metadata)
2332-
{
2333-
Y_ENSURE(metadata.ColumnOrder.size() == metadata.Columns.size());
2334-
for (const auto& name : metadata.ColumnOrder) {
2335-
auto columnIt = metadata.Columns.find(name);
2336-
Y_ENSURE(columnIt != metadata.Columns.end());
2337-
2338-
TOlapColumnDescription& columnDesc = *schema.AddColumns();
2339-
columnDesc.SetName(columnIt->second.Name);
2340-
columnDesc.SetType(columnIt->second.Type);
2341-
columnDesc.SetNotNull(columnIt->second.NotNull);
2342-
}
2343-
2344-
for (const auto& keyColumn : metadata.KeyColumnNames) {
2345-
schema.AddKeyColumnNames(keyColumn);
2346-
}
2347-
2348-
schema.SetEngine(NKikimrSchemeOp::EColumnTableEngine::COLUMN_ENGINE_REPLACING_TIMESERIES);
2349-
}
2350-
23512174
static void FillParameters(TQueryData::TPtr params, ::google::protobuf::Map<TBasicString<char>, Ydb::TypedValue>* output) {
23522175
if (!params) {
23532176
return;
@@ -2357,59 +2180,6 @@ class TKikimrIcGateway : public IKqpGateway {
23572180
output->insert(paramsMap.begin(), paramsMap.end());
23582181
}
23592182

2360-
static bool FillCreateColumnTableDesc(NYql::TKikimrTableMetadataPtr metadata,
2361-
NKikimrSchemeOp::TColumnTableDescription& tableDesc, Ydb::StatusIds::StatusCode& code, TString& error)
2362-
{
2363-
if (metadata->Columns.empty()) {
2364-
tableDesc.SetSchemaPresetName("default");
2365-
}
2366-
2367-
auto& hashSharding = *tableDesc.MutableSharding()->MutableHashSharding();
2368-
2369-
for (const TString& column : metadata->TableSettings.PartitionBy) {
2370-
if (!metadata->Columns.count(column)) {
2371-
code = Ydb::StatusIds::BAD_REQUEST;
2372-
error = TStringBuilder() << "Unknown column '" << column << "' in partition by key";
2373-
return false;
2374-
}
2375-
2376-
hashSharding.AddColumns(column);
2377-
}
2378-
2379-
if (metadata->TableSettings.PartitionByHashFunction) {
2380-
if (to_lower(metadata->TableSettings.PartitionByHashFunction.GetRef()) == "cloud_logs") {
2381-
hashSharding.SetFunction(NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_CLOUD_LOGS);
2382-
} else if (to_lower(metadata->TableSettings.PartitionByHashFunction.GetRef()) == "consistency_hash_64") {
2383-
hashSharding.SetFunction(NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_CONSISTENCY_64);
2384-
} else if (to_lower(metadata->TableSettings.PartitionByHashFunction.GetRef()) == "modulo_n") {
2385-
hashSharding.SetFunction(NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_MODULO_N);
2386-
} else {
2387-
code = Ydb::StatusIds::BAD_REQUEST;
2388-
error = TStringBuilder() << "Unknown hash function '"
2389-
<< metadata->TableSettings.PartitionByHashFunction.GetRef() << "' to partition by";
2390-
return false;
2391-
}
2392-
} else {
2393-
hashSharding.SetFunction(NKikimrSchemeOp::TColumnTableSharding::THashSharding::HASH_FUNCTION_CONSISTENCY_64);
2394-
}
2395-
2396-
if (metadata->TableSettings.MinPartitions) {
2397-
tableDesc.SetColumnShardCount(*metadata->TableSettings.MinPartitions);
2398-
}
2399-
2400-
if (metadata->TableSettings.TtlSettings.Defined() && metadata->TableSettings.TtlSettings.IsSet()) {
2401-
const auto& inputSettings = metadata->TableSettings.TtlSettings.GetValueSet();
2402-
auto& resultSettings = *tableDesc.MutableTtlSettings();
2403-
resultSettings.MutableEnabled()->SetColumnName(inputSettings.ColumnName);
2404-
resultSettings.MutableEnabled()->SetExpireAfterSeconds(inputSettings.ExpireAfter.Seconds());
2405-
if (inputSettings.ColumnUnit) {
2406-
resultSettings.MutableEnabled()->SetColumnUnit(static_cast<NKikimrSchemeOp::TTTLSettings::EUnit>(*inputSettings.ColumnUnit));
2407-
}
2408-
}
2409-
2410-
return true;
2411-
}
2412-
24132183
private:
24142184
TString Cluster;
24152185
const NKikimrKqp::EQueryType QueryType;

0 commit comments

Comments
 (0)