Skip to content

Support create sequence #3662

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
break;
}

case NKqpProto::TKqpSchemeOperation::kCreateSequence: {
const auto& modifyScheme = schemeOp.GetCreateSequence();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

default:
InternalError(TStringBuilder() << "Unexpected scheme operation: "
<< (ui32) schemeOp.GetOperationCase());
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/kqp/gateway/kqp_ic_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,14 @@ class TKikimrIcGateway : public IKqpGateway {
}
}

TFuture<TGenericResult> CreateSequence(const TString& cluster,
const NYql::TCreateSequenceSettings& settings, bool existingOk) override {
Y_UNUSED(cluster);
Y_UNUSED(settings);
Y_UNUSED(existingOk);
return NotImplemented<TGenericResult>();
}

TFuture<TGenericResult> CreateTopic(const TString& cluster, Ydb::Topic::CreateTopicRequest&& request) override {
try {
if (!CheckCluster(cluster)) {
Expand Down
72 changes: 72 additions & 0 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1390,6 +1390,78 @@ class TKqpGatewayProxy : public IKikimrGateway {
}
}

TFuture<TGenericResult> CreateSequence(const TString& cluster,
const TCreateSequenceSettings& settings, bool existingOk) override {
CHECK_PREPARED_DDL(CreateSequence);

if (!SessionCtx->Config().EnableSequences) {
IKqpGateway::TGenericResult errResult;
errResult.AddIssue(NYql::TIssue("Sequences are not supported yet."));
errResult.SetStatus(NYql::YqlStatusFromYdbStatus(Ydb::StatusIds::UNSUPPORTED));
return MakeFuture(std::move(errResult));
}

try {

if (cluster != SessionCtx->GetCluster()) {
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
}

std::pair<TString, TString> pathPair;
{
TString error;
if (!NSchemeHelpers::SplitTablePath(settings.Name, GetDatabase(), pathPair, error, false)) {
return MakeFuture(ResultFromError<TGenericResult>(error));
}
}

NKikimrSchemeOp::TModifyScheme schemeTx;
schemeTx.SetWorkingDir(pathPair.first);

schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateSequence);
schemeTx.SetFailedOnAlreadyExists(!existingOk);

NKikimrSchemeOp::TSequenceDescription* seqDesc = schemeTx.MutableSequence();

seqDesc->SetName(pathPair.second);

if (settings.SequenceSettings.MinValue) {
seqDesc->SetMinValue(*settings.SequenceSettings.MinValue);
}
if (settings.SequenceSettings.MaxValue) {
seqDesc->SetMaxValue(*settings.SequenceSettings.MaxValue);
}
if (settings.SequenceSettings.Increment) {
seqDesc->SetIncrement(*settings.SequenceSettings.Increment);
}
if (settings.SequenceSettings.StartValue) {
seqDesc->SetStartValue(*settings.SequenceSettings.StartValue);
}
if (settings.SequenceSettings.Cache) {
seqDesc->SetCache(*settings.SequenceSettings.Cache);
}
if (settings.SequenceSettings.Cycle) {
seqDesc->SetCycle(*settings.SequenceSettings.Cycle);
}

if (IsPrepare()) {
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
auto& phyTx = *phyQuery.AddTransactions();
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
phyTx.MutableSchemeOperation()->MutableCreateSequence()->Swap(&schemeTx);

TGenericResult result;
result.SetSuccess();
return MakeFuture(result);
} else {
return Gateway->ModifyScheme(std::move(schemeTx));
}
}
catch (yexception& e) {
return MakeFuture(ResultFromException<TGenericResult>(e));
}
}

TFuture<TGenericResult> CreateTableStore(const TString& cluster,
const TCreateTableStoreSettings& settings, bool existingOk) override
{
Expand Down
57 changes: 56 additions & 1 deletion ydb/core/kqp/provider/yql_kikimr_datasink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ class TKiSinkIntentDeterminationTransformer: public TKiSinkVisitorTransformer {
return TStatus::Ok;
}

TStatus HandleCreateSequence(NNodes::TKiCreateSequence node, TExprContext& ctx) override {
Y_UNUSED(ctx);
Y_UNUSED(node);
return TStatus::Ok;
}

TStatus HandleModifyPermissions(TKiModifyPermissions node, TExprContext& ctx) override {
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
<< "ModifyPermissions is not yet implemented for intent determination transformer"));
Expand Down Expand Up @@ -474,6 +480,10 @@ class TKikimrDataSink : public TDataProviderBase
return true;
}

if (node.IsCallable(TKiCreateSequence::CallableName())) {
return true;
}

if (node.IsCallable(TKiCreateUser::CallableName())
|| node.IsCallable(TKiAlterUser::CallableName())
|| node.IsCallable(TKiDropUser::CallableName())
Expand Down Expand Up @@ -550,6 +560,41 @@ class TKikimrDataSink : public TDataProviderBase
.Ptr();
}

static TExprNode::TPtr MakeCreateSequence(const TExprNode::TPtr& node, const TKikimrKey& key, TExprContext& ctx)
{
NCommon::TWriteSequenceSettings settings = NCommon::ParseSequenceSettings(TExprList(node->Child(4)), ctx);
YQL_ENSURE(settings.Mode);
auto mode = settings.Mode.Cast();
if (node->Child(3)->Content() != "Void") {
ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), "Creating sequence with data is not supported."));
return nullptr;
}

auto valueType = settings.ValueType.IsValid()
? settings.ValueType.Cast()
: Build<TCoAtom>(ctx, node->Pos()).Value("bigint").Done();

auto temporary = settings.Temporary.IsValid()
? settings.Temporary.Cast()
: Build<TCoAtom>(ctx, node->Pos()).Value("false").Done();

auto existringOk = (settings.Mode.Cast().Value() == "create_if_not_exists");

return Build<TKiCreateSequence>(ctx, node->Pos())
.World(node->Child(0))
.DataSink(node->Child(1))
.Sequence().Build(key.GetPGObjectId())
.ValueType(valueType)
.Temporary(temporary)
.ExistingOk<TCoAtom>()
.Value(existringOk)
.Build()
.SequenceSettings(settings.SequenceSettings.Cast())
.Settings(settings.Other)
.Done()
.Ptr();
}

bool RewriteIOExternal(const TKikimrKey& key, const TExprNode::TPtr& node, const TCoAtom& mode, TExprContext& ctx, TExprNode::TPtr& resultNode) {
TKiDataSink dataSink(node->ChildPtr(1));
auto& tableDesc = SessionCtx->Tables().GetTable(TString{dataSink.Cluster()}, key.GetTablePath());
Expand Down Expand Up @@ -1103,8 +1148,14 @@ class TKikimrDataSink : public TDataProviderBase

if (mode == "dropIndex") {
return MakePgDropObject(node, settings, key, ctx);
} else if (key.GetPGObjectType() == "pgSequence") {
if (mode == "create" || mode == "create_if_not_exists") {
return MakeCreateSequence(node, key, ctx);
} else {
YQL_ENSURE(false, "unknown Sequence mode \"" << TString(mode) << "\"");
}
} else {
YQL_ENSURE(false, "unknown PGObject mode \"" << TString(mode) << "\"");
YQL_ENSURE(false, "unknown PGObject with type: \"" << key.GetPGObjectType() << "\"");
}
}
}
Expand Down Expand Up @@ -1277,6 +1328,10 @@ IGraphTransformer::TStatus TKiSinkVisitorTransformer::DoTransform(TExprNode::TPt
return HandleReturningList(node.Cast(), ctx);
}

if (auto node = callable.Maybe<TKiCreateSequence>()) {
return HandleCreateSequence(node.Cast(), ctx);
}

ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "(Kikimr DataSink) Unsupported function: "
<< callable.CallableName()));
return TStatus::Error;
Expand Down
46 changes: 46 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,31 @@ namespace {
};
}

TCreateSequenceSettings ParseCreateSequenceSettings(TKiCreateSequence createSequence) {
TCreateSequenceSettings createSequenceSettings;
createSequenceSettings.Name = TString(createSequence.Sequence());
createSequenceSettings.Temporary = TString(createSequence.Temporary()) == "true" ? true : false;
for (const auto& setting: createSequence.SequenceSettings()) {
auto name = setting.Name().Value();
auto value = TString(setting.Value().template Cast<TCoAtom>().Value());
if (name == "start") {
createSequenceSettings.SequenceSettings.StartValue = FromString<i64>(value);
} else if (name == "maxvalue") {
createSequenceSettings.SequenceSettings.MaxValue = FromString<i64>(value);
} else if (name == "minvalue") {
createSequenceSettings.SequenceSettings.MinValue = FromString<i64>(value);
} else if (name == "cache") {
createSequenceSettings.SequenceSettings.Cache = FromString<ui64>(value);
} else if (name == "cycle") {
createSequenceSettings.SequenceSettings.Cycle = value == "1" ? true : false;
} else if (name == "increment") {
createSequenceSettings.SequenceSettings.Increment = FromString<i64>(value);
}
}

return createSequenceSettings;
}

[[nodiscard]] TString AddConsumerToTopicRequest(
Ydb::Topic::Consumer* protoConsumer, const TCoTopicConsumer& consumer
) {
Expand Down Expand Up @@ -1641,6 +1666,27 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
return resultNode;
}, "Executing CREATE TOPIC");
}

if (auto maybeCreateSequence = TMaybeNode<TKiCreateSequence>(input)) {
auto requireStatus = RequireChild(*input, 0);
if (requireStatus.Level != TStatus::Ok) {
return SyncStatus(requireStatus);
}

auto cluster = TString(maybeCreateSequence.Cast().DataSink().Cluster());
TCreateSequenceSettings createSequenceSettings = ParseCreateSequenceSettings(maybeCreateSequence.Cast());
bool existingOk = (maybeCreateSequence.ExistingOk().Cast().Value() == "1");

auto future = Gateway->CreateSequence(cluster, createSequenceSettings, existingOk);

return WrapFuture(future,
[](const IKikimrGateway::TGenericResult& res, const TExprNode::TPtr& input, TExprContext& ctx) {
Y_UNUSED(res);
auto resultNode = ctx.NewWorld(input->Pos());
return resultNode;
}, "Executing CREATE SEQUENCE");
}

if (auto maybeAlter = TMaybeNode<TKiAlterTopic>(input)) {
auto requireStatus = RequireChild(*input, 0);
if (requireStatus.Level != TStatus::Ok) {
Expand Down
15 changes: 15 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,21 @@
{"Index": 3, "Name": "TypeId", "Type": "TCoAtom"},
{"Index": 4, "Name": "MissingOk", "Type": "TCoAtom"}
]
},
{
"Name": "TKiCreateSequence",
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "KiCreateSequence!"},
"Children": [
{"Index": 0, "Name": "World", "Type": "TExprBase"},
{"Index": 1, "Name": "DataSink", "Type": "TKiDataSink"},
{"Index": 2, "Name": "Sequence", "Type": "TCoAtom"},
{"Index": 3, "Name": "ValueType", "Type": "TCoAtom"},
{"Index": 4, "Name": "Temporary", "Type": "TCoAtom"},
{"Index": 5, "Name": "ExistingOk", "Type": "TCoAtom"},
{"Index": 6, "Name": "SequenceSettings", "Type": "TCoNameValueTupleList"},
{"Index": 7, "Name": "Settings", "Type": "TCoNameValueTupleList"}
]
}
]
}
19 changes: 19 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,22 @@ struct TCreateExternalTableSettings {
TVector<std::pair<TString, TString>> SourceTypeParameters;
};

struct TSequenceSettings {
TMaybe<i64> MinValue;
TMaybe<i64> MaxValue;
TMaybe<i64> StartValue;
TMaybe<ui64> Cache;
TMaybe<i64> Increment;
TMaybe<bool> Cycle;
TMaybe<TString> OwnedBy;
};

struct TCreateSequenceSettings {
TString Name;
bool Temporary = false;
TSequenceSettings SequenceSettings;
};

struct TAlterExternalTableSettings {
TString ExternalTable;
};
Expand Down Expand Up @@ -841,6 +857,9 @@ class IKikimrGateway : public TThrRefBase {

virtual NThreading::TFuture<TGenericResult> DropGroup(const TString& cluster, const TDropGroupSettings& settings) = 0;

virtual NThreading::TFuture<TGenericResult> CreateSequence(const TString& cluster,
const TCreateSequenceSettings& settings, bool existingOk) = 0;

virtual NThreading::TFuture<TGenericResult> CreateColumnTable(
TKikimrTableMetadataPtr metadata, bool createDir, bool existingOk = false) = 0;

Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/provider/yql_kikimr_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ struct TKikimrData {
DataSinkNames.insert(TKiEffects::CallableName());
DataSinkNames.insert(TPgDropObject::CallableName());
DataSinkNames.insert(TKiReturningList::CallableName());
DataSinkNames.insert(TKiCreateSequence::CallableName());

CommitModes.insert(CommitModeFlush);
CommitModes.insert(CommitModeRollback);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_provider_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class TKiSinkVisitorTransformer : public TSyncTransformerBase {
virtual TStatus HandleEffects(NNodes::TKiEffects node, TExprContext& ctx) = 0;
virtual TStatus HandlePgDropObject(NNodes::TPgDropObject node, TExprContext& ctx) = 0;

virtual TStatus HandleCreateSequence(NNodes::TKiCreateSequence node, TExprContext& ctx) = 0;

virtual TStatus HandleModifyPermissions(NNodes::TKiModifyPermissions node, TExprContext& ctx) = 0;

virtual TStatus HandleReturningList(NNodes::TKiReturningList node, TExprContext& ctx) = 0;
Expand Down
42 changes: 42 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1485,6 +1485,20 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
return true;
}

static bool CheckCreateSequenceSettings(const TCoNameValueTupleList& settings, TExprContext& ctx) {
const static std::unordered_set<TString> sequenceSettingNames =
{"start", "increment", "cache", "minvalue", "maxvalue", "cycle"};
for (const auto& setting : settings) {
auto name = setting.Name().Value();
if (!sequenceSettingNames.contains(TString(name))) {
ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
TStringBuilder() << "unsupported setting with name: " << name));
return false;
}
}
return true;
}

virtual TStatus HandleCreateTopic(TKiCreateTopic node, TExprContext& ctx) override {
if (!CheckTopicSettings(node.Settings(), ctx)) {
return TStatus::Error;
Expand All @@ -1499,6 +1513,34 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
return TStatus::Ok;
}

virtual TStatus HandleCreateSequence(TKiCreateSequence node, TExprContext& ctx) override {
if(!CheckCreateSequenceSettings(node.SequenceSettings(), ctx)) {
return TStatus::Error;
}

if (!node.Settings().Empty()) {
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
<< "Unsupported sequence settings"));
return TStatus::Error;
}

TString valueType = TString(node.ValueType());
if (valueType != "int8") {
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
<< "Unsupported value type: " << valueType));
return TStatus::Error;
}

if (TString(node.Temporary()) == "true") {
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
<< "Temporary sequences is currently not supported"));
return TStatus::Error;
}

node.Ptr()->SetTypeAnn(node.World().Ref().GetTypeAnn());
return TStatus::Ok;
}

virtual TStatus HandleAlterTopic(TKiAlterTopic node, TExprContext& ctx) override {
if (!CheckTopicSettings(node.Settings(), ctx)) {
return TStatus::Error;
Expand Down
Loading