Skip to content

Commit 9408283

Browse files
authored
Support types of sequences (#6669)
1 parent fd06fa3 commit 9408283

20 files changed

+554
-73
lines changed

ydb/core/kqp/host/kqp_gateway_proxy.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1493,6 +1493,15 @@ class TKqpGatewayProxy : public IKikimrGateway {
14931493
if (settings.SequenceSettings.Cycle) {
14941494
seqDesc->SetCycle(*settings.SequenceSettings.Cycle);
14951495
}
1496+
if (settings.SequenceSettings.DataType) {
1497+
if (settings.SequenceSettings.DataType == "int8") {
1498+
seqDesc->SetDataType("pgint8");
1499+
} else if (settings.SequenceSettings.DataType == "int4") {
1500+
seqDesc->SetDataType("pgint4");
1501+
} else if (settings.SequenceSettings.DataType == "int2") {
1502+
seqDesc->SetDataType("pgint2");
1503+
}
1504+
}
14961505

14971506
if (IsPrepare()) {
14981507
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
@@ -1613,6 +1622,15 @@ class TKqpGatewayProxy : public IKikimrGateway {
16131622
if (settings.SequenceSettings.Cycle) {
16141623
seqDesc->SetCycle(*settings.SequenceSettings.Cycle);
16151624
}
1625+
if (settings.SequenceSettings.DataType) {
1626+
if (settings.SequenceSettings.DataType == "int8") {
1627+
seqDesc->SetDataType("pgint8");
1628+
} else if (settings.SequenceSettings.DataType == "int4") {
1629+
seqDesc->SetDataType("pgint4");
1630+
} else if (settings.SequenceSettings.DataType == "int2") {
1631+
seqDesc->SetDataType("pgint2");
1632+
}
1633+
}
16161634

16171635
if (IsPrepare()) {
16181636
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();

ydb/core/kqp/provider/yql_kikimr_datasink.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -772,10 +772,15 @@ class TKikimrDataSink : public TDataProviderBase
772772
return nullptr;
773773
}
774774

775+
auto valueType = settings.ValueType.IsValid()
776+
? settings.ValueType.Cast()
777+
: Build<TCoAtom>(ctx, node->Pos()).Value("Null").Done();
778+
775779
return Build<TKiAlterSequence>(ctx, node->Pos())
776780
.World(node->Child(0))
777781
.DataSink(node->Child(1))
778782
.Sequence().Build(key.GetPGObjectId())
783+
.ValueType(valueType)
779784
.SequenceSettings(settings.SequenceSettings.Cast())
780785
.Settings(settings.Other)
781786
.MissingOk<TCoAtom>()

ydb/core/kqp/provider/yql_kikimr_exec.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,7 @@ namespace {
313313
createSequenceSettings.Name = TString(createSequence.Sequence());
314314
createSequenceSettings.Temporary = TString(createSequence.Temporary()) == "true" ? true : false;
315315
createSequenceSettings.SequenceSettings = ParseSequenceSettings(createSequence.SequenceSettings());
316+
createSequenceSettings.SequenceSettings.DataType = TString(createSequence.ValueType());
316317

317318
return createSequenceSettings;
318319
}
@@ -327,6 +328,9 @@ namespace {
327328
TAlterSequenceSettings alterSequenceSettings;
328329
alterSequenceSettings.Name = TString(alterSequence.Sequence());
329330
alterSequenceSettings.SequenceSettings = ParseSequenceSettings(alterSequence.SequenceSettings());
331+
if (TString(alterSequence.ValueType()) != "Null") {
332+
alterSequenceSettings.SequenceSettings.DataType = TString(alterSequence.ValueType());
333+
}
330334

331335
return alterSequenceSettings;
332336
}

ydb/core/kqp/provider/yql_kikimr_expr_nodes.json

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -453,9 +453,10 @@
453453
{"Index": 0, "Name": "World", "Type": "TExprBase"},
454454
{"Index": 1, "Name": "DataSink", "Type": "TKiDataSink"},
455455
{"Index": 2, "Name": "Sequence", "Type": "TCoAtom"},
456-
{"Index": 3, "Name": "SequenceSettings", "Type": "TCoNameValueTupleList"},
457-
{"Index": 4, "Name": "Settings", "Type": "TCoNameValueTupleList"},
458-
{"Index": 5, "Name": "MissingOk", "Type": "TCoAtom"}
456+
{"Index": 3, "Name": "ValueType", "Type": "TCoAtom"},
457+
{"Index": 4, "Name": "SequenceSettings", "Type": "TCoNameValueTupleList"},
458+
{"Index": 5, "Name": "Settings", "Type": "TCoNameValueTupleList"},
459+
{"Index": 6, "Name": "MissingOk", "Type": "TCoAtom"}
459460
]
460461
},
461462
{

ydb/core/kqp/provider/yql_kikimr_gateway.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -692,6 +692,7 @@ struct TSequenceSettings {
692692
TMaybe<i64> Increment;
693693
TMaybe<bool> Cycle;
694694
TMaybe<TString> OwnedBy;
695+
TMaybe<TString> DataType;
695696
};
696697

697698
struct TCreateSequenceSettings {

ydb/core/kqp/provider/yql_kikimr_type_ann.cpp

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1564,16 +1564,16 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
15641564
return TStatus::Error;
15651565
}
15661566

1567-
TString valueType = TString(node.ValueType());
1568-
if (valueType != "int8") {
1567+
if (TString(node.Temporary()) == "true") {
15691568
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
1570-
<< "Unsupported value type: " << valueType));
1569+
<< "Temporary sequences are currently not supported"));
15711570
return TStatus::Error;
15721571
}
15731572

1574-
if (TString(node.Temporary()) == "true") {
1573+
TString valueType = TString(node.ValueType());
1574+
if (valueType != "int8" && valueType != "int4" && valueType != "int2") {
15751575
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
1576-
<< "Temporary sequences is currently not supported"));
1576+
<< "Unsupported value type for sequence: " << valueType));
15771577
return TStatus::Error;
15781578
}
15791579

@@ -1604,6 +1604,13 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
16041604
return TStatus::Error;
16051605
}
16061606

1607+
TString valueType = TString(node.ValueType());
1608+
if (valueType != "Null" && valueType != "int8" && valueType != "int4" && valueType != "int2") {
1609+
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
1610+
<< "Unsupported value type for sequence: " << valueType));
1611+
return TStatus::Error;
1612+
}
1613+
16071614
node.Ptr()->SetTypeAnn(node.World().Ref().GetTypeAnn());
16081615
return TStatus::Ok;
16091616
}

ydb/core/kqp/ut/pg/kqp_pg_ut.cpp

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2497,6 +2497,7 @@ Y_UNIT_TEST_SUITE(KqpPg) {
24972497
const auto queryCreate = R"(
24982498
--!syntax_pg
24992499
CREATE SEQUENCE IF NOT EXISTS seq
2500+
as integer
25002501
START WITH 10
25012502
INCREMENT BY 2
25022503
MINVALUE 1
@@ -2516,11 +2517,12 @@ Y_UNIT_TEST_SUITE(KqpPg) {
25162517
auto& sequenceDescription = describeResult.GetPathDescription().GetSequenceDescription();
25172518
UNIT_ASSERT_VALUES_EQUAL(sequenceDescription.GetName(), "seq");
25182519
UNIT_ASSERT_VALUES_EQUAL(sequenceDescription.GetMinValue(), 1);
2519-
UNIT_ASSERT_VALUES_EQUAL(sequenceDescription.GetMaxValue(), Max<i64>());
2520+
UNIT_ASSERT_VALUES_EQUAL(sequenceDescription.GetMaxValue(), Max<i32>());
25202521
UNIT_ASSERT_VALUES_EQUAL(sequenceDescription.GetStartValue(), 10);
25212522
UNIT_ASSERT_VALUES_EQUAL(sequenceDescription.GetCache(), 3);
25222523
UNIT_ASSERT_VALUES_EQUAL(sequenceDescription.GetIncrement(), 2);
25232524
UNIT_ASSERT_VALUES_EQUAL(sequenceDescription.GetCycle(), true);
2525+
UNIT_ASSERT_VALUES_EQUAL(sequenceDescription.GetDataType(), "pgint4");
25242526
}
25252527

25262528
{
@@ -2530,6 +2532,7 @@ Y_UNIT_TEST_SUITE(KqpPg) {
25302532
const auto queryAlter = R"(
25312533
--!syntax_pg
25322534
ALTER SEQUENCE IF EXISTS seq
2535+
as smallint
25332536
START WITH 20
25342537
INCREMENT BY 5
25352538
MAXVALUE 30
@@ -2553,6 +2556,21 @@ Y_UNIT_TEST_SUITE(KqpPg) {
25532556
UNIT_ASSERT_VALUES_EQUAL(sequenceDescription.GetCache(), 3);
25542557
UNIT_ASSERT_VALUES_EQUAL(sequenceDescription.GetIncrement(), 5);
25552558
UNIT_ASSERT_VALUES_EQUAL(sequenceDescription.GetCycle(), false);
2559+
UNIT_ASSERT_VALUES_EQUAL(sequenceDescription.GetDataType(), "pgint2");
2560+
}
2561+
2562+
{
2563+
auto session = client.GetSession().GetValueSync().GetSession();
2564+
auto id = session.GetId();
2565+
2566+
const auto queryAlter = R"(
2567+
--!syntax_pg
2568+
ALTER SEQUENCE IF EXISTS seq
2569+
MAXVALUE 65000;
2570+
)";
2571+
2572+
auto resultAlter = session.ExecuteQuery(queryAlter, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
2573+
UNIT_ASSERT(!resultAlter.IsSuccess());
25562574
}
25572575

25582576
{
@@ -2568,6 +2586,63 @@ Y_UNIT_TEST_SUITE(KqpPg) {
25682586
auto resultAlter = session.ExecuteQuery(queryAlter, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
25692587
UNIT_ASSERT(!resultAlter.IsSuccess());
25702588
}
2589+
2590+
{
2591+
auto session = client.GetSession().GetValueSync().GetSession();
2592+
auto id = session.GetId();
2593+
2594+
const auto queryAlter = R"(
2595+
--!syntax_pg
2596+
ALTER SEQUENCE IF EXISTS seq
2597+
MAXVALUE 32000;
2598+
)";
2599+
2600+
auto resultAlter = session.ExecuteQuery(queryAlter, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
2601+
UNIT_ASSERT_C(resultAlter.IsSuccess(), resultAlter.GetIssues().ToString());
2602+
}
2603+
2604+
{
2605+
auto session = client.GetSession().GetValueSync().GetSession();
2606+
auto id = session.GetId();
2607+
2608+
const auto queryAlter = R"(
2609+
--!syntax_pg
2610+
ALTER SEQUENCE IF EXISTS seq
2611+
as bigint;
2612+
)";
2613+
2614+
auto resultAlter = session.ExecuteQuery(queryAlter, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
2615+
UNIT_ASSERT_C(resultAlter.IsSuccess(), resultAlter.GetIssues().ToString());
2616+
}
2617+
2618+
{
2619+
auto session = client.GetSession().GetValueSync().GetSession();
2620+
auto id = session.GetId();
2621+
2622+
const auto queryAlter = R"(
2623+
--!syntax_pg
2624+
ALTER SEQUENCE IF EXISTS seq
2625+
as integer;
2626+
MAXVALUE 2147483647;
2627+
)";
2628+
2629+
auto resultAlter = session.ExecuteQuery(queryAlter, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
2630+
UNIT_ASSERT(!resultAlter.IsSuccess());
2631+
}
2632+
2633+
{
2634+
auto session = client.GetSession().GetValueSync().GetSession();
2635+
auto id = session.GetId();
2636+
2637+
const auto queryAlter = R"(
2638+
--!syntax_pg
2639+
ALTER SEQUENCE IF EXISTS seq
2640+
MAXVALUE 2147483647;
2641+
)";
2642+
2643+
auto resultAlter = session.ExecuteQuery(queryAlter, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
2644+
UNIT_ASSERT_C(resultAlter.IsSuccess(), resultAlter.GetIssues().ToString());
2645+
}
25712646
}
25722647

25732648
Y_UNIT_TEST(AlterColumnSetDefaultFromSequence) {

ydb/core/protos/flat_scheme_op.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1441,6 +1441,7 @@ message TSequenceDescription {
14411441
optional sint64 Increment = 9; // increment at each call, defaults to 1
14421442
optional bool Cycle = 10; // true when cycle on overflow is allowed
14431443
optional TSetVal SetVal = 11; // SetVal(NextValue, NextUsed) is executed atomically when creating
1444+
optional string DataType = 12; // data type of the sequence: Int64/pgint8, Int32/pgint4, Int16/pgint2, defaults to Int64/pgint8
14441445
}
14451446

14461447
message TSequenceSharding {

ydb/core/tx/datashard/export_common.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,12 @@ TMaybe<Ydb::Table::CreateTableRequest> GenYdbScheme(
6666
FillPartitioningSettings(scheme, tableDesc);
6767
FillKeyBloomFilter(scheme, tableDesc);
6868
FillReadReplicasSettings(scheme, tableDesc);
69-
FillSequenceDescription(scheme, tableDesc);
69+
70+
TString error;
71+
Ydb::StatusIds::StatusCode status;
72+
if (!FillSequenceDescription(scheme, tableDesc, status, error)) {
73+
return Nothing();
74+
}
7075

7176
return scheme;
7277
}

ydb/core/tx/schemeshard/schemeshard__operation_alter_sequence.cpp

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -227,23 +227,63 @@ class TPropose: public TSubOperationState {
227227

228228
std::optional<NKikimrSchemeOp::TSequenceDescription> GetAlterSequenceDescription(
229229
const NKikimrSchemeOp::TSequenceDescription& sequence, const NKikimrSchemeOp::TSequenceDescription& alter,
230-
TString& errStr, NKikimrScheme::EStatus& status) {
230+
const NScheme::TTypeRegistry& typeRegistry, bool pgTypesEnabled,
231+
TString& errStr) {
231232

232233
NKikimrSchemeOp::TSequenceDescription result = sequence;
233234

234235
i64 minValue = result.GetMinValue();
235236
i64 maxValue = result.GetMaxValue();
236237

238+
auto dataType = result.GetDataType();
239+
if (alter.HasDataType()) {
240+
dataType = alter.GetDataType();
241+
}
242+
243+
auto validationResult = ValidateSequenceType(sequence.GetName(), dataType, typeRegistry, pgTypesEnabled, errStr);
244+
if (!validationResult) {
245+
return std::nullopt;
246+
}
247+
248+
auto [dataTypeMinValue, dataTypeMaxValue] = *validationResult;
249+
250+
if (maxValue != Max<i16>() && maxValue != Max<i32>() && maxValue != Max<i64>()) {
251+
if (maxValue > dataTypeMaxValue) {
252+
errStr = Sprintf("MAXVALUE (%ld) is out of range for sequence data type %s", maxValue, dataType.c_str());
253+
return std::nullopt;
254+
}
255+
} else {
256+
maxValue = dataTypeMaxValue;
257+
}
258+
259+
if (minValue != Min<i16>() && minValue != Min<i32>() && minValue != Min<i64>()) {
260+
if (minValue < dataTypeMinValue) {
261+
errStr = Sprintf("MINVALUE (%ld) is out of range for sequence data type %s", minValue, dataType.c_str());
262+
return std::nullopt;
263+
}
264+
} else {
265+
minValue = dataTypeMinValue;
266+
}
267+
237268
if (alter.HasMinValue()) {
238269
minValue = alter.GetMinValue();
239270
}
240271
if (alter.HasMaxValue()) {
241272
maxValue = alter.GetMaxValue();
242273
}
243274

275+
if (maxValue > dataTypeMaxValue) {
276+
errStr = Sprintf("MAXVALUE (%ld) is out of range for sequence", maxValue);
277+
return std::nullopt;
278+
}
279+
280+
if (minValue < dataTypeMinValue) {
281+
errStr = Sprintf("MINVALUE (%ld) is out of range for sequence", minValue);
282+
return std::nullopt;
283+
}
284+
244285
if (minValue >= maxValue) {
245286
errStr = Sprintf("MINVALUE (%ld) must be less than MAXVALUE (%ld)", minValue, maxValue);
246-
status = NKikimrScheme::StatusInvalidParameter;
247287
return std::nullopt;
248288
}
249289

@@ -254,12 +294,10 @@ std::optional<NKikimrSchemeOp::TSequenceDescription> GetAlterSequenceDescription
254294

255295
if (startValue > maxValue) {
256296
errStr = Sprintf("START value (%ld) cannot be greater than MAXVALUE (%ld)", startValue, maxValue);
257-
status = NKikimrScheme::StatusInvalidParameter;
258297
return std::nullopt;
259298
}
260299
if (startValue < minValue) {
261300
errStr = Sprintf("START value (%ld) cannot be less than MINVALUE (%ld)", startValue, minValue);
262-
status = NKikimrScheme::StatusInvalidParameter;
263301
return std::nullopt;
264302
}
265303

@@ -282,6 +320,7 @@ std::optional<NKikimrSchemeOp::TSequenceDescription> GetAlterSequenceDescription
282320
result.SetCycle(cycle);
283321
result.SetCache(cache);
284322
result.SetStartValue(startValue);
323+
result.SetDataType(dataType);
285324

286325
return result;
287326
}
@@ -411,9 +450,11 @@ class TAlterSequence: public TSubOperation {
411450
return result;
412451
}
413452

453+
const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry;
414454
auto description = GetAlterSequenceDescription(
415-
sequenceInfo->Description, sequenceAlter, errStr, status);
455+
sequenceInfo->Description, sequenceAlter, *typeRegistry, context.SS->EnableTablePgTypes, errStr);
416456
if (!description) {
457+
status = NKikimrScheme::StatusInvalidParameter;
417458
result->SetError(status, errStr);
418459
return result;
419460
}

0 commit comments

Comments
 (0)