Skip to content

Commit b7b517a

Browse files
committed
Initial version
1 parent 4eceb69 commit b7b517a

19 files changed

+159
-47
lines changed

ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ void FillCreateExternalDataSourceDesc(NKikimrSchemeOp::TExternalDataSourceDescri
3737
externaDataSourceDesc.SetSourceType(GetOrEmpty(settings, "source_type"));
3838
externaDataSourceDesc.SetLocation(GetOrEmpty(settings, "location"));
3939
externaDataSourceDesc.SetInstallation(GetOrEmpty(settings, "installation"));
40+
externaDataSourceDesc.SetIsReplace(settings.GetIsReplace());
4041

4142
TString authMethod = GetOrEmpty(settings, "auth_method");
4243
if (authMethod == "NONE") {

ydb/core/kqp/gateway/kqp_ic_gateway.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -855,10 +855,11 @@ class TKikimrIcGateway : public IKqpGateway {
855855
return profilesPromise.GetFuture();
856856
}
857857

858-
TFuture<TGenericResult> CreateTable(NYql::TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk) override {
858+
TFuture<TGenericResult> CreateTable(NYql::TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk, bool isReplace) override {
859859
Y_UNUSED(metadata);
860860
Y_UNUSED(createDir);
861861
Y_UNUSED(existingOk);
862+
Y_UNUSED(isReplace);
862863
return NotImplemented<TGenericResult>();
863864
}
864865

@@ -1187,7 +1188,7 @@ class TKikimrIcGateway : public IKqpGateway {
11871188

11881189
TFuture<TGenericResult> CreateExternalTable(const TString& cluster,
11891190
const NYql::TCreateExternalTableSettings& settings,
1190-
bool createDir, bool existingOk) override {
1191+
bool createDir, bool existingOk, bool isReplace) override {
11911192
using TRequest = TEvTxUserProxy::TEvProposeTransaction;
11921193

11931194
try {
@@ -1214,7 +1215,7 @@ class TKikimrIcGateway : public IKqpGateway {
12141215
schemeTx.SetFailedOnAlreadyExists(!existingOk);
12151216

12161217
NKikimrSchemeOp::TExternalTableDescription& externalTableDesc = *schemeTx.MutableCreateExternalTable();
1217-
NSchemeHelpers::FillCreateExternalTableColumnDesc(externalTableDesc, pathPair.second, settings);
1218+
NSchemeHelpers::FillCreateExternalTableColumnDesc(externalTableDesc, pathPair.second, isReplace, settings);
12181219
return SendSchemeRequest(ev.Release(), true);
12191220
}
12201221
catch (yexception& e) {

ydb/core/kqp/gateway/utils/scheme_helpers.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,14 @@ bool SetDatabaseForLoginOperation(TString& result, bool getDomainLoginOnly, TMay
6161

6262
void FillCreateExternalTableColumnDesc(NKikimrSchemeOp::TExternalTableDescription& externalTableDesc,
6363
const TString& name,
64+
bool isReplace,
6465
const NYql::TCreateExternalTableSettings& settings)
6566
{
6667
externalTableDesc.SetName(name);
6768
externalTableDesc.SetDataSourcePath(settings.DataSourcePath);
6869
externalTableDesc.SetLocation(settings.Location);
6970
externalTableDesc.SetSourceType("General");
71+
externalTableDesc.SetIsReplace(isReplace);
7072

7173
Y_ENSURE(settings.ColumnOrder.size() == settings.Columns.size());
7274
for (const auto& name : settings.ColumnOrder) {

ydb/core/kqp/gateway/utils/scheme_helpers.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ bool SetDatabaseForLoginOperation(TString& result, bool getDomainLoginOnly, TMay
2929

3030
void FillCreateExternalTableColumnDesc(NKikimrSchemeOp::TExternalTableDescription& externalTableDesc,
3131
const TString& name,
32+
bool isReplace,
3233
const NYql::TCreateExternalTableSettings& settings);
3334

3435
std::pair<TString, TString> SplitPathByDirAndBaseNames(const TString& path);

ydb/core/kqp/host/kqp_gateway_proxy.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,8 @@ class TKqpGatewayProxy : public IKikimrGateway {
444444
return Gateway->LoadTableMetadata(cluster, table, settings);
445445
}
446446

447-
TFuture<TGenericResult> CreateTable(TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk) override {
447+
TFuture<TGenericResult> CreateTable(TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk, bool isReplace) override {
448+
Y_UNUSED(isReplace);
448449
CHECK_PREPARED_DDL(CreateTable);
449450

450451
std::pair<TString, TString> pathPair;
@@ -1243,7 +1244,7 @@ class TKqpGatewayProxy : public IKikimrGateway {
12431244
}
12441245

12451246
TFuture<TGenericResult> CreateExternalTable(const TString& cluster, const TCreateExternalTableSettings& settings,
1246-
bool createDir, bool existingOk) override
1247+
bool createDir, bool existingOk, bool isReplace) override
12471248
{
12481249
CHECK_PREPARED_DDL(CreateExternalTable);
12491250

@@ -1270,13 +1271,13 @@ class TKqpGatewayProxy : public IKikimrGateway {
12701271
schemeTx.SetFailedOnAlreadyExists(!existingOk);
12711272

12721273
NKikimrSchemeOp::TExternalTableDescription& externalTableDesc = *schemeTx.MutableCreateExternalTable();
1273-
NSchemeHelpers::FillCreateExternalTableColumnDesc(externalTableDesc, pathPair.second, settings);
1274+
NSchemeHelpers::FillCreateExternalTableColumnDesc(externalTableDesc, pathPair.second, isReplace, settings);
12741275
TGenericResult result;
12751276
result.SetSuccess();
12761277
phyTxRemover.Forget();
12771278
return MakeFuture(result);
12781279
} else {
1279-
return Gateway->CreateExternalTable(cluster, settings, createDir, existingOk);
1280+
return Gateway->CreateExternalTable(cluster, settings, createDir, existingOk, isReplace);
12801281
}
12811282
}
12821283

ydb/core/kqp/provider/yql_kikimr_datasink.cpp

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ class TKiSinkIntentDeterminationTransformer: public TKiSinkVisitorTransformer {
261261
? GetTableTypeFromString(settings.TableType.Cast())
262262
: ETableType::Table; // v0 support
263263

264-
if (mode == "create" || mode == "create_if_not_exists") {
264+
if (mode == "create" || mode == "create_if_not_exists" || mode == "create_or_replace") {
265265
if (!settings.Columns) {
266266
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
267267
<< "No columns provided for create mode."));
@@ -757,7 +757,7 @@ class TKikimrDataSink : public TDataProviderBase
757757
? settings.TableType.Cast()
758758
: Build<TCoAtom>(ctx, node->Pos()).Value("table").Done(); // v0 support
759759
auto mode = settings.Mode.Cast();
760-
if (mode == "create" || mode == "create_if_not_exists") {
760+
if (mode == "create" || mode == "create_if_not_exists" || mode == "create_or_replace") {
761761
YQL_ENSURE(settings.Columns);
762762
YQL_ENSURE(!settings.Columns.Cast().Empty());
763763

@@ -779,6 +779,7 @@ class TKikimrDataSink : public TDataProviderBase
779779
? settings.Temporary.Cast()
780780
: Build<TCoAtom>(ctx, node->Pos()).Value("false").Done();
781781

782+
auto isReplace = (settings.Mode.Cast().Value() == "create_or_replace");
782783
auto existringOk = (settings.Mode.Cast().Value() == "create_if_not_exists");
783784

784785
return Build<TKiCreateTable>(ctx, node->Pos())
@@ -795,9 +796,12 @@ class TKikimrDataSink : public TDataProviderBase
795796
.ColumnFamilies(settings.ColumnFamilies.Cast())
796797
.TableSettings(settings.TableSettings.Cast())
797798
.TableType(tableType)
799+
.IsReplace<TCoAtom>()
800+
.Value(isReplace)
801+
.Build()
798802
.ExistingOk<TCoAtom>()
799803
.Value(existringOk)
800-
.Build()
804+
.Build()
801805
.Done()
802806
.Ptr();
803807
} else if (mode == "alter") {
@@ -886,16 +890,19 @@ class TKikimrDataSink : public TDataProviderBase
886890
.Features(settings.Features)
887891
.Done()
888892
.Ptr();
889-
} else if (mode == "createObject" || mode == "createObjectIfNotExists") {
893+
} else if (mode == "createObject" || mode == "createObjectIfNotExists" || mode == "createObjectOrReplace") {
890894
return Build<TKiCreateObject>(ctx, node->Pos())
891895
.World(node->Child(0))
892896
.DataSink(node->Child(1))
893897
.ObjectId().Build(key.GetObjectId())
894898
.TypeId().Build(key.GetObjectType())
895899
.Features(settings.Features)
900+
.IsReplace<TCoAtom>()
901+
.Value(mode == "createObjectOrReplace")
902+
.Build()
896903
.ExistingOk<TCoAtom>()
897904
.Value(mode == "createObjectIfNotExists")
898-
.Build()
905+
.Build()
899906
.Done()
900907
.Ptr();
901908
} else if (mode == "alterObject") {

ydb/core/kqp/provider/yql_kikimr_exec.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -934,10 +934,11 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
934934
NThreading::TFuture<IKikimrGateway::TGenericResult> future;
935935
bool isColumn = (table.Metadata->StoreType == EStoreType::Column);
936936
bool existingOk = (maybeCreate.ExistingOk().Cast().Value() == "1");
937+
bool isReplace = (maybeCreate.IsReplace().Cast().Value() == "1");
937938
switch (tableTypeItem) {
938939
case ETableType::ExternalTable: {
939940
future = Gateway->CreateExternalTable(cluster,
940-
ParseCreateExternalTableSettings(maybeCreate.Cast(), table.Metadata->TableSettings), true, existingOk);
941+
ParseCreateExternalTableSettings(maybeCreate.Cast(), table.Metadata->TableSettings), true, existingOk, isReplace);
941942
break;
942943
}
943944
case ETableType::TableStore: {

ydb/core/kqp/provider/yql_kikimr_expr_nodes.json

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,8 @@
131131
{"Index": 10, "Name": "Changefeeds", "Type": "TCoChangefeedList"},
132132
{"Index": 11, "Name": "TableType", "Type": "TCoAtom"},
133133
{"Index": 12, "Name": "Temporary", "Type": "TCoAtom"},
134-
{"Index": 13, "Name": "ExistingOk", "Type": "TCoAtom"}
134+
{"Index": 13, "Name": "ExistingOk", "Type": "TCoAtom"},
135+
{"Index": 14, "Name": "IsReplace", "Type": "TCoAtom"}
135136
]
136137
},
137138
{
@@ -254,7 +255,8 @@
254255
{"Index": 2, "Name": "ObjectId", "Type": "TCoAtom"},
255256
{"Index": 3, "Name": "TypeId", "Type": "TCoAtom"},
256257
{"Index": 4, "Name": "Features", "Type": "TCoNameValueTupleList"},
257-
{"Index": 5, "Name": "ExistingOk", "Type": "TCoAtom"}
258+
{"Index": 5, "Name": "ExistingOk", "Type": "TCoAtom"},
259+
{"Index": 6, "Name": "IsReplace", "Type": "TCoAtom"}
258260
]
259261
},
260262
{

ydb/core/kqp/provider/yql_kikimr_gateway.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -790,7 +790,7 @@ class IKikimrGateway : public TThrRefBase {
790790
virtual NThreading::TFuture<TTableMetadataResult> LoadTableMetadata(
791791
const TString& cluster, const TString& table, TLoadTableMetadataSettings settings) = 0;
792792

793-
virtual NThreading::TFuture<TGenericResult> CreateTable(TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk = false) = 0;
793+
virtual NThreading::TFuture<TGenericResult> CreateTable(TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk = false, bool isReplace = false) = 0;
794794

795795
virtual NThreading::TFuture<TGenericResult> SendSchemeExecuterRequest(const TString& cluster,
796796
const TMaybe<TString>& requestType,
@@ -843,7 +843,7 @@ class IKikimrGateway : public TThrRefBase {
843843

844844
virtual NThreading::TFuture<TGenericResult> DropTableStore(const TString& cluster, const TDropTableStoreSettings& settings) = 0;
845845

846-
virtual NThreading::TFuture<TGenericResult> CreateExternalTable(const TString& cluster, const TCreateExternalTableSettings& settings, bool createDir, bool existingOk) = 0;
846+
virtual NThreading::TFuture<TGenericResult> CreateExternalTable(const TString& cluster, const TCreateExternalTableSettings& settings, bool createDir, bool existingOk, bool isReplace) = 0;
847847

848848
virtual NThreading::TFuture<TGenericResult> AlterExternalTable(const TString& cluster, const TAlterExternalTableSettings& settings) = 0;
849849

ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ void TestCreateExternalTable(TTestActorRuntime& runtime, TIntrusivePtr<IKikimrGa
170170
settings.Columns.insert(std::make_pair("Column2", TKikimrColumnMetadata{"Column2", 0, "String", false}));
171171
settings.ColumnOrder.push_back("Column2");
172172

173-
auto responseFuture = gateway->CreateExternalTable(TestCluster, settings, true, false);
173+
auto responseFuture = gateway->CreateExternalTable(TestCluster, settings, true, false, false);
174174
responseFuture.Wait();
175175
auto response = responseFuture.GetValue();
176176
response.Issues().PrintTo(Cerr);

ydb/core/protos/flat_scheme_op.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1806,6 +1806,7 @@ message TExternalTableDescription {
18061806
optional string Location = 6;
18071807
repeated TColumnDescription Columns = 7;
18081808
optional bytes Content = 8;
1809+
optional bool IsReplace = 9;
18091810
}
18101811

18111812
// Access without authorization
@@ -1866,6 +1867,7 @@ message TExternalDataSourceDescription {
18661867
optional string Installation = 6;
18671868
optional TAuth Auth = 7;
18681869
optional TExternalDataSourceProperties Properties = 8;
1870+
optional bool IsReplace = 9;
18691871
}
18701872

18711873
message TViewDescription {

ydb/library/yql/sql/v1/SQLv1.g.in

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -570,7 +570,7 @@ values_source_row: LPAREN expr_list RPAREN;
570570

571571
simple_values_source: expr_list | select_stmt;
572572

573-
create_external_data_source_stmt: CREATE EXTERNAL DATA SOURCE (IF NOT EXISTS)? object_ref
573+
create_external_data_source_stmt: CREATE EXTERNAL DATA SOURCE (IF NOT EXISTS | OR REPLACE)? object_ref
574574
with_table_settings
575575
;
576576

@@ -623,7 +623,7 @@ object_features: object_feature | LPAREN object_feature (COMMA object_feature)*
623623

624624
object_type_ref: an_id_or_type;
625625

626-
create_table_stmt: CREATE (TABLE | TABLESTORE | EXTERNAL TABLE) (IF NOT EXISTS)? simple_table_ref LPAREN create_table_entry (COMMA create_table_entry)* COMMA? RPAREN
626+
create_table_stmt: CREATE (TABLE | TABLESTORE | EXTERNAL TABLE) (IF NOT EXISTS | OR REPLACE)? simple_table_ref LPAREN create_table_entry (COMMA create_table_entry)* COMMA? RPAREN
627627
table_inherits?
628628
table_partition_by?
629629
with_table_settings?

ydb/library/yql/sql/v1/node.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1231,7 +1231,7 @@ namespace NSQLTranslationV1 {
12311231
TNodePtr BuildUpsertObjectOperation(TPosition pos, const TString& objectId, const TString& typeId,
12321232
std::map<TString, TDeferredAtom>&& features, const TObjectOperatorContext& context);
12331233
TNodePtr BuildCreateObjectOperation(TPosition pos, const TString& objectId, const TString& typeId,
1234-
bool existingOk, std::map<TString, TDeferredAtom>&& features, const TObjectOperatorContext& context);
1234+
bool existingOk, bool isReplace, std::map<TString, TDeferredAtom>&& features, const TObjectOperatorContext& context);
12351235
TNodePtr BuildAlterObjectOperation(TPosition pos, const TString& secretId, const TString& typeId,
12361236
std::map<TString, TDeferredAtom>&& features, std::set<TString>&& featuresToReset, const TObjectOperatorContext& context);
12371237
TNodePtr BuildDropObjectOperation(TPosition pos, const TString& secretId, const TString& typeId,

ydb/library/yql/sql/v1/object_processing.h

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,19 +40,30 @@ class TCreateObject: public TObjectProcessorImpl {
4040
std::set<TString> FeaturesToReset;
4141
protected:
4242
bool ExistingOk = false;
43+
bool IsReplace = false;
4344
protected:
4445
virtual INode::TPtr BuildOptions() const override {
45-
return Y(Q(Y(Q("mode"), Q(ExistingOk ? "createObjectIfNotExists" : "createObject"))));
46+
TString mode;
47+
if (ExistingOk) {
48+
mode = "createObjectIfNotExists";
49+
} else if (IsReplace) {
50+
mode = "createObjectOrReplace";
51+
} else {
52+
mode = "createObject";
53+
}
54+
55+
return Y(Q(Y(Q("mode"), Q(mode))));
4656
}
4757
virtual INode::TPtr FillFeatures(INode::TPtr options) const override;
4858
public:
4959
TCreateObject(TPosition pos, const TString& objectId,
50-
const TString& typeId, bool existingOk, std::map<TString, TDeferredAtom>&& features, std::set<TString>&& featuresToReset, const TObjectOperatorContext& context)
60+
const TString& typeId, bool existingOk, bool isReplace, std::map<TString, TDeferredAtom>&& features, std::set<TString>&& featuresToReset, const TObjectOperatorContext& context)
5161
: TBase(pos, objectId, typeId, context)
5262
, Features(std::move(features))
5363
, FeaturesToReset(std::move(featuresToReset))
54-
, ExistingOk(existingOk) {
55-
}
64+
, ExistingOk(existingOk)
65+
, IsReplace(isReplace) {
66+
}
5667
};
5768

5869
class TUpsertObject final: public TCreateObject {

0 commit comments

Comments
 (0)