Skip to content

Commit f1cd202

Browse files
committed
Support IF NOT EXISTS/IF EXISTS for tables, external tables and external data sources
1 parent 8fea8a7 commit f1cd202

23 files changed

+534
-217
lines changed

ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp

+13-11
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ void FillCreateExternalDataSourceDesc(NKikimrSchemeOp::TExternalDataSourceDescri
8383
}
8484
}
8585

86-
void FillCreateExternalDataSourceCommand(NKikimrSchemeOp::TModifyScheme& modifyScheme, const NYql::TObjectSettingsImpl& settings,
86+
void FillCreateExternalDataSourceCommand(NKikimrSchemeOp::TModifyScheme& modifyScheme, const NYql::TCreateObjectSettings& settings,
8787
TExternalDataSourceManager::TInternalModificationContext& context) {
8888
CheckFeatureFlag(context);
8989

@@ -97,12 +97,13 @@ void FillCreateExternalDataSourceCommand(NKikimrSchemeOp::TModifyScheme& modifyS
9797

9898
modifyScheme.SetWorkingDir(pathPair.first);
9999
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateExternalDataSource);
100+
modifyScheme.SetFailedOnAlreadyExists(!settings.GetExistingOk());
100101

101102
NKikimrSchemeOp::TExternalDataSourceDescription& dataSourceDesc = *modifyScheme.MutableCreateExternalDataSource();
102103
FillCreateExternalDataSourceDesc(dataSourceDesc, pathPair.second, settings);
103104
}
104105

105-
void FillDropExternalDataSourceCommand(NKikimrSchemeOp::TModifyScheme& modifyScheme, const NYql::TObjectSettingsImpl& settings,
106+
void FillDropExternalDataSourceCommand(NKikimrSchemeOp::TModifyScheme& modifyScheme, const NYql::TDropObjectSettings& settings,
106107
TExternalDataSourceManager::TInternalModificationContext& context) {
107108
CheckFeatureFlag(context);
108109

@@ -116,15 +117,16 @@ void FillDropExternalDataSourceCommand(NKikimrSchemeOp::TModifyScheme& modifySch
116117

117118
modifyScheme.SetWorkingDir(pathPair.first);
118119
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpDropExternalDataSource);
120+
modifyScheme.SetSuccessOnNotExist(settings.GetMissingOk());
119121

120122
NKikimrSchemeOp::TDrop& drop = *modifyScheme.MutableDrop();
121123
drop.SetName(pathPair.second);
122124
}
123125

124-
NThreading::TFuture<TExternalDataSourceManager::TYqlConclusionStatus> SendSchemeRequest(TEvTxUserProxy::TEvProposeTransaction* request, TActorSystem* actorSystem, bool failedOnAlreadyExists = false)
126+
NThreading::TFuture<TExternalDataSourceManager::TYqlConclusionStatus> SendSchemeRequest(TEvTxUserProxy::TEvProposeTransaction* request, TActorSystem* actorSystem, bool failedOnAlreadyExists, bool successOnNotExist)
125127
{
126128
auto promiseScheme = NThreading::NewPromise<NKqp::TSchemeOpRequestHandler::TResult>();
127-
IActor* requestHandler = new TSchemeOpRequestHandler(request, promiseScheme, failedOnAlreadyExists);
129+
IActor* requestHandler = new TSchemeOpRequestHandler(request, promiseScheme, failedOnAlreadyExists, successOnNotExist);
128130
actorSystem->Register(requestHandler);
129131
return promiseScheme.GetFuture().Apply([](const NThreading::TFuture<NKqp::TSchemeOpRequestHandler::TResult>& f) {
130132
if (f.HasValue() && !f.HasException() && f.GetValue().Success()) {
@@ -159,7 +161,7 @@ NThreading::TFuture<TExternalDataSourceManager::TYqlConclusionStatus> TExternalD
159161
}
160162
}
161163

162-
NThreading::TFuture<TExternalDataSourceManager::TYqlConclusionStatus> TExternalDataSourceManager::CreateExternalDataSource(const NYql::TObjectSettingsImpl& settings,
164+
NThreading::TFuture<TExternalDataSourceManager::TYqlConclusionStatus> TExternalDataSourceManager::CreateExternalDataSource(const NYql::TCreateObjectSettings& settings,
163165
TInternalModificationContext& context) const {
164166
using TRequest = TEvTxUserProxy::TEvProposeTransaction;
165167

@@ -172,10 +174,10 @@ NThreading::TFuture<TExternalDataSourceManager::TYqlConclusionStatus> TExternalD
172174
auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme();
173175
FillCreateExternalDataSourceCommand(schemeTx, settings, context);
174176

175-
return SendSchemeRequest(ev.Release(), context.GetExternalData().GetActorSystem(), true);
177+
return SendSchemeRequest(ev.Release(), context.GetExternalData().GetActorSystem(), schemeTx.GetFailedOnAlreadyExists(), schemeTx.GetSuccessOnNotExist());
176178
}
177179

178-
NThreading::TFuture<TExternalDataSourceManager::TYqlConclusionStatus> TExternalDataSourceManager::DropExternalDataSource(const NYql::TObjectSettingsImpl& settings,
180+
NThreading::TFuture<TExternalDataSourceManager::TYqlConclusionStatus> TExternalDataSourceManager::DropExternalDataSource(const NYql::TDropObjectSettings& settings,
179181
TInternalModificationContext& context) const {
180182
using TRequest = TEvTxUserProxy::TEvProposeTransaction;
181183

@@ -188,7 +190,7 @@ NThreading::TFuture<TExternalDataSourceManager::TYqlConclusionStatus> TExternalD
188190
auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme();
189191
FillDropExternalDataSourceCommand(schemeTx, settings, context);
190192

191-
return SendSchemeRequest(ev.Release(), context.GetExternalData().GetActorSystem());
193+
return SendSchemeRequest(ev.Release(), context.GetExternalData().GetActorSystem(), schemeTx.GetFailedOnAlreadyExists(), schemeTx.GetSuccessOnNotExist());
192194
}
193195

194196
TExternalDataSourceManager::TYqlConclusionStatus TExternalDataSourceManager::DoPrepare(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings,
@@ -216,12 +218,12 @@ TExternalDataSourceManager::TYqlConclusionStatus TExternalDataSourceManager::DoP
216218
}
217219
}
218220

219-
void TExternalDataSourceManager::PrepareCreateExternalDataSource(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings,
221+
void TExternalDataSourceManager::PrepareCreateExternalDataSource(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TCreateObjectSettings& settings,
220222
TInternalModificationContext& context) const {
221223
FillCreateExternalDataSourceCommand(*schemeOperation.MutableCreateExternalDataSource(), settings, context);
222224
}
223225

224-
void TExternalDataSourceManager::PrepareDropExternalDataSource(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings,
226+
void TExternalDataSourceManager::PrepareDropExternalDataSource(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TDropObjectSettings& settings,
225227
TInternalModificationContext& context) const {
226228
FillDropExternalDataSourceCommand(*schemeOperation.MutableDropExternalDataSource(), settings, context);
227229
}
@@ -252,7 +254,7 @@ NThreading::TFuture<NMetadata::NModifications::IOperationsManager::TYqlConclusio
252254
TStringBuilder() << "Execution of prepare operation for EXTERNAL_DATA_SOURCE object: unsupported operation: " << int(schemeOperation.GetOperationCase())));
253255
}
254256

255-
return SendSchemeRequest(ev.Release(), context.GetActorSystem(), true);
257+
return SendSchemeRequest(ev.Release(), context.GetActorSystem(), schemeTx.GetFailedOnAlreadyExists(), schemeTx.GetSuccessOnNotExist());
256258
}
257259

258260
}

ydb/core/kqp/gateway/behaviour/external_data_source/manager.h

+4-4
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,16 @@
55
namespace NKikimr::NKqp {
66

77
class TExternalDataSourceManager: public NMetadata::NModifications::IOperationsManager {
8-
NThreading::TFuture<TYqlConclusionStatus> CreateExternalDataSource(const NYql::TObjectSettingsImpl& settings,
8+
NThreading::TFuture<TYqlConclusionStatus> CreateExternalDataSource(const NYql::TCreateObjectSettings& settings,
99
TInternalModificationContext& context) const;
1010

11-
NThreading::TFuture<TYqlConclusionStatus> DropExternalDataSource(const NYql::TObjectSettingsImpl& settings,
11+
NThreading::TFuture<TYqlConclusionStatus> DropExternalDataSource(const NYql::TDropObjectSettings& settings,
1212
TInternalModificationContext& context) const;
1313

14-
void PrepareCreateExternalDataSource(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings,
14+
void PrepareCreateExternalDataSource(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TCreateObjectSettings& settings,
1515
TInternalModificationContext& context) const;
1616

17-
void PrepareDropExternalDataSource(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings,
17+
void PrepareDropExternalDataSource(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TDropObjectSettings& settings,
1818
TInternalModificationContext& context) const;
1919

2020
protected:

ydb/core/kqp/gateway/kqp_ic_gateway.cpp

+7-4
Original file line numberDiff line numberDiff line change
@@ -1187,7 +1187,7 @@ class TKikimrIcGateway : public IKqpGateway {
11871187

11881188
TFuture<TGenericResult> CreateExternalTable(const TString& cluster,
11891189
const NYql::TCreateExternalTableSettings& settings,
1190-
bool createDir) override {
1190+
bool createDir, bool existingOk) override {
11911191
using TRequest = TEvTxUserProxy::TEvProposeTransaction;
11921192

11931193
try {
@@ -1211,6 +1211,7 @@ class TKikimrIcGateway : public IKqpGateway {
12111211
auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme();
12121212
schemeTx.SetWorkingDir(pathPair.first);
12131213
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateExternalTable);
1214+
schemeTx.SetFailedOnAlreadyExists(!existingOk);
12141215

12151216
NKikimrSchemeOp::TExternalTableDescription& externalTableDesc = *schemeTx.MutableCreateExternalTable();
12161217
NSchemeHelpers::FillCreateExternalTableColumnDesc(externalTableDesc, pathPair.second, settings);
@@ -1228,7 +1229,8 @@ class TKikimrIcGateway : public IKqpGateway {
12281229
}
12291230

12301231
TFuture<TGenericResult> DropExternalTable(const TString& cluster,
1231-
const NYql::TDropExternalTableSettings& settings) override {
1232+
const NYql::TDropExternalTableSettings& settings,
1233+
bool missingOk) override {
12321234
using TRequest = TEvTxUserProxy::TEvProposeTransaction;
12331235

12341236
try {
@@ -1253,6 +1255,7 @@ class TKikimrIcGateway : public IKqpGateway {
12531255
auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme();
12541256
schemeTx.SetWorkingDir(pathPair.first);
12551257
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropExternalTable);
1258+
schemeTx.SetSuccessOnNotExist(missingOk);
12561259

12571260
NKikimrSchemeOp::TDrop& drop = *schemeTx.MutableDrop();
12581261
drop.SetName(pathPair.second);
@@ -1489,7 +1492,7 @@ class TKikimrIcGateway : public IKqpGateway {
14891492
auto& dropUser = *schemeTx.MutableAlterLogin()->MutableRemoveUser();
14901493

14911494
dropUser.SetUser(settings.UserName);
1492-
dropUser.SetMissingOk(settings.Force);
1495+
dropUser.SetMissingOk(settings.MissingOk);
14931496

14941497
SendSchemeRequest(ev.Release()).Apply(
14951498
[dropUserPromise](const TFuture<TGenericResult>& future) mutable {
@@ -1840,7 +1843,7 @@ class TKikimrIcGateway : public IKqpGateway {
18401843
auto& dropGroup = *schemeTx.MutableAlterLogin()->MutableRemoveGroup();
18411844

18421845
dropGroup.SetGroup(settings.GroupName);
1843-
dropGroup.SetMissingOk(settings.Force);
1846+
dropGroup.SetMissingOk(settings.MissingOk);
18441847

18451848
SendSchemeRequest(ev.Release()).Apply(
18461849
[dropGroupPromise](const TFuture<TGenericResult>& future) mutable {

ydb/core/kqp/host/kqp_gateway_proxy.cpp

+9-6
Original file line numberDiff line numberDiff line change
@@ -958,7 +958,7 @@ class TKqpGatewayProxy : public IKikimrGateway {
958958
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterLogin);
959959
auto& dropUser = *schemeTx.MutableAlterLogin()->MutableRemoveUser();
960960
dropUser.SetUser(settings.UserName);
961-
dropUser.SetMissingOk(settings.Force);
961+
dropUser.SetMissingOk(settings.MissingOk);
962962

963963
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
964964
auto& phyTx = *phyQuery.AddTransactions();
@@ -1195,7 +1195,7 @@ class TKqpGatewayProxy : public IKikimrGateway {
11951195
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterLogin);
11961196
auto& dropGroup = *schemeTx.MutableAlterLogin()->MutableRemoveGroup();
11971197
dropGroup.SetGroup(settings.GroupName);
1198-
dropGroup.SetMissingOk(settings.Force);
1198+
dropGroup.SetMissingOk(settings.MissingOk);
11991199

12001200
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
12011201
auto& phyTx = *phyQuery.AddTransactions();
@@ -1240,7 +1240,7 @@ class TKqpGatewayProxy : public IKikimrGateway {
12401240
}
12411241

12421242
TFuture<TGenericResult> CreateExternalTable(const TString& cluster, const TCreateExternalTableSettings& settings,
1243-
bool createDir) override
1243+
bool createDir, bool existingOk) override
12441244
{
12451245
CHECK_PREPARED_DDL(CreateExternalTable);
12461246

@@ -1264,6 +1264,7 @@ class TKqpGatewayProxy : public IKikimrGateway {
12641264
auto& schemeTx = *phyTx.MutableSchemeOperation()->MutableCreateExternalTable();
12651265
schemeTx.SetWorkingDir(pathPair.first);
12661266
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateExternalTable);
1267+
schemeTx.SetFailedOnAlreadyExists(!existingOk);
12671268

12681269
NKikimrSchemeOp::TExternalTableDescription& externalTableDesc = *schemeTx.MutableCreateExternalTable();
12691270
NSchemeHelpers::FillCreateExternalTableColumnDesc(externalTableDesc, pathPair.second, settings);
@@ -1272,7 +1273,7 @@ class TKqpGatewayProxy : public IKikimrGateway {
12721273
phyTxRemover.Forget();
12731274
return MakeFuture(result);
12741275
} else {
1275-
return Gateway->CreateExternalTable(cluster, settings, createDir);
1276+
return Gateway->CreateExternalTable(cluster, settings, createDir, existingOk);
12761277
}
12771278
}
12781279

@@ -1283,7 +1284,8 @@ class TKqpGatewayProxy : public IKikimrGateway {
12831284
}
12841285

12851286
TFuture<TGenericResult> DropExternalTable(const TString& cluster,
1286-
const TDropExternalTableSettings& settings) override
1287+
const TDropExternalTableSettings& settings,
1288+
bool missingOk) override
12871289
{
12881290
CHECK_PREPARED_DDL(DropExternalTable);
12891291

@@ -1307,6 +1309,7 @@ class TKqpGatewayProxy : public IKikimrGateway {
13071309
auto& schemeTx = *phyTx.MutableSchemeOperation()->MutableDropExternalTable();
13081310
schemeTx.SetWorkingDir(pathPair.first);
13091311
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropExternalTable);
1312+
schemeTx.SetSuccessOnNotExist(missingOk);
13101313

13111314
NKikimrSchemeOp::TDrop& drop = *schemeTx.MutableDrop();
13121315
drop.SetName(pathPair.second);
@@ -1316,7 +1319,7 @@ class TKqpGatewayProxy : public IKikimrGateway {
13161319
phyTxRemover.Forget();
13171320
return MakeFuture(result);
13181321
} else {
1319-
return Gateway->DropExternalTable(cluster, settings);
1322+
return Gateway->DropExternalTable(cluster, settings, missingOk);
13201323
}
13211324
}
13221325

ydb/core/kqp/provider/yql_kikimr_datasink.cpp

+16-4
Original file line numberDiff line numberDiff line change
@@ -856,13 +856,16 @@ class TKikimrDataSink : public TDataProviderBase
856856
.Features(settings.Features)
857857
.Done()
858858
.Ptr();
859-
} else if (mode == "createObject") {
859+
} else if (mode == "createObject" || mode == "createObjectIfNotExists") {
860860
return Build<TKiCreateObject>(ctx, node->Pos())
861861
.World(node->Child(0))
862862
.DataSink(node->Child(1))
863863
.ObjectId().Build(key.GetObjectId())
864864
.TypeId().Build(key.GetObjectType())
865865
.Features(settings.Features)
866+
.ExistingOk<TCoAtom>()
867+
.Value(mode == "createObjectIfNotExists")
868+
.Build()
866869
.Done()
867870
.Ptr();
868871
} else if (mode == "alterObject") {
@@ -874,13 +877,16 @@ class TKikimrDataSink : public TDataProviderBase
874877
.Features(settings.Features)
875878
.Done()
876879
.Ptr();
877-
} else if (mode == "dropObject") {
880+
} else if (mode == "dropObject" || mode == "dropObjectIfExists") {
878881
return Build<TKiDropObject>(ctx, node->Pos())
879882
.World(node->Child(0))
880883
.DataSink(node->Child(1))
881884
.ObjectId().Build(key.GetObjectId())
882885
.TypeId().Build(key.GetObjectType())
883886
.Features(settings.Features)
887+
.MissingOk<TCoAtom>()
888+
.Value(mode == "dropObjectIfExists")
889+
.Build()
884890
.Done()
885891
.Ptr();
886892
} else {
@@ -910,12 +916,15 @@ class TKikimrDataSink : public TDataProviderBase
910916
.Settings(settings.Other)
911917
.Done()
912918
.Ptr();
913-
} else if (mode == "dropUser") {
919+
} else if (mode == "dropUser" || mode == "dropUserIfExists") {
914920
return Build<TKiDropUser>(ctx, node->Pos())
915921
.World(node->Child(0))
916922
.DataSink(node->Child(1))
917923
.UserName().Build(key.GetRoleName())
918924
.Settings(settings.Other)
925+
.MissingOk<TCoAtom>()
926+
.Value(mode == "dropUserIfExists")
927+
.Build()
919928
.Done()
920929
.Ptr();
921930
} else if (mode == "createGroup") {
@@ -943,12 +952,15 @@ class TKikimrDataSink : public TDataProviderBase
943952
.NewName(settings.NewName.Cast())
944953
.Done()
945954
.Ptr();
946-
} else if (mode == "dropGroup") {
955+
} else if (mode == "dropGroup" || mode == "dropGroupIfExists") {
947956
return Build<TKiDropGroup>(ctx, node->Pos())
948957
.World(node->Child(0))
949958
.DataSink(node->Child(1))
950959
.GroupName().Build(key.GetRoleName())
951960
.Settings(settings.Other)
961+
.MissingOk<TCoAtom>()
962+
.Value(mode == "dropGroupIfExists")
963+
.Build()
952964
.Done()
953965
.Ptr();
954966
} else {

0 commit comments

Comments
 (0)