Skip to content

Commit 479ab97

Browse files
authored
[YQ-1997] OR REPLACE grammar support for EXTERNAL DATA SOURCE and EXTERNAL TABLE (#1318)
1 parent 2d0ace6 commit 479ab97

21 files changed

+218
-76
lines changed

ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ void FillCreateExternalDataSourceDesc(NKikimrSchemeOp::TExternalDataSourceDescri
3737
externaDataSourceDesc.SetSourceType(GetOrEmpty(settings, "source_type"));
3838
externaDataSourceDesc.SetLocation(GetOrEmpty(settings, "location"));
3939
externaDataSourceDesc.SetInstallation(GetOrEmpty(settings, "installation"));
40+
externaDataSourceDesc.SetReplaceIfExists(settings.GetReplaceIfExists());
4041

4142
TString authMethod = GetOrEmpty(settings, "auth_method");
4243
if (authMethod == "NONE") {

ydb/core/kqp/gateway/kqp_ic_gateway.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -850,10 +850,11 @@ class TKikimrIcGateway : public IKqpGateway {
850850
return profilesPromise.GetFuture();
851851
}
852852

853-
TFuture<TGenericResult> CreateTable(NYql::TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk) override {
853+
TFuture<TGenericResult> CreateTable(NYql::TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk, bool replaceIfExists) override {
854854
Y_UNUSED(metadata);
855855
Y_UNUSED(createDir);
856856
Y_UNUSED(existingOk);
857+
Y_UNUSED(replaceIfExists);
857858
return NotImplemented<TGenericResult>();
858859
}
859860

@@ -1182,7 +1183,7 @@ class TKikimrIcGateway : public IKqpGateway {
11821183

11831184
TFuture<TGenericResult> CreateExternalTable(const TString& cluster,
11841185
const NYql::TCreateExternalTableSettings& settings,
1185-
bool createDir, bool existingOk) override {
1186+
bool createDir, bool existingOk, bool replaceIfExists) override {
11861187
using TRequest = TEvTxUserProxy::TEvProposeTransaction;
11871188

11881189
try {
@@ -1209,7 +1210,7 @@ class TKikimrIcGateway : public IKqpGateway {
12091210
schemeTx.SetFailedOnAlreadyExists(!existingOk);
12101211

12111212
NKikimrSchemeOp::TExternalTableDescription& externalTableDesc = *schemeTx.MutableCreateExternalTable();
1212-
NSchemeHelpers::FillCreateExternalTableColumnDesc(externalTableDesc, pathPair.second, settings);
1213+
NSchemeHelpers::FillCreateExternalTableColumnDesc(externalTableDesc, pathPair.second, replaceIfExists, settings);
12131214
return SendSchemeRequest(ev.Release(), true);
12141215
}
12151216
catch (yexception& e) {

ydb/core/kqp/gateway/utils/scheme_helpers.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,14 @@ bool SetDatabaseForLoginOperation(TString& result, bool getDomainLoginOnly, TMay
6161

6262
void FillCreateExternalTableColumnDesc(NKikimrSchemeOp::TExternalTableDescription& externalTableDesc,
6363
const TString& name,
64+
bool replaceIfExists,
6465
const NYql::TCreateExternalTableSettings& settings)
6566
{
6667
externalTableDesc.SetName(name);
6768
externalTableDesc.SetDataSourcePath(settings.DataSourcePath);
6869
externalTableDesc.SetLocation(settings.Location);
6970
externalTableDesc.SetSourceType("General");
71+
externalTableDesc.SetReplaceIfExists(replaceIfExists);
7072

7173
Y_ENSURE(settings.ColumnOrder.size() == settings.Columns.size());
7274
for (const auto& name : settings.ColumnOrder) {

ydb/core/kqp/gateway/utils/scheme_helpers.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ bool SetDatabaseForLoginOperation(TString& result, bool getDomainLoginOnly, TMay
2929

3030
void FillCreateExternalTableColumnDesc(NKikimrSchemeOp::TExternalTableDescription& externalTableDesc,
3131
const TString& name,
32+
bool replaceIfExists,
3233
const NYql::TCreateExternalTableSettings& settings);
3334

3435
std::pair<TString, TString> SplitPathByDirAndBaseNames(const TString& path);

ydb/core/kqp/host/kqp_gateway_proxy.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,8 @@ class TKqpGatewayProxy : public IKikimrGateway {
444444
return Gateway->LoadTableMetadata(cluster, table, settings);
445445
}
446446

447-
TFuture<TGenericResult> CreateTable(TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk) override {
447+
TFuture<TGenericResult> CreateTable(TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk, bool replaceIfExists) override {
448+
Y_UNUSED(replaceIfExists);
448449
CHECK_PREPARED_DDL(CreateTable);
449450

450451
std::pair<TString, TString> pathPair;
@@ -1243,7 +1244,7 @@ class TKqpGatewayProxy : public IKikimrGateway {
12431244
}
12441245

12451246
TFuture<TGenericResult> CreateExternalTable(const TString& cluster, const TCreateExternalTableSettings& settings,
1246-
bool createDir, bool existingOk) override
1247+
bool createDir, bool existingOk, bool replaceIfExists) override
12471248
{
12481249
CHECK_PREPARED_DDL(CreateExternalTable);
12491250

@@ -1270,13 +1271,13 @@ class TKqpGatewayProxy : public IKikimrGateway {
12701271
schemeTx.SetFailedOnAlreadyExists(!existingOk);
12711272

12721273
NKikimrSchemeOp::TExternalTableDescription& externalTableDesc = *schemeTx.MutableCreateExternalTable();
1273-
NSchemeHelpers::FillCreateExternalTableColumnDesc(externalTableDesc, pathPair.second, settings);
1274+
NSchemeHelpers::FillCreateExternalTableColumnDesc(externalTableDesc, pathPair.second, replaceIfExists, settings);
12741275
TGenericResult result;
12751276
result.SetSuccess();
12761277
phyTxRemover.Forget();
12771278
return MakeFuture(result);
12781279
} else {
1279-
return Gateway->CreateExternalTable(cluster, settings, createDir, existingOk);
1280+
return Gateway->CreateExternalTable(cluster, settings, createDir, existingOk, replaceIfExists);
12801281
}
12811282
}
12821283

ydb/core/kqp/provider/yql_kikimr_datasink.cpp

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ class TKiSinkIntentDeterminationTransformer: public TKiSinkVisitorTransformer {
261261
? GetTableTypeFromString(settings.TableType.Cast())
262262
: ETableType::Table; // v0 support
263263

264-
if (mode == "create" || mode == "create_if_not_exists") {
264+
if (mode == "create" || mode == "create_if_not_exists" || mode == "create_or_replace") {
265265
if (!settings.Columns) {
266266
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
267267
<< "No columns provided for create mode."));
@@ -762,7 +762,7 @@ class TKikimrDataSink : public TDataProviderBase
762762
? settings.TableType.Cast()
763763
: Build<TCoAtom>(ctx, node->Pos()).Value("table").Done(); // v0 support
764764
auto mode = settings.Mode.Cast();
765-
if (mode == "create" || mode == "create_if_not_exists") {
765+
if (mode == "create" || mode == "create_if_not_exists" || mode == "create_or_replace") {
766766
YQL_ENSURE(settings.Columns);
767767
YQL_ENSURE(!settings.Columns.Cast().Empty());
768768

@@ -784,6 +784,7 @@ class TKikimrDataSink : public TDataProviderBase
784784
? settings.Temporary.Cast()
785785
: Build<TCoAtom>(ctx, node->Pos()).Value("false").Done();
786786

787+
auto replaceIfExists = (settings.Mode.Cast().Value() == "create_or_replace");
787788
auto existringOk = (settings.Mode.Cast().Value() == "create_if_not_exists");
788789

789790
return Build<TKiCreateTable>(ctx, node->Pos())
@@ -800,9 +801,12 @@ class TKikimrDataSink : public TDataProviderBase
800801
.ColumnFamilies(settings.ColumnFamilies.Cast())
801802
.TableSettings(settings.TableSettings.Cast())
802803
.TableType(tableType)
804+
.ReplaceIfExists<TCoAtom>()
805+
.Value(replaceIfExists)
806+
.Build()
803807
.ExistingOk<TCoAtom>()
804808
.Value(existringOk)
805-
.Build()
809+
.Build()
806810
.Done()
807811
.Ptr();
808812
} else if (mode == "alter") {
@@ -891,16 +895,19 @@ class TKikimrDataSink : public TDataProviderBase
891895
.Features(settings.Features)
892896
.Done()
893897
.Ptr();
894-
} else if (mode == "createObject" || mode == "createObjectIfNotExists") {
898+
} else if (mode == "createObject" || mode == "createObjectIfNotExists" || mode == "createObjectOrReplace") {
895899
return Build<TKiCreateObject>(ctx, node->Pos())
896900
.World(node->Child(0))
897901
.DataSink(node->Child(1))
898902
.ObjectId().Build(key.GetObjectId())
899903
.TypeId().Build(key.GetObjectType())
900904
.Features(settings.Features)
905+
.ReplaceIfExists<TCoAtom>()
906+
.Value(mode == "createObjectOrReplace")
907+
.Build()
901908
.ExistingOk<TCoAtom>()
902909
.Value(mode == "createObjectIfNotExists")
903-
.Build()
910+
.Build()
904911
.Done()
905912
.Ptr();
906913
} else if (mode == "alterObject") {

ydb/core/kqp/provider/yql_kikimr_exec.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -934,10 +934,11 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
934934
NThreading::TFuture<IKikimrGateway::TGenericResult> future;
935935
bool isColumn = (table.Metadata->StoreType == EStoreType::Column);
936936
bool existingOk = (maybeCreate.ExistingOk().Cast().Value() == "1");
937+
bool replaceIfExists = (maybeCreate.ReplaceIfExists().Cast().Value() == "1");
937938
switch (tableTypeItem) {
938939
case ETableType::ExternalTable: {
939940
future = Gateway->CreateExternalTable(cluster,
940-
ParseCreateExternalTableSettings(maybeCreate.Cast(), table.Metadata->TableSettings), true, existingOk);
941+
ParseCreateExternalTableSettings(maybeCreate.Cast(), table.Metadata->TableSettings), true, existingOk, replaceIfExists);
941942
break;
942943
}
943944
case ETableType::TableStore: {

ydb/core/kqp/provider/yql_kikimr_expr_nodes.json

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,8 @@
131131
{"Index": 10, "Name": "Changefeeds", "Type": "TCoChangefeedList"},
132132
{"Index": 11, "Name": "TableType", "Type": "TCoAtom"},
133133
{"Index": 12, "Name": "Temporary", "Type": "TCoAtom"},
134-
{"Index": 13, "Name": "ExistingOk", "Type": "TCoAtom"}
134+
{"Index": 13, "Name": "ExistingOk", "Type": "TCoAtom"},
135+
{"Index": 14, "Name": "ReplaceIfExists", "Type": "TCoAtom"}
135136
]
136137
},
137138
{
@@ -254,7 +255,8 @@
254255
{"Index": 2, "Name": "ObjectId", "Type": "TCoAtom"},
255256
{"Index": 3, "Name": "TypeId", "Type": "TCoAtom"},
256257
{"Index": 4, "Name": "Features", "Type": "TCoNameValueTupleList"},
257-
{"Index": 5, "Name": "ExistingOk", "Type": "TCoAtom"}
258+
{"Index": 5, "Name": "ExistingOk", "Type": "TCoAtom"},
259+
{"Index": 6, "Name": "ReplaceIfExists", "Type": "TCoAtom"}
258260
]
259261
},
260262
{

ydb/core/kqp/provider/yql_kikimr_gateway.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -790,7 +790,7 @@ class IKikimrGateway : public TThrRefBase {
790790
virtual NThreading::TFuture<TTableMetadataResult> LoadTableMetadata(
791791
const TString& cluster, const TString& table, TLoadTableMetadataSettings settings) = 0;
792792

793-
virtual NThreading::TFuture<TGenericResult> CreateTable(TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk = false) = 0;
793+
virtual NThreading::TFuture<TGenericResult> CreateTable(TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk = false, bool replaceIfExists = false) = 0;
794794

795795
virtual NThreading::TFuture<TGenericResult> SendSchemeExecuterRequest(const TString& cluster,
796796
const TMaybe<TString>& requestType,
@@ -843,7 +843,7 @@ class IKikimrGateway : public TThrRefBase {
843843

844844
virtual NThreading::TFuture<TGenericResult> DropTableStore(const TString& cluster, const TDropTableStoreSettings& settings) = 0;
845845

846-
virtual NThreading::TFuture<TGenericResult> CreateExternalTable(const TString& cluster, const TCreateExternalTableSettings& settings, bool createDir, bool existingOk) = 0;
846+
virtual NThreading::TFuture<TGenericResult> CreateExternalTable(const TString& cluster, const TCreateExternalTableSettings& settings, bool createDir, bool existingOk, bool replaceIfExists) = 0;
847847

848848
virtual NThreading::TFuture<TGenericResult> AlterExternalTable(const TString& cluster, const TAlterExternalTableSettings& settings) = 0;
849849

ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ void TestCreateExternalTable(TTestActorRuntime& runtime, TIntrusivePtr<IKikimrGa
170170
settings.Columns.insert(std::make_pair("Column2", TKikimrColumnMetadata{"Column2", 0, "String", false}));
171171
settings.ColumnOrder.push_back("Column2");
172172

173-
auto responseFuture = gateway->CreateExternalTable(TestCluster, settings, true, false);
173+
auto responseFuture = gateway->CreateExternalTable(TestCluster, settings, true, false, false);
174174
responseFuture.Wait();
175175
auto response = responseFuture.GetValue();
176176
response.Issues().PrintTo(Cerr);

ydb/core/protos/flat_scheme_op.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1806,6 +1806,7 @@ message TExternalTableDescription {
18061806
optional string Location = 6;
18071807
repeated TColumnDescription Columns = 7;
18081808
optional bytes Content = 8;
1809+
optional bool ReplaceIfExists = 9; // Only applicable for `create external table` operation
18091810
}
18101811

18111812
// Access without authorization
@@ -1866,6 +1867,7 @@ message TExternalDataSourceDescription {
18661867
optional string Installation = 6;
18671868
optional TAuth Auth = 7;
18681869
optional TExternalDataSourceProperties Properties = 8;
1870+
optional bool ReplaceIfExists = 9;
18691871
}
18701872

18711873
message TViewDescription {

ydb/library/yql/sql/v1/SQLv1.g.in

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -570,7 +570,7 @@ values_source_row: LPAREN expr_list RPAREN;
570570

571571
simple_values_source: expr_list | select_stmt;
572572

573-
create_external_data_source_stmt: CREATE EXTERNAL DATA SOURCE (IF NOT EXISTS)? object_ref
573+
create_external_data_source_stmt: CREATE (OR REPLACE)? EXTERNAL DATA SOURCE (IF NOT EXISTS)? object_ref
574574
with_table_settings
575575
;
576576

@@ -623,7 +623,7 @@ object_features: object_feature | LPAREN object_feature (COMMA object_feature)*
623623

624624
object_type_ref: an_id_or_type;
625625

626-
create_table_stmt: CREATE (TABLE | TABLESTORE | EXTERNAL TABLE) (IF NOT EXISTS)? simple_table_ref LPAREN create_table_entry (COMMA create_table_entry)* COMMA? RPAREN
626+
create_table_stmt: CREATE (OR REPLACE)? (TABLE | TABLESTORE | EXTERNAL TABLE) (IF NOT EXISTS)? simple_table_ref LPAREN create_table_entry (COMMA create_table_entry)* COMMA? RPAREN
627627
table_inherits?
628628
table_partition_by?
629629
with_table_settings?

ydb/library/yql/sql/v1/format/sql_format.cpp

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -891,27 +891,24 @@ friend struct TStaticData;
891891
Visit(msg.GetToken1());
892892
Visit(msg.GetBlock2());
893893
Visit(msg.GetBlock3());
894-
Visit(msg.GetRule_simple_table_ref4());
895-
Visit(msg.GetToken5());
894+
Visit(msg.GetBlock4());
895+
Visit(msg.GetRule_simple_table_ref5());
896+
Visit(msg.GetToken6());
896897
PushCurrentIndent();
897898
NewLine();
898-
Visit(msg.GetRule_create_table_entry6());
899-
for (const auto& b : msg.GetBlock7()) {
899+
Visit(msg.GetRule_create_table_entry7());
900+
for (const auto& b : msg.GetBlock8()) {
900901
Visit(b.GetToken1());
901902
NewLine();
902903
Visit(b.GetRule_create_table_entry2());
903904
}
904-
if (msg.HasBlock8()) {
905-
Visit(msg.GetBlock8());
905+
if (msg.HasBlock9()) {
906+
Visit(msg.GetBlock9());
906907
}
907908

908909
PopCurrentIndent();
909910
NewLine();
910-
Visit(msg.GetToken9());
911-
if (msg.HasBlock10()) {
912-
NewLine();
913-
Visit(msg.GetBlock10());
914-
}
911+
Visit(msg.GetToken10());
915912
if (msg.HasBlock11()) {
916913
NewLine();
917914
Visit(msg.GetBlock11());
@@ -924,6 +921,10 @@ friend struct TStaticData;
924921
NewLine();
925922
Visit(msg.GetBlock13());
926923
}
924+
if (msg.HasBlock14()) {
925+
NewLine();
926+
Visit(msg.GetBlock14());
927+
}
927928
}
928929

929930
void VisitDropTable(const TRule_drop_table_stmt& msg) {

ydb/library/yql/sql/v1/format/sql_format_ut.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,8 @@ Y_UNIT_TEST_SUITE(CheckSqlFormatter) {
354354
"CREATE EXTERNAL DATA SOURCE usEr WITH (a = \"b\");\n"},
355355
{"creAte exTernAl daTa SouRce if not exists usEr With (a = \"b\")",
356356
"CREATE EXTERNAL DATA SOURCE IF NOT EXISTS usEr WITH (a = \"b\");\n"},
357+
{"creAte oR rePlaCe exTernAl daTa SouRce usEr With (a = \"b\")",
358+
"CREATE OR REPLACE EXTERNAL DATA SOURCE usEr WITH (a = \"b\");\n"},
357359
{"create external data source eds with (a=\"a\",b=\"b\",c = true)",
358360
"CREATE EXTERNAL DATA SOURCE eds WITH (\n\ta = \"a\",\n\tb = \"b\",\n\tc = TRUE\n);\n"},
359361
{"alter external data source eds set a true, reset (b, c), set (x=y, z=false)",
@@ -388,6 +390,8 @@ Y_UNIT_TEST_SUITE(CheckSqlFormatter) {
388390
TCases cases = {
389391
{"creAte exTernAl TabLe usEr (a int) With (a = \"b\")",
390392
"CREATE EXTERNAL TABLE usEr (\n\ta int\n)\nWITH (a = \"b\");\n"},
393+
{"creAte oR rePlaCe exTernAl TabLe usEr (a int) With (a = \"b\")",
394+
"CREATE OR REPLACE EXTERNAL TABLE usEr (\n\ta int\n)\nWITH (a = \"b\");\n"},
391395
{"creAte exTernAl TabLe iF NOt Exists usEr (a int) With (a = \"b\")",
392396
"CREATE EXTERNAL TABLE IF NOT EXISTS usEr (\n\ta int\n)\nWITH (a = \"b\");\n"},
393397
{"create external table user (a int) with (a=\"b\",c=\"d\")",

ydb/library/yql/sql/v1/node.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1231,7 +1231,7 @@ namespace NSQLTranslationV1 {
12311231
TNodePtr BuildUpsertObjectOperation(TPosition pos, const TString& objectId, const TString& typeId,
12321232
std::map<TString, TDeferredAtom>&& features, const TObjectOperatorContext& context);
12331233
TNodePtr BuildCreateObjectOperation(TPosition pos, const TString& objectId, const TString& typeId,
1234-
bool existingOk, std::map<TString, TDeferredAtom>&& features, const TObjectOperatorContext& context);
1234+
bool existingOk, bool replaceIfExists, std::map<TString, TDeferredAtom>&& features, const TObjectOperatorContext& context);
12351235
TNodePtr BuildAlterObjectOperation(TPosition pos, const TString& secretId, const TString& typeId,
12361236
std::map<TString, TDeferredAtom>&& features, std::set<TString>&& featuresToReset, const TObjectOperatorContext& context);
12371237
TNodePtr BuildDropObjectOperation(TPosition pos, const TString& secretId, const TString& typeId,

ydb/library/yql/sql/v1/object_processing.h

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,19 +40,30 @@ class TCreateObject: public TObjectProcessorImpl {
4040
std::set<TString> FeaturesToReset;
4141
protected:
4242
bool ExistingOk = false;
43+
bool ReplaceIfExists = false;
4344
protected:
4445
virtual INode::TPtr BuildOptions() const override {
45-
return Y(Q(Y(Q("mode"), Q(ExistingOk ? "createObjectIfNotExists" : "createObject"))));
46+
TString mode;
47+
if (ExistingOk) {
48+
mode = "createObjectIfNotExists";
49+
} else if (ReplaceIfExists) {
50+
mode = "createObjectOrReplace";
51+
} else {
52+
mode = "createObject";
53+
}
54+
55+
return Y(Q(Y(Q("mode"), Q(mode))));
4656
}
4757
virtual INode::TPtr FillFeatures(INode::TPtr options) const override;
4858
public:
4959
TCreateObject(TPosition pos, const TString& objectId,
50-
const TString& typeId, bool existingOk, std::map<TString, TDeferredAtom>&& features, std::set<TString>&& featuresToReset, const TObjectOperatorContext& context)
60+
const TString& typeId, bool existingOk, bool replaceIfExists, std::map<TString, TDeferredAtom>&& features, std::set<TString>&& featuresToReset, const TObjectOperatorContext& context)
5161
: TBase(pos, objectId, typeId, context)
5262
, Features(std::move(features))
5363
, FeaturesToReset(std::move(featuresToReset))
54-
, ExistingOk(existingOk) {
55-
}
64+
, ExistingOk(existingOk)
65+
, ReplaceIfExists(replaceIfExists) {
66+
}
5667
};
5768

5869
class TUpsertObject final: public TCreateObject {

0 commit comments

Comments
 (0)