Skip to content

Topic control plane DDL support for query service (#7438) #7774

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ class TAlterConfigsActor : public TAlterTopicActor<TAlterConfigsActor, TKafkaAlt
public:

TAlterConfigsActor(
TActorId requester,
TActorId requester,
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
TString topicPath,
TString databaseName,
std::optional<ui64> retentionMs,
std::optional<ui64> retentionBytes)
: TAlterTopicActor<TAlterConfigsActor, TKafkaAlterConfigsRequest>(
requester,
requester,
userToken,
topicPath,
databaseName)
Expand All @@ -54,12 +54,12 @@ class TAlterConfigsActor : public TAlterTopicActor<TAlterConfigsActor, TKafkaAlt
~TAlterConfigsActor() = default;

void ModifyPersqueueConfig(
const TActorContext& ctx,
NKikimr::TAppData* appData,
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
const NKikimrSchemeOp::TDirEntry& selfInfo
) {
Y_UNUSED(ctx);
Y_UNUSED(appData);
Y_UNUSED(pqGroupDescription);
Y_UNUSED(selfInfo);

Expand Down Expand Up @@ -150,7 +150,7 @@ void TKafkaAlterConfigsActor::Bootstrap(const NActors::TActorContext& ctx) {
resource.ResourceName.value(),
Context->DatabasePath,
convertedRetentions.Ms,
convertedRetentions.Bytes
convertedRetentions.Bytes
));

InflyTopics++;
Expand Down Expand Up @@ -201,7 +201,7 @@ void TKafkaAlterConfigsActor::Reply(const TActorContext& ctx) {
responseResource.ErrorCode = INVALID_REQUEST;
response->Responses.push_back(responseResource);
responseStatus = INVALID_REQUEST;
}
}

Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, responseStatus));

Expand Down
12 changes: 6 additions & 6 deletions ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,13 +215,13 @@ class TCreatePartitionsActor : public TAlterTopicActor<TCreatePartitionsActor, T
public:

TCreatePartitionsActor(
TActorId requester,
TActorId requester,
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
TString topicPath,
TString databaseName,
ui32 partitionsNumber)
: TAlterTopicActor<TCreatePartitionsActor, TKafkaTopicModificationRequest>(
requester,
requester,
userToken,
topicPath,
databaseName)
Expand All @@ -234,12 +234,12 @@ class TCreatePartitionsActor : public TAlterTopicActor<TCreatePartitionsActor, T
};

void ModifyPersqueueConfig(
const TActorContext& ctx,
NKikimr::TAppData* appData,
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
const NKikimrSchemeOp::TDirEntry& selfInfo
) {
Y_UNUSED(ctx);
) override {
Y_UNUSED(appData);
Y_UNUSED(pqGroupDescription);
Y_UNUSED(selfInfo);

Expand Down Expand Up @@ -347,7 +347,7 @@ void TKafkaCreatePartitionsActor::Reply(const TActorContext& ctx) {
response->Results.push_back(responseTopic);

responseStatus = INVALID_REQUEST;
}
}
Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, responseStatus));

Die(ctx);
Expand Down
14 changes: 7 additions & 7 deletions ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class TCreateTopicActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCre
public:

TCreateTopicActor(
TActorId requester,
TActorId requester,
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
TString topicPath,
TString databaseName,
Expand Down Expand Up @@ -78,13 +78,13 @@ class TCreateTopicActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCre
name,
topicRequest,
modifyScheme,
ctx,
NKikimr::AppData(ctx),
error,
workingDir,
proposal.Record.GetDatabaseName()
);
if (codes.YdbCode != Ydb::StatusIds::SUCCESS) {
return ReplyWithError(codes.YdbCode, codes.PQCode, error, ctx);
return ReplyWithError(codes.YdbCode, codes.PQCode, error);
}
};

Expand Down Expand Up @@ -192,7 +192,7 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) {

TopicNamesToRetentions[topicName] = std::pair<std::optional<ui64>, std::optional<ui64>>(
convertedRetentions.Ms,
convertedRetentions.Bytes
convertedRetentions.Bytes
);

ctx.Register(new TCreateTopicActor(
Expand All @@ -202,7 +202,7 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) {
Context->DatabasePath,
topic.NumPartitions,
convertedRetentions.Ms,
convertedRetentions.Bytes
convertedRetentions.Bytes
));

InflyTopics++;
Expand Down Expand Up @@ -243,7 +243,7 @@ void TKafkaCreateTopicsActor::Reply(const TActorContext& ctx) {
responseTopic.ErrorMessage = TopicNamesToResponses[topicName]->Message;
}

auto addConfigIfRequired = [this, &topicName, &responseTopic](std::optional<ui64> configValue, TString configName) {
auto addConfigIfRequired = [this, &topicName, &responseTopic](std::optional<ui64> configValue, TString configName) {
if (configValue.has_value()) {
TCreateTopicsResponseData::TCreatableTopicResult::TCreatableTopicConfigs config;
config.Name = configName;
Expand Down Expand Up @@ -271,7 +271,7 @@ void TKafkaCreateTopicsActor::Reply(const TActorContext& ctx) {
responseTopic.ErrorMessage = "Duplicate topic in request.";
response->Topics.push_back(responseTopic);
responseStatus = INVALID_REQUEST;
}
}

Send(Context->ConnectionId,
new TEvKafka::TEvResponse(CorrelationId, response, responseStatus));
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/kafka_proxy/kafka_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,17 +208,17 @@ struct TGetOffsetsRequest : public NKikimr::NGRpcProxy::V1::TLocalRequestBase {
TVector<ui32> PartitionIds;
};

struct TEvTopicOffsetsResponse : public NActors::TEventLocal<TEvTopicOffsetsResponse, EvTopicOffsetsResponse>
, public NKikimr::NGRpcProxy::V1::TEvPQProxy::TLocalResponseBase
struct TEvTopicOffsetsResponse : public NActors::TEventLocal<TEvTopicOffsetsResponse, EvTopicOffsetsResponse>
, public NKikimr::NGRpcProxy::V1::TLocalResponseBase
{
TEvTopicOffsetsResponse()
{}

TVector<TPartitionOffsetsInfo> Partitions;
};

struct TEvCommitedOffsetsResponse : public NActors::TEventLocal<TEvCommitedOffsetsResponse, EvTopicOffsetsResponse>
, public NKikimr::NGRpcProxy::V1::TEvPQProxy::TLocalResponseBase
struct TEvCommitedOffsetsResponse : public NActors::TEventLocal<TEvCommitedOffsetsResponse, EvTopicOffsetsResponse>
, public NKikimr::NGRpcProxy::V1::TLocalResponseBase
{
TEvCommitedOffsetsResponse()
{}
Expand All @@ -228,8 +228,8 @@ struct TEvCommitedOffsetsResponse : public NActors::TEventLocal<TEvCommitedOffse
std::shared_ptr<std::unordered_map<ui32, std::unordered_map<TString, ui32>>> PartitionIdToOffsets;
};

struct TEvTopicModificationResponse : public NActors::TEventLocal<TEvTopicModificationResponse, EvCreateTopicsResponse>
, public NKikimr::NGRpcProxy::V1::TEvPQProxy::TLocalResponseBase
struct TEvTopicModificationResponse : public NActors::TEventLocal<TEvTopicModificationResponse, EvCreateTopicsResponse>
, public NKikimr::NGRpcProxy::V1::TLocalResponseBase
{
enum EStatus {
OK,
Expand Down
20 changes: 19 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,24 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
break;
}

case NKqpProto::TKqpSchemeOperation::kCreateTopic: {
const auto& modifyScheme = schemeOp.GetCreateTopic();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

case NKqpProto::TKqpSchemeOperation::kAlterTopic: {
const auto& modifyScheme = schemeOp.GetAlterTopic();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

case NKqpProto::TKqpSchemeOperation::kDropTopic: {
const auto& modifyScheme = schemeOp.GetDropTopic();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

default:
InternalError(TStringBuilder() << "Unexpected scheme operation: "
<< (ui32) schemeOp.GetOperationCase());
Expand Down Expand Up @@ -435,7 +453,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
}

void Handle(TEvPrivate::TEvMakeTempDirResult::TPtr& result) {
if (!result->Get()->Result.Success()) {
if (!result->Get()->Result.Success()) {
InternalError(TStringBuilder()
<< "Error creating temporary directory for session " << SessionId
<< ": " << result->Get()->Result.Issues().ToString(true));
Expand Down
33 changes: 30 additions & 3 deletions ydb/core/kqp/gateway/kqp_ic_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -976,7 +976,11 @@ class TKikimrIcGateway : public IKqpGateway {
return NotImplemented<TGenericResult>();
}

TFuture<TGenericResult> CreateTopic(const TString& cluster, Ydb::Topic::CreateTopicRequest&& request) override {
TFuture<TGenericResult> CreateTopic(const TString& cluster, Ydb::Topic::CreateTopicRequest&& request, bool existingOk) override {
if (existingOk) {
return MakeFuture(ResultFromError<TGenericResult>("IF NOT EXISTS statement is not supported for CREATE TOPIC in yql script"));
}

try {
if (!CheckCluster(cluster)) {
return InvalidCluster<TGenericResult>(cluster);
Expand All @@ -988,9 +992,27 @@ class TKikimrIcGateway : public IKqpGateway {
catch (yexception& e) {
return MakeFuture(ResultFromException<TGenericResult>(e));
}
Y_UNUSED(existingOk);
}

TFuture<TGenericResult> AlterTopic(const TString& cluster, Ydb::Topic::AlterTopicRequest&& request) override {
TFuture<NKikimr::NGRpcProxy::V1::TAlterTopicResponse> AlterTopicPrepared(NYql::TAlterTopicSettings&& settings) override {
auto schemaTxPromise = NewPromise<NKikimr::NGRpcProxy::V1::TAlterTopicResponse>();
auto schemaTxFuture = schemaTxPromise.GetFuture();

NKikimr::NGRpcProxy::V1::TAlterTopicRequest request{
std::move(settings.Request), settings.WorkDir, settings.Name, Database, GetTokenCompat(),
settings.MissingOk
};
IActor* requestHandler = new NKikimr::NGRpcProxy::V1::TAlterTopicActorInternal(std::move(request), std::move(schemaTxPromise), settings.MissingOk);
RegisterActor(requestHandler);
return schemaTxFuture;
}

TFuture<TGenericResult> AlterTopic(const TString& cluster, Ydb::Topic::AlterTopicRequest&& request, bool missingOk) override {
if (missingOk) {
return MakeFuture(ResultFromError<TGenericResult>("IF EXISTS statement is not supported for ALTER TOPIC in yql script"));
}

try {
if (!CheckCluster(cluster)) {
return InvalidCluster<TGenericResult>(cluster);
Expand All @@ -1004,7 +1026,11 @@ class TKikimrIcGateway : public IKqpGateway {
}
}

TFuture<TGenericResult> DropTopic(const TString& cluster, const TString& topic) override {
TFuture<TGenericResult> DropTopic(const TString& cluster, const TString& topic, bool missingOk) override {
if (missingOk) {
return MakeFuture(ResultFromError<TGenericResult>("IF EXISTS statement is not supported for DROP TOPIC in yql script"));
}

try {
if (!CheckCluster(cluster)) {
return InvalidCluster<TGenericResult>(cluster);
Expand All @@ -1019,6 +1045,7 @@ class TKikimrIcGateway : public IKqpGateway {
catch (yexception& e) {
return MakeFuture(ResultFromException<TGenericResult>(e));
}

}

TFuture<TGenericResult> CreateReplication(const TString&, const NYql::TCreateReplicationSettings&) override {
Expand Down
Loading
Loading