Skip to content

Commit 73df47d

Browse files
committed
Fixes
1 parent 2da7452 commit 73df47d

File tree

1 file changed

+162
-137
lines changed

1 file changed

+162
-137
lines changed

ydb/core/kqp/host/kqp_gateway_proxy.cpp

+162-137
Original file line numberDiff line numberDiff line change
@@ -1292,52 +1292,57 @@ class TKqpGatewayProxy : public IKikimrGateway {
12921292
bool createDir, bool existingOk) override {
12931293
CHECK_PREPARED_DDL(CreateColumnTable);
12941294

1295-
const auto& cluster = metadata->Cluster;
1295+
try {
1296+
const auto& cluster = metadata->Cluster;
12961297

1297-
if (cluster != SessionCtx->GetCluster()) {
1298-
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
1299-
}
1298+
if (cluster != SessionCtx->GetCluster()) {
1299+
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
1300+
}
13001301

1301-
std::pair<TString, TString> pathPair;
1302-
{
1303-
TString error;
1304-
if (!NSchemeHelpers::SplitTablePath(metadata->Name, GetDatabase(), pathPair, error, createDir)) {
1305-
return MakeFuture(ResultFromError<TGenericResult>(error));
1302+
std::pair<TString, TString> pathPair;
1303+
{
1304+
TString error;
1305+
if (!NSchemeHelpers::SplitTablePath(metadata->Name, GetDatabase(), pathPair, error, createDir)) {
1306+
return MakeFuture(ResultFromError<TGenericResult>(error));
1307+
}
13061308
}
1307-
}
13081309

1309-
TRemoveLastPhyTxHelper phyTxRemover;
1310-
auto& phyTx = phyTxRemover.Capture(SessionCtx->Query().PreparingQuery->MutablePhysicalQuery());
1311-
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1310+
NKikimrSchemeOp::TModifyScheme schemeTx;
1311+
schemeTx.SetWorkingDir(pathPair.first);
13121312

1313-
auto& schemeTx = *phyTx.MutableSchemeOperation()->MutableCreateColumnTable();
1314-
schemeTx.SetWorkingDir(pathPair.first);
1313+
Ydb::StatusIds::StatusCode code;
1314+
TString error;
13151315

1316-
Ydb::StatusIds::StatusCode code;
1317-
TString error;
1316+
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateColumnTable);
1317+
schemeTx.SetFailedOnAlreadyExists(!existingOk);
13181318

1319-
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateColumnTable);
1320-
schemeTx.SetFailedOnAlreadyExists(!existingOk);
1319+
NKikimrSchemeOp::TColumnTableDescription* tableDesc = schemeTx.MutableCreateColumnTable();
13211320

1322-
NKikimrSchemeOp::TColumnTableDescription* tableDesc = schemeTx.MutableCreateColumnTable();
1321+
tableDesc->SetName(pathPair.second);
1322+
FillColumnTableSchema(*tableDesc->MutableSchema(), *metadata);
13231323

1324-
tableDesc->SetName(pathPair.second);
1325-
FillColumnTableSchema(*tableDesc->MutableSchema(), *metadata);
1324+
if (!FillCreateColumnTableDesc(metadata, *tableDesc, code, error)) {
1325+
IKqpGateway::TGenericResult errResult;
1326+
errResult.AddIssue(NYql::TIssue(error));
1327+
errResult.SetStatus(NYql::YqlStatusFromYdbStatus(code));
1328+
return MakeFuture(std::move(errResult));
1329+
}
13261330

1327-
if (!FillCreateColumnTableDesc(metadata, *tableDesc, code, error)) {
1328-
IKqpGateway::TGenericResult errResult;
1329-
errResult.AddIssue(NYql::TIssue(error));
1330-
errResult.SetStatus(NYql::YqlStatusFromYdbStatus(code));
1331-
return MakeFuture(std::move(errResult));
1332-
}
1331+
if (IsPrepare()) {
1332+
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
1333+
auto& phyTx = *phyQuery.AddTransactions();
1334+
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1335+
phyTx.MutableSchemeOperation()->MutableCreateTable()->Swap(&schemeTx);
13331336

1334-
if (IsPrepare()) {
1335-
TGenericResult result;
1336-
result.SetSuccess();
1337-
phyTxRemover.Forget();
1338-
return MakeFuture(result);
1339-
} else {
1340-
return Gateway->ModifyScheme(std::move(schemeTx));
1337+
TGenericResult result;
1338+
result.SetSuccess();
1339+
return MakeFuture(result);
1340+
} else {
1341+
return Gateway->ModifyScheme(std::move(schemeTx));
1342+
}
1343+
}
1344+
catch (yexception& e) {
1345+
return MakeFuture(ResultFromException<TGenericResult>(e));
13411346
}
13421347
}
13431348

@@ -1346,36 +1351,41 @@ class TKqpGatewayProxy : public IKikimrGateway {
13461351
{
13471352
CHECK_PREPARED_DDL(AlterColumnTable);
13481353

1349-
if (cluster != SessionCtx->GetCluster()) {
1350-
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
1351-
}
1354+
try {
1355+
if (cluster != SessionCtx->GetCluster()) {
1356+
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
1357+
}
13521358

1353-
std::pair<TString, TString> pathPair;
1354-
{
1355-
TString error;
1356-
if (!NSchemeHelpers::SplitTablePath(settings.Table, GetDatabase(), pathPair, error, false)) {
1357-
return MakeFuture(ResultFromError<TGenericResult>(error));
1359+
std::pair<TString, TString> pathPair;
1360+
{
1361+
TString error;
1362+
if (!NSchemeHelpers::SplitTablePath(settings.Table, GetDatabase(), pathPair, error, false)) {
1363+
return MakeFuture(ResultFromError<TGenericResult>(error));
1364+
}
13581365
}
1359-
}
13601366

1361-
TRemoveLastPhyTxHelper phyTxRemover;
1362-
auto& phyTx = phyTxRemover.Capture(SessionCtx->Query().PreparingQuery->MutablePhysicalQuery());
1363-
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1367+
NKikimrSchemeOp::TModifyScheme schemeTx;
1368+
schemeTx.SetWorkingDir(pathPair.first);
13641369

1365-
auto& schemeTx = *phyTx.MutableSchemeOperation()->MutableCreateColumnTable();
1366-
schemeTx.SetWorkingDir(pathPair.first);
1370+
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterColumnTable);
1371+
NKikimrSchemeOp::TAlterColumnTable* alter = schemeTx.MutableAlterColumnTable();
1372+
alter->SetName(settings.Table);
13671373

1368-
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterColumnTable);
1369-
NKikimrSchemeOp::TAlterColumnTable* alter = schemeTx.MutableAlterColumnTable();
1370-
alter->SetName(settings.Table);
1374+
if (IsPrepare()) {
1375+
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
1376+
auto& phyTx = *phyQuery.AddTransactions();
1377+
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1378+
phyTx.MutableSchemeOperation()->MutableAlterColumnTable()->Swap(&schemeTx);
13711379

1372-
if (IsPrepare()) {
1373-
TGenericResult result;
1374-
result.SetSuccess();
1375-
phyTxRemover.Forget();
1376-
return MakeFuture(result);
1377-
} else {
1378-
return Gateway->ModifyScheme(std::move(schemeTx));
1380+
TGenericResult result;
1381+
result.SetSuccess();
1382+
return MakeFuture(result);
1383+
} else {
1384+
return Gateway->ModifyScheme(std::move(schemeTx));
1385+
}
1386+
}
1387+
catch (yexception& e) {
1388+
return MakeFuture(ResultFromException<TGenericResult>(e));
13791389
}
13801390
}
13811391

@@ -1384,43 +1394,48 @@ class TKqpGatewayProxy : public IKikimrGateway {
13841394
{
13851395
CHECK_PREPARED_DDL(CreateTableStore);
13861396

1387-
if (cluster != SessionCtx->GetCluster()) {
1388-
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
1389-
}
1397+
try {
1398+
if (cluster != SessionCtx->GetCluster()) {
1399+
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
1400+
}
13901401

1391-
std::pair<TString, TString> pathPair;
1392-
{
1393-
TString error;
1394-
if (!NSchemeHelpers::SplitTablePath(settings.TableStore, GetDatabase(), pathPair, error, false)) {
1395-
return MakeFuture(ResultFromError<TGenericResult>(error));
1402+
std::pair<TString, TString> pathPair;
1403+
{
1404+
TString error;
1405+
if (!NSchemeHelpers::SplitTablePath(settings.TableStore, GetDatabase(), pathPair, error, false)) {
1406+
return MakeFuture(ResultFromError<TGenericResult>(error));
1407+
}
13961408
}
1397-
}
13981409

1399-
TRemoveLastPhyTxHelper phyTxRemover;
1400-
auto& phyTx = phyTxRemover.Capture(SessionCtx->Query().PreparingQuery->MutablePhysicalQuery());
1401-
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1410+
NKikimrSchemeOp::TModifyScheme schemeTx;
1411+
schemeTx.SetWorkingDir(pathPair.first);
14021412

1403-
auto& schemeTx = *phyTx.MutableSchemeOperation()->MutableCreateTableStore();
1404-
schemeTx.SetWorkingDir(pathPair.first);
1413+
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateColumnStore);
1414+
schemeTx.SetFailedOnAlreadyExists(!existingOk);
14051415

1406-
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateColumnStore);
1407-
schemeTx.SetFailedOnAlreadyExists(!existingOk);
1416+
NKikimrSchemeOp::TColumnStoreDescription* storeDesc = schemeTx.MutableCreateColumnStore();
1417+
storeDesc->SetName(pathPair.second);
1418+
storeDesc->SetColumnShardCount(settings.ShardsCount);
14081419

1409-
NKikimrSchemeOp::TColumnStoreDescription* storeDesc = schemeTx.MutableCreateColumnStore();
1410-
storeDesc->SetName(pathPair.second);
1411-
storeDesc->SetColumnShardCount(settings.ShardsCount);
1420+
NKikimrSchemeOp::TColumnTableSchemaPreset* schemaPreset = storeDesc->AddSchemaPresets();
1421+
schemaPreset->SetName("default");
1422+
FillColumnTableSchema(*schemaPreset->MutableSchema(), settings);
14121423

1413-
NKikimrSchemeOp::TColumnTableSchemaPreset* schemaPreset = storeDesc->AddSchemaPresets();
1414-
schemaPreset->SetName("default");
1415-
FillColumnTableSchema(*schemaPreset->MutableSchema(), settings);
1424+
if (IsPrepare()) {
1425+
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
1426+
auto& phyTx = *phyQuery.AddTransactions();
1427+
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1428+
phyTx.MutableSchemeOperation()->MutableCreateTableStore()->Swap(&schemeTx);
14161429

1417-
if (IsPrepare()) {
1418-
TGenericResult result;
1419-
result.SetSuccess();
1420-
phyTxRemover.Forget();
1421-
return MakeFuture(result);
1422-
} else {
1423-
return Gateway->ModifyScheme(std::move(schemeTx));
1430+
TGenericResult result;
1431+
result.SetSuccess();
1432+
return MakeFuture(result);
1433+
} else {
1434+
return Gateway->ModifyScheme(std::move(schemeTx));
1435+
}
1436+
}
1437+
catch (yexception& e) {
1438+
return MakeFuture(ResultFromException<TGenericResult>(e));
14241439
}
14251440
}
14261441

@@ -1429,36 +1444,41 @@ class TKqpGatewayProxy : public IKikimrGateway {
14291444
{
14301445
CHECK_PREPARED_DDL(AlterTableStore);
14311446

1432-
if (cluster != SessionCtx->GetCluster()) {
1433-
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
1434-
}
1447+
try {
1448+
if (cluster != SessionCtx->GetCluster()) {
1449+
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
1450+
}
14351451

1436-
std::pair<TString, TString> pathPair;
1437-
{
1438-
TString error;
1439-
if (!NSchemeHelpers::SplitTablePath(settings.TableStore, GetDatabase(), pathPair, error, false)) {
1440-
return MakeFuture(ResultFromError<TGenericResult>(error));
1452+
std::pair<TString, TString> pathPair;
1453+
{
1454+
TString error;
1455+
if (!NSchemeHelpers::SplitTablePath(settings.TableStore, GetDatabase(), pathPair, error, false)) {
1456+
return MakeFuture(ResultFromError<TGenericResult>(error));
1457+
}
14411458
}
1442-
}
14431459

1444-
TRemoveLastPhyTxHelper phyTxRemover;
1445-
auto& phyTx = phyTxRemover.Capture(SessionCtx->Query().PreparingQuery->MutablePhysicalQuery());
1446-
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1460+
NKikimrSchemeOp::TModifyScheme schemeTx;
1461+
schemeTx.SetWorkingDir(pathPair.first);
14471462

1448-
auto& schemeTx = *phyTx.MutableSchemeOperation()->MutableAlterTableStore();
1449-
schemeTx.SetWorkingDir(pathPair.first);
1463+
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterColumnStore);
1464+
NKikimrSchemeOp::TAlterColumnStore* alter = schemeTx.MutableAlterColumnStore();
1465+
alter->SetName(pathPair.second);
14501466

1451-
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterColumnStore);
1452-
NKikimrSchemeOp::TAlterColumnStore* alter = schemeTx.MutableAlterColumnStore();
1453-
alter->SetName(pathPair.second);
1467+
if (IsPrepare()) {
1468+
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
1469+
auto& phyTx = *phyQuery.AddTransactions();
1470+
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1471+
phyTx.MutableSchemeOperation()->MutableAlterTableStore()->Swap(&schemeTx);
14541472

1455-
if (IsPrepare()) {
1456-
TGenericResult result;
1457-
result.SetSuccess();
1458-
phyTxRemover.Forget();
1459-
return MakeFuture(result);
1460-
} else {
1461-
return Gateway->ModifyScheme(std::move(schemeTx));
1473+
TGenericResult result;
1474+
result.SetSuccess();
1475+
return MakeFuture(result);
1476+
} else {
1477+
return Gateway->ModifyScheme(std::move(schemeTx));
1478+
}
1479+
}
1480+
catch (yexception& e) {
1481+
return MakeFuture(ResultFromException<TGenericResult>(e));
14621482
}
14631483
}
14641484

@@ -1467,36 +1487,41 @@ class TKqpGatewayProxy : public IKikimrGateway {
14671487
{
14681488
CHECK_PREPARED_DDL(DropTableStore);
14691489

1470-
if (cluster != SessionCtx->GetCluster()) {
1471-
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
1472-
}
1490+
try {
1491+
if (cluster != SessionCtx->GetCluster()) {
1492+
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
1493+
}
14731494

1474-
std::pair<TString, TString> pathPair;
1475-
{
1476-
TString error;
1477-
if (!NSchemeHelpers::SplitTablePath(settings.TableStore, GetDatabase(), pathPair, error, false)) {
1478-
return MakeFuture(ResultFromError<TGenericResult>(error));
1495+
std::pair<TString, TString> pathPair;
1496+
{
1497+
TString error;
1498+
if (!NSchemeHelpers::SplitTablePath(settings.TableStore, GetDatabase(), pathPair, error, false)) {
1499+
return MakeFuture(ResultFromError<TGenericResult>(error));
1500+
}
14791501
}
1480-
}
14811502

1482-
TRemoveLastPhyTxHelper phyTxRemover;
1483-
auto& phyTx = phyTxRemover.Capture(SessionCtx->Query().PreparingQuery->MutablePhysicalQuery());
1484-
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1503+
NKikimrSchemeOp::TModifyScheme schemeTx;
1504+
schemeTx.SetWorkingDir(pathPair.first);
1505+
schemeTx.SetSuccessOnNotExist(missingOk);
1506+
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropColumnStore);
1507+
NKikimrSchemeOp::TDrop* drop = schemeTx.MutableDrop();
1508+
drop->SetName(pathPair.second);
14851509

1486-
auto& schemeTx = *phyTx.MutableSchemeOperation()->MutableDropTableStore();
1487-
schemeTx.SetWorkingDir(pathPair.first);
1488-
schemeTx.SetSuccessOnNotExist(missingOk);
1489-
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropColumnStore);
1490-
NKikimrSchemeOp::TDrop* drop = schemeTx.MutableDrop();
1491-
drop->SetName(pathPair.second);
1510+
if (IsPrepare()) {
1511+
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
1512+
auto& phyTx = *phyQuery.AddTransactions();
1513+
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1514+
phyTx.MutableSchemeOperation()->MutableDropTableStore()->Swap(&schemeTx);
14921515

1493-
if (IsPrepare()) {
1494-
TGenericResult result;
1495-
result.SetSuccess();
1496-
phyTxRemover.Forget();
1497-
return MakeFuture(result);
1498-
} else {
1499-
return Gateway->ModifyScheme(std::move(schemeTx));
1516+
TGenericResult result;
1517+
result.SetSuccess();
1518+
return MakeFuture(result);
1519+
} else {
1520+
return Gateway->ModifyScheme(std::move(schemeTx));
1521+
}
1522+
}
1523+
catch (yexception& e) {
1524+
return MakeFuture(ResultFromException<TGenericResult>(e));
15001525
}
15011526
}
15021527

0 commit comments

Comments
 (0)