Skip to content

Commit fac2961

Browse files
authored
Merge 5599b80 into f4ddcbf
2 parents f4ddcbf + 5599b80 commit fac2961

16 files changed

+385
-11
lines changed

ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,12 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
235235
break;
236236
}
237237

238+
case NKqpProto::TKqpSchemeOperation::kCreateSequence: {
239+
const auto& modifyScheme = schemeOp.GetCreateSequence();
240+
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
241+
break;
242+
}
243+
238244
default:
239245
InternalError(TStringBuilder() << "Unexpected scheme operation: "
240246
<< (ui32) schemeOp.GetOperationCase());

ydb/core/kqp/gateway/kqp_ic_gateway.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -950,6 +950,14 @@ class TKikimrIcGateway : public IKqpGateway {
950950
}
951951
}
952952

953+
TFuture<TGenericResult> CreateSequence(const TString& cluster,
954+
const NYql::TCreateSequenceSettings& settings, bool existingOk) override {
955+
Y_UNUSED(cluster);
956+
Y_UNUSED(settings);
957+
Y_UNUSED(existingOk);
958+
return NotImplemented<TGenericResult>();
959+
}
960+
953961
TFuture<TGenericResult> CreateTopic(const TString& cluster, Ydb::Topic::CreateTopicRequest&& request) override {
954962
try {
955963
if (!CheckCluster(cluster)) {

ydb/core/kqp/host/kqp_gateway_proxy.cpp

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1390,6 +1390,78 @@ class TKqpGatewayProxy : public IKikimrGateway {
13901390
}
13911391
}
13921392

1393+
TFuture<TGenericResult> CreateSequence(const TString& cluster,
1394+
const TCreateSequenceSettings& settings, bool existingOk) override {
1395+
CHECK_PREPARED_DDL(CreateSequence);
1396+
1397+
if (!SessionCtx->Config().EnableSequences) {
1398+
IKqpGateway::TGenericResult errResult;
1399+
errResult.AddIssue(NYql::TIssue("Sequences are not supported yet."));
1400+
errResult.SetStatus(NYql::YqlStatusFromYdbStatus(Ydb::StatusIds::UNSUPPORTED));
1401+
return MakeFuture(std::move(errResult));
1402+
}
1403+
1404+
try {
1405+
1406+
if (cluster != SessionCtx->GetCluster()) {
1407+
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
1408+
}
1409+
1410+
std::pair<TString, TString> pathPair;
1411+
{
1412+
TString error;
1413+
if (!NSchemeHelpers::SplitTablePath(settings.Name, GetDatabase(), pathPair, error, false)) {
1414+
return MakeFuture(ResultFromError<TGenericResult>(error));
1415+
}
1416+
}
1417+
1418+
NKikimrSchemeOp::TModifyScheme schemeTx;
1419+
schemeTx.SetWorkingDir(pathPair.first);
1420+
1421+
schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateSequence);
1422+
schemeTx.SetFailedOnAlreadyExists(!existingOk);
1423+
1424+
NKikimrSchemeOp::TSequenceDescription* seqDesc = schemeTx.MutableSequence();
1425+
1426+
seqDesc->SetName(pathPair.second);
1427+
1428+
if (settings.SequenceSettings.MinValue) {
1429+
seqDesc->SetMinValue(*settings.SequenceSettings.MinValue);
1430+
}
1431+
if (settings.SequenceSettings.MaxValue) {
1432+
seqDesc->SetMaxValue(*settings.SequenceSettings.MaxValue);
1433+
}
1434+
if (settings.SequenceSettings.Increment) {
1435+
seqDesc->SetIncrement(*settings.SequenceSettings.Increment);
1436+
}
1437+
if (settings.SequenceSettings.StartValue) {
1438+
seqDesc->SetStartValue(*settings.SequenceSettings.StartValue);
1439+
}
1440+
if (settings.SequenceSettings.Cache) {
1441+
seqDesc->SetCache(*settings.SequenceSettings.Cache);
1442+
}
1443+
if (settings.SequenceSettings.Cycle) {
1444+
seqDesc->SetCycle(*settings.SequenceSettings.Cycle);
1445+
}
1446+
1447+
if (IsPrepare()) {
1448+
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
1449+
auto& phyTx = *phyQuery.AddTransactions();
1450+
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1451+
phyTx.MutableSchemeOperation()->MutableCreateSequence()->Swap(&schemeTx);
1452+
1453+
TGenericResult result;
1454+
result.SetSuccess();
1455+
return MakeFuture(result);
1456+
} else {
1457+
return Gateway->ModifyScheme(std::move(schemeTx));
1458+
}
1459+
}
1460+
catch (yexception& e) {
1461+
return MakeFuture(ResultFromException<TGenericResult>(e));
1462+
}
1463+
}
1464+
13931465
TFuture<TGenericResult> CreateTableStore(const TString& cluster,
13941466
const TCreateTableStoreSettings& settings, bool existingOk) override
13951467
{

ydb/core/kqp/provider/yql_kikimr_datasink.cpp

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,12 @@ class TKiSinkIntentDeterminationTransformer: public TKiSinkVisitorTransformer {
9797
return TStatus::Ok;
9898
}
9999

100+
TStatus HandleCreateSequence(NNodes::TKiCreateSequence node, TExprContext& ctx) override {
101+
Y_UNUSED(ctx);
102+
Y_UNUSED(node);
103+
return TStatus::Ok;
104+
}
105+
100106
TStatus HandleModifyPermissions(TKiModifyPermissions node, TExprContext& ctx) override {
101107
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
102108
<< "ModifyPermissions is not yet implemented for intent determination transformer"));
@@ -474,6 +480,10 @@ class TKikimrDataSink : public TDataProviderBase
474480
return true;
475481
}
476482

483+
if (node.IsCallable(TKiCreateSequence::CallableName())) {
484+
return true;
485+
}
486+
477487
if (node.IsCallable(TKiCreateUser::CallableName())
478488
|| node.IsCallable(TKiAlterUser::CallableName())
479489
|| node.IsCallable(TKiDropUser::CallableName())
@@ -550,6 +560,41 @@ class TKikimrDataSink : public TDataProviderBase
550560
.Ptr();
551561
}
552562

563+
static TExprNode::TPtr MakeCreateSequence(const TExprNode::TPtr& node, const TKikimrKey& key, TExprContext& ctx)
564+
{
565+
NCommon::TWriteSequenceSettings settings = NCommon::ParseSequenceSettings(TExprList(node->Child(4)), ctx);
566+
YQL_ENSURE(settings.Mode);
567+
auto mode = settings.Mode.Cast();
568+
if (node->Child(3)->Content() != "Void") {
569+
ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), "Creating sequence with data is not supported."));
570+
return nullptr;
571+
}
572+
573+
auto valueType = settings.ValueType.IsValid()
574+
? settings.ValueType.Cast()
575+
: Build<TCoAtom>(ctx, node->Pos()).Value("bigint").Done();
576+
577+
auto temporary = settings.Temporary.IsValid()
578+
? settings.Temporary.Cast()
579+
: Build<TCoAtom>(ctx, node->Pos()).Value("false").Done();
580+
581+
auto existringOk = (settings.Mode.Cast().Value() == "create_if_not_exists");
582+
583+
return Build<TKiCreateSequence>(ctx, node->Pos())
584+
.World(node->Child(0))
585+
.DataSink(node->Child(1))
586+
.Sequence().Build(key.GetPGObjectId())
587+
.ValueType(valueType)
588+
.Temporary(temporary)
589+
.ExistingOk<TCoAtom>()
590+
.Value(existringOk)
591+
.Build()
592+
.SequenceSettings(settings.SequenceSettings.Cast())
593+
.Settings(settings.Other)
594+
.Done()
595+
.Ptr();
596+
}
597+
553598
bool RewriteIOExternal(const TKikimrKey& key, const TExprNode::TPtr& node, const TCoAtom& mode, TExprContext& ctx, TExprNode::TPtr& resultNode) {
554599
TKiDataSink dataSink(node->ChildPtr(1));
555600
auto& tableDesc = SessionCtx->Tables().GetTable(TString{dataSink.Cluster()}, key.GetTablePath());
@@ -1103,8 +1148,14 @@ class TKikimrDataSink : public TDataProviderBase
11031148

11041149
if (mode == "dropIndex") {
11051150
return MakePgDropObject(node, settings, key, ctx);
1151+
} else if (key.GetPGObjectType() == "pgSequence") {
1152+
if (mode == "create" || mode == "create_if_not_exists") {
1153+
return MakeCreateSequence(node, key, ctx);
1154+
} else {
1155+
YQL_ENSURE(false, "unknown Sequence mode \"" << TString(mode) << "\"");
1156+
}
11061157
} else {
1107-
YQL_ENSURE(false, "unknown PGObject mode \"" << TString(mode) << "\"");
1158+
YQL_ENSURE(false, "unknown PGObject with type: \"" << key.GetPGObjectType() << "\"");
11081159
}
11091160
}
11101161
}
@@ -1277,6 +1328,10 @@ IGraphTransformer::TStatus TKiSinkVisitorTransformer::DoTransform(TExprNode::TPt
12771328
return HandleReturningList(node.Cast(), ctx);
12781329
}
12791330

1331+
if (auto node = callable.Maybe<TKiCreateSequence>()) {
1332+
return HandleCreateSequence(node.Cast(), ctx);
1333+
}
1334+
12801335
ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "(Kikimr DataSink) Unsupported function: "
12811336
<< callable.CallableName()));
12821337
return TStatus::Error;

ydb/core/kqp/provider/yql_kikimr_exec.cpp

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,31 @@ namespace {
285285
};
286286
}
287287

288+
TCreateSequenceSettings ParseCreateSequenceSettings(TKiCreateSequence createSequence) {
289+
TCreateSequenceSettings createSequenceSettings;
290+
createSequenceSettings.Name = TString(createSequence.Sequence());
291+
createSequenceSettings.Temporary = TString(createSequence.Temporary()) == "true" ? true : false;
292+
for (const auto& setting: createSequence.SequenceSettings()) {
293+
auto name = setting.Name().Value();
294+
auto value = TString(setting.Value().template Cast<TCoAtom>().Value());
295+
if (name == "start") {
296+
createSequenceSettings.SequenceSettings.StartValue = FromString<i64>(value);
297+
} else if (name == "maxvalue") {
298+
createSequenceSettings.SequenceSettings.MaxValue = FromString<i64>(value);
299+
} else if (name == "minvalue") {
300+
createSequenceSettings.SequenceSettings.MinValue = FromString<i64>(value);
301+
} else if (name == "cache") {
302+
createSequenceSettings.SequenceSettings.Cache = FromString<ui64>(value);
303+
} else if (name == "cycle") {
304+
createSequenceSettings.SequenceSettings.Cycle = value == "1" ? true : false;
305+
} else if (name == "increment") {
306+
createSequenceSettings.SequenceSettings.Increment = FromString<i64>(value);
307+
}
308+
}
309+
310+
return createSequenceSettings;
311+
}
312+
288313
[[nodiscard]] TString AddConsumerToTopicRequest(
289314
Ydb::Topic::Consumer* protoConsumer, const TCoTopicConsumer& consumer
290315
) {
@@ -1641,6 +1666,27 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
16411666
return resultNode;
16421667
}, "Executing CREATE TOPIC");
16431668
}
1669+
1670+
if (auto maybeCreateSequence = TMaybeNode<TKiCreateSequence>(input)) {
1671+
auto requireStatus = RequireChild(*input, 0);
1672+
if (requireStatus.Level != TStatus::Ok) {
1673+
return SyncStatus(requireStatus);
1674+
}
1675+
1676+
auto cluster = TString(maybeCreateSequence.Cast().DataSink().Cluster());
1677+
TCreateSequenceSettings createSequenceSettings = ParseCreateSequenceSettings(maybeCreateSequence.Cast());
1678+
bool existingOk = (maybeCreateSequence.ExistingOk().Cast().Value() == "1");
1679+
1680+
auto future = Gateway->CreateSequence(cluster, createSequenceSettings, existingOk);
1681+
1682+
return WrapFuture(future,
1683+
[](const IKikimrGateway::TGenericResult& res, const TExprNode::TPtr& input, TExprContext& ctx) {
1684+
Y_UNUSED(res);
1685+
auto resultNode = ctx.NewWorld(input->Pos());
1686+
return resultNode;
1687+
}, "Executing CREATE SEQUENCE");
1688+
}
1689+
16441690
if (auto maybeAlter = TMaybeNode<TKiAlterTopic>(input)) {
16451691
auto requireStatus = RequireChild(*input, 0);
16461692
if (requireStatus.Level != TStatus::Ok) {

ydb/core/kqp/provider/yql_kikimr_expr_nodes.json

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,21 @@
417417
{"Index": 3, "Name": "TypeId", "Type": "TCoAtom"},
418418
{"Index": 4, "Name": "MissingOk", "Type": "TCoAtom"}
419419
]
420+
},
421+
{
422+
"Name": "TKiCreateSequence",
423+
"Base": "TCallable",
424+
"Match": {"Type": "Callable", "Name": "KiCreateSequence!"},
425+
"Children": [
426+
{"Index": 0, "Name": "World", "Type": "TExprBase"},
427+
{"Index": 1, "Name": "DataSink", "Type": "TKiDataSink"},
428+
{"Index": 2, "Name": "Sequence", "Type": "TCoAtom"},
429+
{"Index": 3, "Name": "ValueType", "Type": "TCoAtom"},
430+
{"Index": 4, "Name": "Temporary", "Type": "TCoAtom"},
431+
{"Index": 5, "Name": "ExistingOk", "Type": "TCoAtom"},
432+
{"Index": 6, "Name": "SequenceSettings", "Type": "TCoNameValueTupleList"},
433+
{"Index": 7, "Name": "Settings", "Type": "TCoNameValueTupleList"}
434+
]
420435
}
421436
]
422437
}

ydb/core/kqp/provider/yql_kikimr_gateway.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -658,6 +658,22 @@ struct TCreateExternalTableSettings {
658658
TVector<std::pair<TString, TString>> SourceTypeParameters;
659659
};
660660

661+
struct TSequenceSettings {
662+
TMaybe<i64> MinValue;
663+
TMaybe<i64> MaxValue;
664+
TMaybe<i64> StartValue;
665+
TMaybe<ui64> Cache;
666+
TMaybe<i64> Increment;
667+
TMaybe<bool> Cycle;
668+
TMaybe<TString> OwnedBy;
669+
};
670+
671+
struct TCreateSequenceSettings {
672+
TString Name;
673+
bool Temporary = false;
674+
TSequenceSettings SequenceSettings;
675+
};
676+
661677
struct TAlterExternalTableSettings {
662678
TString ExternalTable;
663679
};
@@ -841,6 +857,9 @@ class IKikimrGateway : public TThrRefBase {
841857

842858
virtual NThreading::TFuture<TGenericResult> DropGroup(const TString& cluster, const TDropGroupSettings& settings) = 0;
843859

860+
virtual NThreading::TFuture<TGenericResult> CreateSequence(const TString& cluster,
861+
const TCreateSequenceSettings& settings, bool existingOk) = 0;
862+
844863
virtual NThreading::TFuture<TGenericResult> CreateColumnTable(
845864
TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk = false) = 0;
846865

ydb/core/kqp/provider/yql_kikimr_provider.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ struct TKikimrData {
6565
DataSinkNames.insert(TKiEffects::CallableName());
6666
DataSinkNames.insert(TPgDropObject::CallableName());
6767
DataSinkNames.insert(TKiReturningList::CallableName());
68+
DataSinkNames.insert(TKiCreateSequence::CallableName());
6869

6970
CommitModes.insert(CommitModeFlush);
7071
CommitModes.insert(CommitModeRollback);

ydb/core/kqp/provider/yql_kikimr_provider_impl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ class TKiSinkVisitorTransformer : public TSyncTransformerBase {
6363
virtual TStatus HandleEffects(NNodes::TKiEffects node, TExprContext& ctx) = 0;
6464
virtual TStatus HandlePgDropObject(NNodes::TPgDropObject node, TExprContext& ctx) = 0;
6565

66+
virtual TStatus HandleCreateSequence(NNodes::TKiCreateSequence node, TExprContext& ctx) = 0;
67+
6668
virtual TStatus HandleModifyPermissions(NNodes::TKiModifyPermissions node, TExprContext& ctx) = 0;
6769

6870
virtual TStatus HandleReturningList(NNodes::TKiReturningList node, TExprContext& ctx) = 0;

ydb/core/kqp/provider/yql_kikimr_type_ann.cpp

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1485,6 +1485,20 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
14851485
return true;
14861486
}
14871487

1488+
static bool CheckCreateSequenceSettings(const TCoNameValueTupleList& settings, TExprContext& ctx) {
1489+
const static std::unordered_set<TString> sequenceSettingNames =
1490+
{"start", "increment", "cache", "minvalue", "maxvalue", "cycle"};
1491+
for (const auto& setting : settings) {
1492+
auto name = setting.Name().Value();
1493+
if (!sequenceSettingNames.contains(TString(name))) {
1494+
ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
1495+
TStringBuilder() << "unsupported setting with name: " << name));
1496+
return false;
1497+
}
1498+
}
1499+
return true;
1500+
}
1501+
14881502
virtual TStatus HandleCreateTopic(TKiCreateTopic node, TExprContext& ctx) override {
14891503
if (!CheckTopicSettings(node.Settings(), ctx)) {
14901504
return TStatus::Error;
@@ -1499,6 +1513,28 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
14991513
return TStatus::Ok;
15001514
}
15011515

1516+
virtual TStatus HandleCreateSequence(TKiCreateSequence node, TExprContext& ctx) override {
1517+
if(!CheckCreateSequenceSettings(node.SequenceSettings(), ctx)) {
1518+
return TStatus::Error;
1519+
}
1520+
1521+
if (!node.Settings().Empty()) {
1522+
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
1523+
<< "Unsupported sequence settings"));
1524+
return TStatus::Error;
1525+
}
1526+
1527+
TString valueType = TString(node.ValueType());
1528+
if (valueType != "int8") {
1529+
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
1530+
<< "Unsupported value type: " << valueType));
1531+
return TStatus::Error;
1532+
}
1533+
1534+
node.Ptr()->SetTypeAnn(node.World().Ref().GetTypeAnn());
1535+
return TStatus::Ok;
1536+
}
1537+
15021538
virtual TStatus HandleAlterTopic(TKiAlterTopic node, TExprContext& ctx) override {
15031539
if (!CheckTopicSettings(node.Settings(), ctx)) {
15041540
return TStatus::Error;

0 commit comments

Comments
 (0)