Skip to content

Commit bb76355

Browse files
Topic control plane DDL support for query service (#7438)
1 parent f7c4264 commit bb76355

29 files changed

+989
-350
lines changed

ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,14 @@ class TAlterConfigsActor : public TAlterTopicActor<TAlterConfigsActor, TKafkaAlt
3434
public:
3535

3636
TAlterConfigsActor(
37-
TActorId requester,
37+
TActorId requester,
3838
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
3939
TString topicPath,
4040
TString databaseName,
4141
std::optional<ui64> retentionMs,
4242
std::optional<ui64> retentionBytes)
4343
: TAlterTopicActor<TAlterConfigsActor, TKafkaAlterConfigsRequest>(
44-
requester,
44+
requester,
4545
userToken,
4646
topicPath,
4747
databaseName)
@@ -54,12 +54,12 @@ class TAlterConfigsActor : public TAlterTopicActor<TAlterConfigsActor, TKafkaAlt
5454
~TAlterConfigsActor() = default;
5555

5656
void ModifyPersqueueConfig(
57-
const TActorContext& ctx,
57+
NKikimr::TAppData* appData,
5858
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
5959
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
6060
const NKikimrSchemeOp::TDirEntry& selfInfo
6161
) {
62-
Y_UNUSED(ctx);
62+
Y_UNUSED(appData);
6363
Y_UNUSED(pqGroupDescription);
6464
Y_UNUSED(selfInfo);
6565

@@ -150,7 +150,7 @@ void TKafkaAlterConfigsActor::Bootstrap(const NActors::TActorContext& ctx) {
150150
resource.ResourceName.value(),
151151
Context->DatabasePath,
152152
convertedRetentions.Ms,
153-
convertedRetentions.Bytes
153+
convertedRetentions.Bytes
154154
));
155155

156156
InflyTopics++;
@@ -201,7 +201,7 @@ void TKafkaAlterConfigsActor::Reply(const TActorContext& ctx) {
201201
responseResource.ErrorCode = INVALID_REQUEST;
202202
response->Responses.push_back(responseResource);
203203
responseStatus = INVALID_REQUEST;
204-
}
204+
}
205205

206206
Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, responseStatus));
207207

ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -215,13 +215,13 @@ class TCreatePartitionsActor : public TAlterTopicActor<TCreatePartitionsActor, T
215215
public:
216216

217217
TCreatePartitionsActor(
218-
TActorId requester,
218+
TActorId requester,
219219
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
220220
TString topicPath,
221221
TString databaseName,
222222
ui32 partitionsNumber)
223223
: TAlterTopicActor<TCreatePartitionsActor, TKafkaTopicModificationRequest>(
224-
requester,
224+
requester,
225225
userToken,
226226
topicPath,
227227
databaseName)
@@ -234,12 +234,12 @@ class TCreatePartitionsActor : public TAlterTopicActor<TCreatePartitionsActor, T
234234
};
235235

236236
void ModifyPersqueueConfig(
237-
const TActorContext& ctx,
237+
NKikimr::TAppData* appData,
238238
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
239239
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
240240
const NKikimrSchemeOp::TDirEntry& selfInfo
241-
) {
242-
Y_UNUSED(ctx);
241+
) override {
242+
Y_UNUSED(appData);
243243
Y_UNUSED(pqGroupDescription);
244244
Y_UNUSED(selfInfo);
245245

@@ -347,7 +347,7 @@ void TKafkaCreatePartitionsActor::Reply(const TActorContext& ctx) {
347347
response->Results.push_back(responseTopic);
348348

349349
responseStatus = INVALID_REQUEST;
350-
}
350+
}
351351
Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, responseStatus));
352352

353353
Die(ctx);

ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ class TCreateTopicActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCre
1616
public:
1717

1818
TCreateTopicActor(
19-
TActorId requester,
19+
TActorId requester,
2020
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
2121
TString topicPath,
2222
TString databaseName,
@@ -78,13 +78,13 @@ class TCreateTopicActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCre
7878
name,
7979
topicRequest,
8080
modifyScheme,
81-
ctx,
81+
NKikimr::AppData(ctx),
8282
error,
8383
workingDir,
8484
proposal.Record.GetDatabaseName()
8585
);
8686
if (codes.YdbCode != Ydb::StatusIds::SUCCESS) {
87-
return ReplyWithError(codes.YdbCode, codes.PQCode, error, ctx);
87+
return ReplyWithError(codes.YdbCode, codes.PQCode, error);
8888
}
8989
};
9090

@@ -192,7 +192,7 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) {
192192

193193
TopicNamesToRetentions[topicName] = std::pair<std::optional<ui64>, std::optional<ui64>>(
194194
convertedRetentions.Ms,
195-
convertedRetentions.Bytes
195+
convertedRetentions.Bytes
196196
);
197197

198198
ctx.Register(new TCreateTopicActor(
@@ -202,7 +202,7 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) {
202202
Context->DatabasePath,
203203
topic.NumPartitions,
204204
convertedRetentions.Ms,
205-
convertedRetentions.Bytes
205+
convertedRetentions.Bytes
206206
));
207207

208208
InflyTopics++;
@@ -243,7 +243,7 @@ void TKafkaCreateTopicsActor::Reply(const TActorContext& ctx) {
243243
responseTopic.ErrorMessage = TopicNamesToResponses[topicName]->Message;
244244
}
245245

246-
auto addConfigIfRequired = [this, &topicName, &responseTopic](std::optional<ui64> configValue, TString configName) {
246+
auto addConfigIfRequired = [this, &topicName, &responseTopic](std::optional<ui64> configValue, TString configName) {
247247
if (configValue.has_value()) {
248248
TCreateTopicsResponseData::TCreatableTopicResult::TCreatableTopicConfigs config;
249249
config.Name = configName;
@@ -271,7 +271,7 @@ void TKafkaCreateTopicsActor::Reply(const TActorContext& ctx) {
271271
responseTopic.ErrorMessage = "Duplicate topic in request.";
272272
response->Topics.push_back(responseTopic);
273273
responseStatus = INVALID_REQUEST;
274-
}
274+
}
275275

276276
Send(Context->ConnectionId,
277277
new TEvKafka::TEvResponse(CorrelationId, response, responseStatus));

ydb/core/kafka_proxy/kafka_events.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -208,17 +208,17 @@ struct TGetOffsetsRequest : public NKikimr::NGRpcProxy::V1::TLocalRequestBase {
208208
TVector<ui32> PartitionIds;
209209
};
210210

211-
struct TEvTopicOffsetsResponse : public NActors::TEventLocal<TEvTopicOffsetsResponse, EvTopicOffsetsResponse>
212-
, public NKikimr::NGRpcProxy::V1::TEvPQProxy::TLocalResponseBase
211+
struct TEvTopicOffsetsResponse : public NActors::TEventLocal<TEvTopicOffsetsResponse, EvTopicOffsetsResponse>
212+
, public NKikimr::NGRpcProxy::V1::TLocalResponseBase
213213
{
214214
TEvTopicOffsetsResponse()
215215
{}
216216

217217
TVector<TPartitionOffsetsInfo> Partitions;
218218
};
219219

220-
struct TEvCommitedOffsetsResponse : public NActors::TEventLocal<TEvCommitedOffsetsResponse, EvTopicOffsetsResponse>
221-
, public NKikimr::NGRpcProxy::V1::TEvPQProxy::TLocalResponseBase
220+
struct TEvCommitedOffsetsResponse : public NActors::TEventLocal<TEvCommitedOffsetsResponse, EvTopicOffsetsResponse>
221+
, public NKikimr::NGRpcProxy::V1::TLocalResponseBase
222222
{
223223
TEvCommitedOffsetsResponse()
224224
{}
@@ -228,8 +228,8 @@ struct TEvCommitedOffsetsResponse : public NActors::TEventLocal<TEvCommitedOffse
228228
std::shared_ptr<std::unordered_map<ui32, std::unordered_map<TString, ui32>>> PartitionIdToOffsets;
229229
};
230230

231-
struct TEvTopicModificationResponse : public NActors::TEventLocal<TEvTopicModificationResponse, EvCreateTopicsResponse>
232-
, public NKikimr::NGRpcProxy::V1::TEvPQProxy::TLocalResponseBase
231+
struct TEvTopicModificationResponse : public NActors::TEventLocal<TEvTopicModificationResponse, EvCreateTopicsResponse>
232+
, public NKikimr::NGRpcProxy::V1::TLocalResponseBase
233233
{
234234
enum EStatus {
235235
OK,

ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -308,11 +308,29 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
308308
break;
309309
}
310310

311+
case NKqpProto::TKqpSchemeOperation::kCreateTopic: {
312+
const auto& modifyScheme = schemeOp.GetCreateTopic();
313+
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
314+
break;
315+
}
316+
317+
case NKqpProto::TKqpSchemeOperation::kAlterTopic: {
318+
const auto& modifyScheme = schemeOp.GetAlterTopic();
319+
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
320+
break;
321+
}
322+
323+
case NKqpProto::TKqpSchemeOperation::kDropTopic: {
324+
const auto& modifyScheme = schemeOp.GetDropTopic();
325+
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
326+
break;
327+
}
328+
311329
case NKqpProto::TKqpSchemeOperation::kAnalyzeTable: {
312330
const auto& analyzeOperation = schemeOp.GetAnalyzeTable();
313-
331+
314332
auto analyzePromise = NewPromise<IKqpGateway::TGenericResult>();
315-
333+
316334
TVector<TString> columns{analyzeOperation.columns().begin(), analyzeOperation.columns().end()};
317335
IActor* analyzeActor = new TAnalyzeActor(analyzeOperation.GetTablePath(), columns, analyzePromise);
318336

@@ -326,9 +344,10 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
326344

327345
actorSystem->Send(selfId, ev.Release());
328346
});
329-
347+
330348
Become(&TKqpSchemeExecuter::ExecuteState);
331349
return;
350+
332351
}
333352

334353
default:
@@ -459,7 +478,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
459478
}
460479

461480
void Handle(TEvPrivate::TEvMakeTempDirResult::TPtr& result) {
462-
if (!result->Get()->Result.Success()) {
481+
if (!result->Get()->Result.Success()) {
463482
InternalError(TStringBuilder()
464483
<< "Error creating temporary directory for session " << SessionId
465484
<< ": " << result->Get()->Result.Issues().ToString(true));

ydb/core/kqp/gateway/kqp_ic_gateway.cpp

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -977,7 +977,11 @@ class TKikimrIcGateway : public IKqpGateway {
977977
return NotImplemented<TGenericResult>();
978978
}
979979

980-
TFuture<TGenericResult> CreateTopic(const TString& cluster, Ydb::Topic::CreateTopicRequest&& request) override {
980+
TFuture<TGenericResult> CreateTopic(const TString& cluster, Ydb::Topic::CreateTopicRequest&& request, bool existingOk) override {
981+
if (existingOk) {
982+
return MakeFuture(ResultFromError<TGenericResult>("IF NOT EXISTS statement is not supported for CREATE TOPIC in yql script"));
983+
}
984+
981985
try {
982986
if (!CheckCluster(cluster)) {
983987
return InvalidCluster<TGenericResult>(cluster);
@@ -989,9 +993,27 @@ class TKikimrIcGateway : public IKqpGateway {
989993
catch (yexception& e) {
990994
return MakeFuture(ResultFromException<TGenericResult>(e));
991995
}
996+
Y_UNUSED(existingOk);
992997
}
993998

994-
TFuture<TGenericResult> AlterTopic(const TString& cluster, Ydb::Topic::AlterTopicRequest&& request) override {
999+
TFuture<NKikimr::NGRpcProxy::V1::TAlterTopicResponse> AlterTopicPrepared(NYql::TAlterTopicSettings&& settings) override {
1000+
auto schemaTxPromise = NewPromise<NKikimr::NGRpcProxy::V1::TAlterTopicResponse>();
1001+
auto schemaTxFuture = schemaTxPromise.GetFuture();
1002+
1003+
NKikimr::NGRpcProxy::V1::TAlterTopicRequest request{
1004+
std::move(settings.Request), settings.WorkDir, settings.Name, Database, GetTokenCompat(),
1005+
settings.MissingOk
1006+
};
1007+
IActor* requestHandler = new NKikimr::NGRpcProxy::V1::TAlterTopicActorInternal(std::move(request), std::move(schemaTxPromise), settings.MissingOk);
1008+
RegisterActor(requestHandler);
1009+
return schemaTxFuture;
1010+
}
1011+
1012+
TFuture<TGenericResult> AlterTopic(const TString& cluster, Ydb::Topic::AlterTopicRequest&& request, bool missingOk) override {
1013+
if (missingOk) {
1014+
return MakeFuture(ResultFromError<TGenericResult>("IF EXISTS statement is not supported for ALTER TOPIC in yql script"));
1015+
}
1016+
9951017
try {
9961018
if (!CheckCluster(cluster)) {
9971019
return InvalidCluster<TGenericResult>(cluster);
@@ -1005,7 +1027,11 @@ class TKikimrIcGateway : public IKqpGateway {
10051027
}
10061028
}
10071029

1008-
TFuture<TGenericResult> DropTopic(const TString& cluster, const TString& topic) override {
1030+
TFuture<TGenericResult> DropTopic(const TString& cluster, const TString& topic, bool missingOk) override {
1031+
if (missingOk) {
1032+
return MakeFuture(ResultFromError<TGenericResult>("IF EXISTS statement is not supported for DROP TOPIC in yql script"));
1033+
}
1034+
10091035
try {
10101036
if (!CheckCluster(cluster)) {
10111037
return InvalidCluster<TGenericResult>(cluster);
@@ -1020,6 +1046,7 @@ class TKikimrIcGateway : public IKqpGateway {
10201046
catch (yexception& e) {
10211047
return MakeFuture(ResultFromException<TGenericResult>(e));
10221048
}
1049+
10231050
}
10241051

10251052
TFuture<TGenericResult> CreateReplication(const TString&, const NYql::TCreateReplicationSettings&) override {

0 commit comments

Comments
 (0)