Skip to content

Commit 76efb8a

Browse files
committed
Fixes
1 parent a3e630a commit 76efb8a

File tree

1 file changed

+162
-137
lines changed

1 file changed

+162
-137
lines changed

ydb/core/kqp/host/kqp_gateway_proxy.cpp

+162-137
Original file line numberDiff line numberDiff line change
@@ -1293,52 +1293,57 @@ class TKqpGatewayProxy : public IKikimrGateway {
12931293
bool createDir, bool existingOk) override {
12941294
CHECK_PREPARED_DDL(CreateColumnTable);
12951295

1296-
const auto& cluster = metadata->Cluster;
1296+
try {
1297+
const auto& cluster = metadata->Cluster;
12971298

1298-
if (cluster != SessionCtx->GetCluster()) {
1299-
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
1300-
}
1299+
if (cluster != SessionCtx->GetCluster()) {
1300+
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
1301+
}
13011302

1302-
std::pair<TString, TString> pathPair;
1303-
{
1304-
TString error;
1305-
if (!NSchemeHelpers::SplitTablePath(metadata->Name, GetDatabase(), pathPair, error, createDir)) {
1306-
return MakeFuture(ResultFromError<TGenericResult>(error));
1303+
std::pair<TString, TString> pathPair;
1304+
{
1305+
TString error;
1306+
if (!NSchemeHelpers::SplitTablePath(metadata->Name, GetDatabase(), pathPair, error, createDir)) {
1307+
return MakeFuture(ResultFromError<TGenericResult>(error));
1308+
}
13071309
}
1308-
}
13091310

1310-
TRemoveLastPhyTxHelper phyTxRemover;
1311-
auto& phyTx = phyTxRemover.Capture(SessionCtx->Query().PreparingQuery->MutablePhysicalQuery());
1312-
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1311+
NKikimrSchemeOp::TModifyScheme schemeTx;
1312+
schemeTx.SetWorkingDir(pathPair.first);
13131313

1314-
auto& schemeTx = *phyTx.MutableSchemeOperation()->MutableCreateColumnTable();
1315-
schemeTx.SetWorkingDir(pathPair.first);
1314+
Ydb::StatusIds::StatusCode code;
1315+
TString error;
13161316

1317-
Ydb::StatusIds::StatusCode code;
1318-
TString error;
1317+
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateColumnTable);
1318+
schemeTx.SetFailedOnAlreadyExists(!existingOk);
13191319

1320-
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateColumnTable);
1321-
schemeTx.SetFailedOnAlreadyExists(!existingOk);
1320+
NKikimrSchemeOp::TColumnTableDescription* tableDesc = schemeTx.MutableCreateColumnTable();
13221321

1323-
NKikimrSchemeOp::TColumnTableDescription* tableDesc = schemeTx.MutableCreateColumnTable();
1322+
tableDesc->SetName(pathPair.second);
1323+
FillColumnTableSchema(*tableDesc->MutableSchema(), *metadata);
13241324

1325-
tableDesc->SetName(pathPair.second);
1326-
FillColumnTableSchema(*tableDesc->MutableSchema(), *metadata);
1325+
if (!FillCreateColumnTableDesc(metadata, *tableDesc, code, error)) {
1326+
IKqpGateway::TGenericResult errResult;
1327+
errResult.AddIssue(NYql::TIssue(error));
1328+
errResult.SetStatus(NYql::YqlStatusFromYdbStatus(code));
1329+
return MakeFuture(std::move(errResult));
1330+
}
13271331

1328-
if (!FillCreateColumnTableDesc(metadata, *tableDesc, code, error)) {
1329-
IKqpGateway::TGenericResult errResult;
1330-
errResult.AddIssue(NYql::TIssue(error));
1331-
errResult.SetStatus(NYql::YqlStatusFromYdbStatus(code));
1332-
return MakeFuture(std::move(errResult));
1333-
}
1332+
if (IsPrepare()) {
1333+
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
1334+
auto& phyTx = *phyQuery.AddTransactions();
1335+
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1336+
phyTx.MutableSchemeOperation()->MutableCreateTable()->Swap(&schemeTx);
13341337

1335-
if (IsPrepare()) {
1336-
TGenericResult result;
1337-
result.SetSuccess();
1338-
phyTxRemover.Forget();
1339-
return MakeFuture(result);
1340-
} else {
1341-
return Gateway->ModifyScheme(std::move(schemeTx));
1338+
TGenericResult result;
1339+
result.SetSuccess();
1340+
return MakeFuture(result);
1341+
} else {
1342+
return Gateway->ModifyScheme(std::move(schemeTx));
1343+
}
1344+
}
1345+
catch (yexception& e) {
1346+
return MakeFuture(ResultFromException<TGenericResult>(e));
13421347
}
13431348
}
13441349

@@ -1347,36 +1352,41 @@ class TKqpGatewayProxy : public IKikimrGateway {
13471352
{
13481353
CHECK_PREPARED_DDL(AlterColumnTable);
13491354

1350-
if (cluster != SessionCtx->GetCluster()) {
1351-
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
1352-
}
1355+
try {
1356+
if (cluster != SessionCtx->GetCluster()) {
1357+
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
1358+
}
13531359

1354-
std::pair<TString, TString> pathPair;
1355-
{
1356-
TString error;
1357-
if (!NSchemeHelpers::SplitTablePath(settings.Table, GetDatabase(), pathPair, error, false)) {
1358-
return MakeFuture(ResultFromError<TGenericResult>(error));
1360+
std::pair<TString, TString> pathPair;
1361+
{
1362+
TString error;
1363+
if (!NSchemeHelpers::SplitTablePath(settings.Table, GetDatabase(), pathPair, error, false)) {
1364+
return MakeFuture(ResultFromError<TGenericResult>(error));
1365+
}
13591366
}
1360-
}
13611367

1362-
TRemoveLastPhyTxHelper phyTxRemover;
1363-
auto& phyTx = phyTxRemover.Capture(SessionCtx->Query().PreparingQuery->MutablePhysicalQuery());
1364-
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1368+
NKikimrSchemeOp::TModifyScheme schemeTx;
1369+
schemeTx.SetWorkingDir(pathPair.first);
13651370

1366-
auto& schemeTx = *phyTx.MutableSchemeOperation()->MutableCreateColumnTable();
1367-
schemeTx.SetWorkingDir(pathPair.first);
1371+
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterColumnTable);
1372+
NKikimrSchemeOp::TAlterColumnTable* alter = schemeTx.MutableAlterColumnTable();
1373+
alter->SetName(settings.Table);
13681374

1369-
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterColumnTable);
1370-
NKikimrSchemeOp::TAlterColumnTable* alter = schemeTx.MutableAlterColumnTable();
1371-
alter->SetName(settings.Table);
1375+
if (IsPrepare()) {
1376+
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
1377+
auto& phyTx = *phyQuery.AddTransactions();
1378+
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1379+
phyTx.MutableSchemeOperation()->MutableAlterColumnTable()->Swap(&schemeTx);
13721380

1373-
if (IsPrepare()) {
1374-
TGenericResult result;
1375-
result.SetSuccess();
1376-
phyTxRemover.Forget();
1377-
return MakeFuture(result);
1378-
} else {
1379-
return Gateway->ModifyScheme(std::move(schemeTx));
1381+
TGenericResult result;
1382+
result.SetSuccess();
1383+
return MakeFuture(result);
1384+
} else {
1385+
return Gateway->ModifyScheme(std::move(schemeTx));
1386+
}
1387+
}
1388+
catch (yexception& e) {
1389+
return MakeFuture(ResultFromException<TGenericResult>(e));
13801390
}
13811391
}
13821392

@@ -1385,43 +1395,48 @@ class TKqpGatewayProxy : public IKikimrGateway {
13851395
{
13861396
CHECK_PREPARED_DDL(CreateTableStore);
13871397

1388-
if (cluster != SessionCtx->GetCluster()) {
1389-
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
1390-
}
1398+
try {
1399+
if (cluster != SessionCtx->GetCluster()) {
1400+
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
1401+
}
13911402

1392-
std::pair<TString, TString> pathPair;
1393-
{
1394-
TString error;
1395-
if (!NSchemeHelpers::SplitTablePath(settings.TableStore, GetDatabase(), pathPair, error, false)) {
1396-
return MakeFuture(ResultFromError<TGenericResult>(error));
1403+
std::pair<TString, TString> pathPair;
1404+
{
1405+
TString error;
1406+
if (!NSchemeHelpers::SplitTablePath(settings.TableStore, GetDatabase(), pathPair, error, false)) {
1407+
return MakeFuture(ResultFromError<TGenericResult>(error));
1408+
}
13971409
}
1398-
}
13991410

1400-
TRemoveLastPhyTxHelper phyTxRemover;
1401-
auto& phyTx = phyTxRemover.Capture(SessionCtx->Query().PreparingQuery->MutablePhysicalQuery());
1402-
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1411+
NKikimrSchemeOp::TModifyScheme schemeTx;
1412+
schemeTx.SetWorkingDir(pathPair.first);
14031413

1404-
auto& schemeTx = *phyTx.MutableSchemeOperation()->MutableCreateTableStore();
1405-
schemeTx.SetWorkingDir(pathPair.first);
1414+
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateColumnStore);
1415+
schemeTx.SetFailedOnAlreadyExists(!existingOk);
14061416

1407-
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateColumnStore);
1408-
schemeTx.SetFailedOnAlreadyExists(!existingOk);
1417+
NKikimrSchemeOp::TColumnStoreDescription* storeDesc = schemeTx.MutableCreateColumnStore();
1418+
storeDesc->SetName(pathPair.second);
1419+
storeDesc->SetColumnShardCount(settings.ShardsCount);
14091420

1410-
NKikimrSchemeOp::TColumnStoreDescription* storeDesc = schemeTx.MutableCreateColumnStore();
1411-
storeDesc->SetName(pathPair.second);
1412-
storeDesc->SetColumnShardCount(settings.ShardsCount);
1421+
NKikimrSchemeOp::TColumnTableSchemaPreset* schemaPreset = storeDesc->AddSchemaPresets();
1422+
schemaPreset->SetName("default");
1423+
FillColumnTableSchema(*schemaPreset->MutableSchema(), settings);
14131424

1414-
NKikimrSchemeOp::TColumnTableSchemaPreset* schemaPreset = storeDesc->AddSchemaPresets();
1415-
schemaPreset->SetName("default");
1416-
FillColumnTableSchema(*schemaPreset->MutableSchema(), settings);
1425+
if (IsPrepare()) {
1426+
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
1427+
auto& phyTx = *phyQuery.AddTransactions();
1428+
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1429+
phyTx.MutableSchemeOperation()->MutableCreateTableStore()->Swap(&schemeTx);
14171430

1418-
if (IsPrepare()) {
1419-
TGenericResult result;
1420-
result.SetSuccess();
1421-
phyTxRemover.Forget();
1422-
return MakeFuture(result);
1423-
} else {
1424-
return Gateway->ModifyScheme(std::move(schemeTx));
1431+
TGenericResult result;
1432+
result.SetSuccess();
1433+
return MakeFuture(result);
1434+
} else {
1435+
return Gateway->ModifyScheme(std::move(schemeTx));
1436+
}
1437+
}
1438+
catch (yexception& e) {
1439+
return MakeFuture(ResultFromException<TGenericResult>(e));
14251440
}
14261441
}
14271442

@@ -1430,36 +1445,41 @@ class TKqpGatewayProxy : public IKikimrGateway {
14301445
{
14311446
CHECK_PREPARED_DDL(AlterTableStore);
14321447

1433-
if (cluster != SessionCtx->GetCluster()) {
1434-
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
1435-
}
1448+
try {
1449+
if (cluster != SessionCtx->GetCluster()) {
1450+
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
1451+
}
14361452

1437-
std::pair<TString, TString> pathPair;
1438-
{
1439-
TString error;
1440-
if (!NSchemeHelpers::SplitTablePath(settings.TableStore, GetDatabase(), pathPair, error, false)) {
1441-
return MakeFuture(ResultFromError<TGenericResult>(error));
1453+
std::pair<TString, TString> pathPair;
1454+
{
1455+
TString error;
1456+
if (!NSchemeHelpers::SplitTablePath(settings.TableStore, GetDatabase(), pathPair, error, false)) {
1457+
return MakeFuture(ResultFromError<TGenericResult>(error));
1458+
}
14421459
}
1443-
}
14441460

1445-
TRemoveLastPhyTxHelper phyTxRemover;
1446-
auto& phyTx = phyTxRemover.Capture(SessionCtx->Query().PreparingQuery->MutablePhysicalQuery());
1447-
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1461+
NKikimrSchemeOp::TModifyScheme schemeTx;
1462+
schemeTx.SetWorkingDir(pathPair.first);
14481463

1449-
auto& schemeTx = *phyTx.MutableSchemeOperation()->MutableAlterTableStore();
1450-
schemeTx.SetWorkingDir(pathPair.first);
1464+
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterColumnStore);
1465+
NKikimrSchemeOp::TAlterColumnStore* alter = schemeTx.MutableAlterColumnStore();
1466+
alter->SetName(pathPair.second);
14511467

1452-
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterColumnStore);
1453-
NKikimrSchemeOp::TAlterColumnStore* alter = schemeTx.MutableAlterColumnStore();
1454-
alter->SetName(pathPair.second);
1468+
if (IsPrepare()) {
1469+
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
1470+
auto& phyTx = *phyQuery.AddTransactions();
1471+
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1472+
phyTx.MutableSchemeOperation()->MutableAlterTableStore()->Swap(&schemeTx);
14551473

1456-
if (IsPrepare()) {
1457-
TGenericResult result;
1458-
result.SetSuccess();
1459-
phyTxRemover.Forget();
1460-
return MakeFuture(result);
1461-
} else {
1462-
return Gateway->ModifyScheme(std::move(schemeTx));
1474+
TGenericResult result;
1475+
result.SetSuccess();
1476+
return MakeFuture(result);
1477+
} else {
1478+
return Gateway->ModifyScheme(std::move(schemeTx));
1479+
}
1480+
}
1481+
catch (yexception& e) {
1482+
return MakeFuture(ResultFromException<TGenericResult>(e));
14631483
}
14641484
}
14651485

@@ -1468,36 +1488,41 @@ class TKqpGatewayProxy : public IKikimrGateway {
14681488
{
14691489
CHECK_PREPARED_DDL(DropTableStore);
14701490

1471-
if (cluster != SessionCtx->GetCluster()) {
1472-
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
1473-
}
1491+
try {
1492+
if (cluster != SessionCtx->GetCluster()) {
1493+
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
1494+
}
14741495

1475-
std::pair<TString, TString> pathPair;
1476-
{
1477-
TString error;
1478-
if (!NSchemeHelpers::SplitTablePath(settings.TableStore, GetDatabase(), pathPair, error, false)) {
1479-
return MakeFuture(ResultFromError<TGenericResult>(error));
1496+
std::pair<TString, TString> pathPair;
1497+
{
1498+
TString error;
1499+
if (!NSchemeHelpers::SplitTablePath(settings.TableStore, GetDatabase(), pathPair, error, false)) {
1500+
return MakeFuture(ResultFromError<TGenericResult>(error));
1501+
}
14801502
}
1481-
}
14821503

1483-
TRemoveLastPhyTxHelper phyTxRemover;
1484-
auto& phyTx = phyTxRemover.Capture(SessionCtx->Query().PreparingQuery->MutablePhysicalQuery());
1485-
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1504+
NKikimrSchemeOp::TModifyScheme schemeTx;
1505+
schemeTx.SetWorkingDir(pathPair.first);
1506+
schemeTx.SetSuccessOnNotExist(missingOk);
1507+
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropColumnStore);
1508+
NKikimrSchemeOp::TDrop* drop = schemeTx.MutableDrop();
1509+
drop->SetName(pathPair.second);
14861510

1487-
auto& schemeTx = *phyTx.MutableSchemeOperation()->MutableDropTableStore();
1488-
schemeTx.SetWorkingDir(pathPair.first);
1489-
schemeTx.SetSuccessOnNotExist(missingOk);
1490-
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropColumnStore);
1491-
NKikimrSchemeOp::TDrop* drop = schemeTx.MutableDrop();
1492-
drop->SetName(pathPair.second);
1511+
if (IsPrepare()) {
1512+
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
1513+
auto& phyTx = *phyQuery.AddTransactions();
1514+
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1515+
phyTx.MutableSchemeOperation()->MutableDropTableStore()->Swap(&schemeTx);
14931516

1494-
if (IsPrepare()) {
1495-
TGenericResult result;
1496-
result.SetSuccess();
1497-
phyTxRemover.Forget();
1498-
return MakeFuture(result);
1499-
} else {
1500-
return Gateway->ModifyScheme(std::move(schemeTx));
1517+
TGenericResult result;
1518+
result.SetSuccess();
1519+
return MakeFuture(result);
1520+
} else {
1521+
return Gateway->ModifyScheme(std::move(schemeTx));
1522+
}
1523+
}
1524+
catch (yexception& e) {
1525+
return MakeFuture(ResultFromException<TGenericResult>(e));
15011526
}
15021527
}
15031528

0 commit comments

Comments
 (0)