Skip to content

Support IF NOT EXISTS/IF EXISTS for tables, external tables and external data sources #694

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ void FillCreateExternalDataSourceDesc(NKikimrSchemeOp::TExternalDataSourceDescri
}
}

void FillCreateExternalDataSourceCommand(NKikimrSchemeOp::TModifyScheme& modifyScheme, const NYql::TObjectSettingsImpl& settings,
void FillCreateExternalDataSourceCommand(NKikimrSchemeOp::TModifyScheme& modifyScheme, const NYql::TCreateObjectSettings& settings,
TExternalDataSourceManager::TInternalModificationContext& context) {
CheckFeatureFlag(context);

Expand All @@ -97,12 +97,13 @@ void FillCreateExternalDataSourceCommand(NKikimrSchemeOp::TModifyScheme& modifyS

modifyScheme.SetWorkingDir(pathPair.first);
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateExternalDataSource);
modifyScheme.SetFailedOnAlreadyExists(!settings.GetExistingOk());

NKikimrSchemeOp::TExternalDataSourceDescription& dataSourceDesc = *modifyScheme.MutableCreateExternalDataSource();
FillCreateExternalDataSourceDesc(dataSourceDesc, pathPair.second, settings);
}

void FillDropExternalDataSourceCommand(NKikimrSchemeOp::TModifyScheme& modifyScheme, const NYql::TObjectSettingsImpl& settings,
void FillDropExternalDataSourceCommand(NKikimrSchemeOp::TModifyScheme& modifyScheme, const NYql::TDropObjectSettings& settings,
TExternalDataSourceManager::TInternalModificationContext& context) {
CheckFeatureFlag(context);

Expand All @@ -116,15 +117,16 @@ void FillDropExternalDataSourceCommand(NKikimrSchemeOp::TModifyScheme& modifySch

modifyScheme.SetWorkingDir(pathPair.first);
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpDropExternalDataSource);
modifyScheme.SetSuccessOnNotExist(settings.GetMissingOk());

NKikimrSchemeOp::TDrop& drop = *modifyScheme.MutableDrop();
drop.SetName(pathPair.second);
}

NThreading::TFuture<TExternalDataSourceManager::TYqlConclusionStatus> SendSchemeRequest(TEvTxUserProxy::TEvProposeTransaction* request, TActorSystem* actorSystem, bool failedOnAlreadyExists = false)
NThreading::TFuture<TExternalDataSourceManager::TYqlConclusionStatus> SendSchemeRequest(TEvTxUserProxy::TEvProposeTransaction* request, TActorSystem* actorSystem, bool failedOnAlreadyExists, bool successOnNotExist)
{
auto promiseScheme = NThreading::NewPromise<NKqp::TSchemeOpRequestHandler::TResult>();
IActor* requestHandler = new TSchemeOpRequestHandler(request, promiseScheme, failedOnAlreadyExists);
IActor* requestHandler = new TSchemeOpRequestHandler(request, promiseScheme, failedOnAlreadyExists, successOnNotExist);
actorSystem->Register(requestHandler);
return promiseScheme.GetFuture().Apply([](const NThreading::TFuture<NKqp::TSchemeOpRequestHandler::TResult>& f) {
if (f.HasValue() && !f.HasException() && f.GetValue().Success()) {
Expand Down Expand Up @@ -159,7 +161,7 @@ NThreading::TFuture<TExternalDataSourceManager::TYqlConclusionStatus> TExternalD
}
}

NThreading::TFuture<TExternalDataSourceManager::TYqlConclusionStatus> TExternalDataSourceManager::CreateExternalDataSource(const NYql::TObjectSettingsImpl& settings,
NThreading::TFuture<TExternalDataSourceManager::TYqlConclusionStatus> TExternalDataSourceManager::CreateExternalDataSource(const NYql::TCreateObjectSettings& settings,
TInternalModificationContext& context) const {
using TRequest = TEvTxUserProxy::TEvProposeTransaction;

Expand All @@ -172,10 +174,10 @@ NThreading::TFuture<TExternalDataSourceManager::TYqlConclusionStatus> TExternalD
auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme();
FillCreateExternalDataSourceCommand(schemeTx, settings, context);

return SendSchemeRequest(ev.Release(), context.GetExternalData().GetActorSystem(), true);
return SendSchemeRequest(ev.Release(), context.GetExternalData().GetActorSystem(), schemeTx.GetFailedOnAlreadyExists(), schemeTx.GetSuccessOnNotExist());
}

NThreading::TFuture<TExternalDataSourceManager::TYqlConclusionStatus> TExternalDataSourceManager::DropExternalDataSource(const NYql::TObjectSettingsImpl& settings,
NThreading::TFuture<TExternalDataSourceManager::TYqlConclusionStatus> TExternalDataSourceManager::DropExternalDataSource(const NYql::TDropObjectSettings& settings,
TInternalModificationContext& context) const {
using TRequest = TEvTxUserProxy::TEvProposeTransaction;

Expand All @@ -188,7 +190,7 @@ NThreading::TFuture<TExternalDataSourceManager::TYqlConclusionStatus> TExternalD
auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme();
FillDropExternalDataSourceCommand(schemeTx, settings, context);

return SendSchemeRequest(ev.Release(), context.GetExternalData().GetActorSystem());
return SendSchemeRequest(ev.Release(), context.GetExternalData().GetActorSystem(), schemeTx.GetFailedOnAlreadyExists(), schemeTx.GetSuccessOnNotExist());
}

TExternalDataSourceManager::TYqlConclusionStatus TExternalDataSourceManager::DoPrepare(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings,
Expand Down Expand Up @@ -216,12 +218,12 @@ TExternalDataSourceManager::TYqlConclusionStatus TExternalDataSourceManager::DoP
}
}

void TExternalDataSourceManager::PrepareCreateExternalDataSource(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings,
void TExternalDataSourceManager::PrepareCreateExternalDataSource(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TCreateObjectSettings& settings,
TInternalModificationContext& context) const {
FillCreateExternalDataSourceCommand(*schemeOperation.MutableCreateExternalDataSource(), settings, context);
}

void TExternalDataSourceManager::PrepareDropExternalDataSource(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings,
void TExternalDataSourceManager::PrepareDropExternalDataSource(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TDropObjectSettings& settings,
TInternalModificationContext& context) const {
FillDropExternalDataSourceCommand(*schemeOperation.MutableDropExternalDataSource(), settings, context);
}
Expand Down Expand Up @@ -252,7 +254,7 @@ NThreading::TFuture<NMetadata::NModifications::IOperationsManager::TYqlConclusio
TStringBuilder() << "Execution of prepare operation for EXTERNAL_DATA_SOURCE object: unsupported operation: " << int(schemeOperation.GetOperationCase())));
}

return SendSchemeRequest(ev.Release(), context.GetActorSystem(), true);
return SendSchemeRequest(ev.Release(), context.GetActorSystem(), schemeTx.GetFailedOnAlreadyExists(), schemeTx.GetSuccessOnNotExist());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@
namespace NKikimr::NKqp {

class TExternalDataSourceManager: public NMetadata::NModifications::IOperationsManager {
NThreading::TFuture<TYqlConclusionStatus> CreateExternalDataSource(const NYql::TObjectSettingsImpl& settings,
NThreading::TFuture<TYqlConclusionStatus> CreateExternalDataSource(const NYql::TCreateObjectSettings& settings,
TInternalModificationContext& context) const;

NThreading::TFuture<TYqlConclusionStatus> DropExternalDataSource(const NYql::TObjectSettingsImpl& settings,
NThreading::TFuture<TYqlConclusionStatus> DropExternalDataSource(const NYql::TDropObjectSettings& settings,
TInternalModificationContext& context) const;

void PrepareCreateExternalDataSource(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings,
void PrepareCreateExternalDataSource(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TCreateObjectSettings& settings,
TInternalModificationContext& context) const;

void PrepareDropExternalDataSource(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings,
void PrepareDropExternalDataSource(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TDropObjectSettings& settings,
TInternalModificationContext& context) const;

protected:
Expand Down
11 changes: 7 additions & 4 deletions ydb/core/kqp/gateway/kqp_ic_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1187,7 +1187,7 @@ class TKikimrIcGateway : public IKqpGateway {

TFuture<TGenericResult> CreateExternalTable(const TString& cluster,
const NYql::TCreateExternalTableSettings& settings,
bool createDir) override {
bool createDir, bool existingOk) override {
using TRequest = TEvTxUserProxy::TEvProposeTransaction;

try {
Expand All @@ -1211,6 +1211,7 @@ class TKikimrIcGateway : public IKqpGateway {
auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme();
schemeTx.SetWorkingDir(pathPair.first);
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateExternalTable);
schemeTx.SetFailedOnAlreadyExists(!existingOk);

NKikimrSchemeOp::TExternalTableDescription& externalTableDesc = *schemeTx.MutableCreateExternalTable();
NSchemeHelpers::FillCreateExternalTableColumnDesc(externalTableDesc, pathPair.second, settings);
Expand All @@ -1228,7 +1229,8 @@ class TKikimrIcGateway : public IKqpGateway {
}

TFuture<TGenericResult> DropExternalTable(const TString& cluster,
const NYql::TDropExternalTableSettings& settings) override {
const NYql::TDropExternalTableSettings& settings,
bool missingOk) override {
using TRequest = TEvTxUserProxy::TEvProposeTransaction;

try {
Expand All @@ -1253,6 +1255,7 @@ class TKikimrIcGateway : public IKqpGateway {
auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme();
schemeTx.SetWorkingDir(pathPair.first);
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropExternalTable);
schemeTx.SetSuccessOnNotExist(missingOk);

NKikimrSchemeOp::TDrop& drop = *schemeTx.MutableDrop();
drop.SetName(pathPair.second);
Expand Down Expand Up @@ -1489,7 +1492,7 @@ class TKikimrIcGateway : public IKqpGateway {
auto& dropUser = *schemeTx.MutableAlterLogin()->MutableRemoveUser();

dropUser.SetUser(settings.UserName);
dropUser.SetMissingOk(settings.Force);
dropUser.SetMissingOk(settings.MissingOk);

SendSchemeRequest(ev.Release()).Apply(
[dropUserPromise](const TFuture<TGenericResult>& future) mutable {
Expand Down Expand Up @@ -1840,7 +1843,7 @@ class TKikimrIcGateway : public IKqpGateway {
auto& dropGroup = *schemeTx.MutableAlterLogin()->MutableRemoveGroup();

dropGroup.SetGroup(settings.GroupName);
dropGroup.SetMissingOk(settings.Force);
dropGroup.SetMissingOk(settings.MissingOk);

SendSchemeRequest(ev.Release()).Apply(
[dropGroupPromise](const TFuture<TGenericResult>& future) mutable {
Expand Down
15 changes: 9 additions & 6 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,7 @@ class TKqpGatewayProxy : public IKikimrGateway {
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterLogin);
auto& dropUser = *schemeTx.MutableAlterLogin()->MutableRemoveUser();
dropUser.SetUser(settings.UserName);
dropUser.SetMissingOk(settings.Force);
dropUser.SetMissingOk(settings.MissingOk);

auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
auto& phyTx = *phyQuery.AddTransactions();
Expand Down Expand Up @@ -1195,7 +1195,7 @@ class TKqpGatewayProxy : public IKikimrGateway {
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterLogin);
auto& dropGroup = *schemeTx.MutableAlterLogin()->MutableRemoveGroup();
dropGroup.SetGroup(settings.GroupName);
dropGroup.SetMissingOk(settings.Force);
dropGroup.SetMissingOk(settings.MissingOk);

auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
auto& phyTx = *phyQuery.AddTransactions();
Expand Down Expand Up @@ -1240,7 +1240,7 @@ class TKqpGatewayProxy : public IKikimrGateway {
}

TFuture<TGenericResult> CreateExternalTable(const TString& cluster, const TCreateExternalTableSettings& settings,
bool createDir) override
bool createDir, bool existingOk) override
{
CHECK_PREPARED_DDL(CreateExternalTable);

Expand All @@ -1264,6 +1264,7 @@ class TKqpGatewayProxy : public IKikimrGateway {
auto& schemeTx = *phyTx.MutableSchemeOperation()->MutableCreateExternalTable();
schemeTx.SetWorkingDir(pathPair.first);
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateExternalTable);
schemeTx.SetFailedOnAlreadyExists(!existingOk);

NKikimrSchemeOp::TExternalTableDescription& externalTableDesc = *schemeTx.MutableCreateExternalTable();
NSchemeHelpers::FillCreateExternalTableColumnDesc(externalTableDesc, pathPair.second, settings);
Expand All @@ -1272,7 +1273,7 @@ class TKqpGatewayProxy : public IKikimrGateway {
phyTxRemover.Forget();
return MakeFuture(result);
} else {
return Gateway->CreateExternalTable(cluster, settings, createDir);
return Gateway->CreateExternalTable(cluster, settings, createDir, existingOk);
}
}

Expand All @@ -1283,7 +1284,8 @@ class TKqpGatewayProxy : public IKikimrGateway {
}

TFuture<TGenericResult> DropExternalTable(const TString& cluster,
const TDropExternalTableSettings& settings) override
const TDropExternalTableSettings& settings,
bool missingOk) override
{
CHECK_PREPARED_DDL(DropExternalTable);

Expand All @@ -1307,6 +1309,7 @@ class TKqpGatewayProxy : public IKikimrGateway {
auto& schemeTx = *phyTx.MutableSchemeOperation()->MutableDropExternalTable();
schemeTx.SetWorkingDir(pathPair.first);
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropExternalTable);
schemeTx.SetSuccessOnNotExist(missingOk);

NKikimrSchemeOp::TDrop& drop = *schemeTx.MutableDrop();
drop.SetName(pathPair.second);
Expand All @@ -1316,7 +1319,7 @@ class TKqpGatewayProxy : public IKikimrGateway {
phyTxRemover.Forget();
return MakeFuture(result);
} else {
return Gateway->DropExternalTable(cluster, settings);
return Gateway->DropExternalTable(cluster, settings, missingOk);
}
}

Expand Down
20 changes: 16 additions & 4 deletions ydb/core/kqp/provider/yql_kikimr_datasink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -856,13 +856,16 @@ class TKikimrDataSink : public TDataProviderBase
.Features(settings.Features)
.Done()
.Ptr();
} else if (mode == "createObject") {
} else if (mode == "createObject" || mode == "createObjectIfNotExists") {
return Build<TKiCreateObject>(ctx, node->Pos())
.World(node->Child(0))
.DataSink(node->Child(1))
.ObjectId().Build(key.GetObjectId())
.TypeId().Build(key.GetObjectType())
.Features(settings.Features)
.ExistingOk<TCoAtom>()
.Value(mode == "createObjectIfNotExists")
.Build()
.Done()
.Ptr();
} else if (mode == "alterObject") {
Expand All @@ -874,13 +877,16 @@ class TKikimrDataSink : public TDataProviderBase
.Features(settings.Features)
.Done()
.Ptr();
} else if (mode == "dropObject") {
} else if (mode == "dropObject" || mode == "dropObjectIfExists") {
return Build<TKiDropObject>(ctx, node->Pos())
.World(node->Child(0))
.DataSink(node->Child(1))
.ObjectId().Build(key.GetObjectId())
.TypeId().Build(key.GetObjectType())
.Features(settings.Features)
.MissingOk<TCoAtom>()
.Value(mode == "dropObjectIfExists")
.Build()
.Done()
.Ptr();
} else {
Expand Down Expand Up @@ -910,12 +916,15 @@ class TKikimrDataSink : public TDataProviderBase
.Settings(settings.Other)
.Done()
.Ptr();
} else if (mode == "dropUser") {
} else if (mode == "dropUser" || mode == "dropUserIfExists") {
return Build<TKiDropUser>(ctx, node->Pos())
.World(node->Child(0))
.DataSink(node->Child(1))
.UserName().Build(key.GetRoleName())
.Settings(settings.Other)
.MissingOk<TCoAtom>()
.Value(mode == "dropUserIfExists")
.Build()
.Done()
.Ptr();
} else if (mode == "createGroup") {
Expand Down Expand Up @@ -943,12 +952,15 @@ class TKikimrDataSink : public TDataProviderBase
.NewName(settings.NewName.Cast())
.Done()
.Ptr();
} else if (mode == "dropGroup") {
} else if (mode == "dropGroup" || mode == "dropGroupIfExists") {
return Build<TKiDropGroup>(ctx, node->Pos())
.World(node->Child(0))
.DataSink(node->Child(1))
.GroupName().Build(key.GetRoleName())
.Settings(settings.Other)
.MissingOk<TCoAtom>()
.Value(mode == "dropGroupIfExists")
.Build()
.Done()
.Ptr();
} else {
Expand Down
Loading