Skip to content

Commit ca0f5bf

Browse files
Merge c2d2ff5 into 3f13bd7
2 parents 3f13bd7 + c2d2ff5 commit ca0f5bf

29 files changed

+982
-345
lines changed

ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,14 @@ class TAlterConfigsActor : public TAlterTopicActor<TAlterConfigsActor, TKafkaAlt
3434
public:
3535

3636
TAlterConfigsActor(
37-
TActorId requester,
37+
TActorId requester,
3838
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
3939
TString topicPath,
4040
TString databaseName,
4141
std::optional<ui64> retentionMs,
4242
std::optional<ui64> retentionBytes)
4343
: TAlterTopicActor<TAlterConfigsActor, TKafkaAlterConfigsRequest>(
44-
requester,
44+
requester,
4545
userToken,
4646
topicPath,
4747
databaseName)
@@ -54,12 +54,12 @@ class TAlterConfigsActor : public TAlterTopicActor<TAlterConfigsActor, TKafkaAlt
5454
~TAlterConfigsActor() = default;
5555

5656
void ModifyPersqueueConfig(
57-
const TActorContext& ctx,
57+
NKikimr::TAppData* appData,
5858
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
5959
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
6060
const NKikimrSchemeOp::TDirEntry& selfInfo
6161
) {
62-
Y_UNUSED(ctx);
62+
Y_UNUSED(appData);
6363
Y_UNUSED(pqGroupDescription);
6464
Y_UNUSED(selfInfo);
6565

@@ -150,7 +150,7 @@ void TKafkaAlterConfigsActor::Bootstrap(const NActors::TActorContext& ctx) {
150150
resource.ResourceName.value(),
151151
Context->DatabasePath,
152152
convertedRetentions.Ms,
153-
convertedRetentions.Bytes
153+
convertedRetentions.Bytes
154154
));
155155

156156
InflyTopics++;
@@ -201,7 +201,7 @@ void TKafkaAlterConfigsActor::Reply(const TActorContext& ctx) {
201201
responseResource.ErrorCode = INVALID_REQUEST;
202202
response->Responses.push_back(responseResource);
203203
responseStatus = INVALID_REQUEST;
204-
}
204+
}
205205

206206
Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, responseStatus));
207207

ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -215,13 +215,13 @@ class TCreatePartitionsActor : public TAlterTopicActor<TCreatePartitionsActor, T
215215
public:
216216

217217
TCreatePartitionsActor(
218-
TActorId requester,
218+
TActorId requester,
219219
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
220220
TString topicPath,
221221
TString databaseName,
222222
ui32 partitionsNumber)
223223
: TAlterTopicActor<TCreatePartitionsActor, TKafkaTopicModificationRequest>(
224-
requester,
224+
requester,
225225
userToken,
226226
topicPath,
227227
databaseName)
@@ -234,12 +234,12 @@ class TCreatePartitionsActor : public TAlterTopicActor<TCreatePartitionsActor, T
234234
};
235235

236236
void ModifyPersqueueConfig(
237-
const TActorContext& ctx,
237+
NKikimr::TAppData* appData,
238238
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
239239
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
240240
const NKikimrSchemeOp::TDirEntry& selfInfo
241-
) {
242-
Y_UNUSED(ctx);
241+
) override {
242+
Y_UNUSED(appData);
243243
Y_UNUSED(pqGroupDescription);
244244
Y_UNUSED(selfInfo);
245245

@@ -347,7 +347,7 @@ void TKafkaCreatePartitionsActor::Reply(const TActorContext& ctx) {
347347
response->Results.push_back(responseTopic);
348348

349349
responseStatus = INVALID_REQUEST;
350-
}
350+
}
351351
Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, responseStatus));
352352

353353
Die(ctx);

ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ class TCreateTopicActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCre
1616
public:
1717

1818
TCreateTopicActor(
19-
TActorId requester,
19+
TActorId requester,
2020
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
2121
TString topicPath,
2222
TString databaseName,
@@ -78,13 +78,13 @@ class TCreateTopicActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCre
7878
name,
7979
topicRequest,
8080
modifyScheme,
81-
ctx,
81+
NKikimr::AppData(ctx),
8282
error,
8383
workingDir,
8484
proposal.Record.GetDatabaseName()
8585
);
8686
if (codes.YdbCode != Ydb::StatusIds::SUCCESS) {
87-
return ReplyWithError(codes.YdbCode, codes.PQCode, error, ctx);
87+
return ReplyWithError(codes.YdbCode, codes.PQCode, error);
8888
}
8989
};
9090

@@ -192,7 +192,7 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) {
192192

193193
TopicNamesToRetentions[topicName] = std::pair<std::optional<ui64>, std::optional<ui64>>(
194194
convertedRetentions.Ms,
195-
convertedRetentions.Bytes
195+
convertedRetentions.Bytes
196196
);
197197

198198
ctx.Register(new TCreateTopicActor(
@@ -202,7 +202,7 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) {
202202
Context->DatabasePath,
203203
topic.NumPartitions,
204204
convertedRetentions.Ms,
205-
convertedRetentions.Bytes
205+
convertedRetentions.Bytes
206206
));
207207

208208
InflyTopics++;
@@ -243,7 +243,7 @@ void TKafkaCreateTopicsActor::Reply(const TActorContext& ctx) {
243243
responseTopic.ErrorMessage = TopicNamesToResponses[topicName]->Message;
244244
}
245245

246-
auto addConfigIfRequired = [this, &topicName, &responseTopic](std::optional<ui64> configValue, TString configName) {
246+
auto addConfigIfRequired = [this, &topicName, &responseTopic](std::optional<ui64> configValue, TString configName) {
247247
if (configValue.has_value()) {
248248
TCreateTopicsResponseData::TCreatableTopicResult::TCreatableTopicConfigs config;
249249
config.Name = configName;
@@ -271,7 +271,7 @@ void TKafkaCreateTopicsActor::Reply(const TActorContext& ctx) {
271271
responseTopic.ErrorMessage = "Duplicate topic in request.";
272272
response->Topics.push_back(responseTopic);
273273
responseStatus = INVALID_REQUEST;
274-
}
274+
}
275275

276276
Send(Context->ConnectionId,
277277
new TEvKafka::TEvResponse(CorrelationId, response, responseStatus));

ydb/core/kafka_proxy/kafka_events.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -208,17 +208,17 @@ struct TGetOffsetsRequest : public NKikimr::NGRpcProxy::V1::TLocalRequestBase {
208208
TVector<ui32> PartitionIds;
209209
};
210210

211-
struct TEvTopicOffsetsResponse : public NActors::TEventLocal<TEvTopicOffsetsResponse, EvTopicOffsetsResponse>
212-
, public NKikimr::NGRpcProxy::V1::TEvPQProxy::TLocalResponseBase
211+
struct TEvTopicOffsetsResponse : public NActors::TEventLocal<TEvTopicOffsetsResponse, EvTopicOffsetsResponse>
212+
, public NKikimr::NGRpcProxy::V1::TLocalResponseBase
213213
{
214214
TEvTopicOffsetsResponse()
215215
{}
216216

217217
TVector<TPartitionOffsetsInfo> Partitions;
218218
};
219219

220-
struct TEvCommitedOffsetsResponse : public NActors::TEventLocal<TEvCommitedOffsetsResponse, EvTopicOffsetsResponse>
221-
, public NKikimr::NGRpcProxy::V1::TEvPQProxy::TLocalResponseBase
220+
struct TEvCommitedOffsetsResponse : public NActors::TEventLocal<TEvCommitedOffsetsResponse, EvTopicOffsetsResponse>
221+
, public NKikimr::NGRpcProxy::V1::TLocalResponseBase
222222
{
223223
TEvCommitedOffsetsResponse()
224224
{}
@@ -228,8 +228,8 @@ struct TEvCommitedOffsetsResponse : public NActors::TEventLocal<TEvCommitedOffse
228228
std::shared_ptr<std::unordered_map<ui32, std::unordered_map<TString, ui32>>> PartitionIdToOffsets;
229229
};
230230

231-
struct TEvTopicModificationResponse : public NActors::TEventLocal<TEvTopicModificationResponse, EvCreateTopicsResponse>
232-
, public NKikimr::NGRpcProxy::V1::TEvPQProxy::TLocalResponseBase
231+
struct TEvTopicModificationResponse : public NActors::TEventLocal<TEvTopicModificationResponse, EvCreateTopicsResponse>
232+
, public NKikimr::NGRpcProxy::V1::TLocalResponseBase
233233
{
234234
enum EStatus {
235235
OK,

ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,24 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
307307
break;
308308
}
309309

310+
case NKqpProto::TKqpSchemeOperation::kCreateTopic: {
311+
const auto& modifyScheme = schemeOp.GetCreateTopic();
312+
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
313+
break;
314+
}
315+
316+
case NKqpProto::TKqpSchemeOperation::kAlterTopic: {
317+
const auto& modifyScheme = schemeOp.GetAlterTopic();
318+
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
319+
break;
320+
}
321+
322+
case NKqpProto::TKqpSchemeOperation::kDropTopic: {
323+
const auto& modifyScheme = schemeOp.GetDropTopic();
324+
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
325+
break;
326+
}
327+
310328
default:
311329
InternalError(TStringBuilder() << "Unexpected scheme operation: "
312330
<< (ui32) schemeOp.GetOperationCase());
@@ -435,7 +453,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
435453
}
436454

437455
void Handle(TEvPrivate::TEvMakeTempDirResult::TPtr& result) {
438-
if (!result->Get()->Result.Success()) {
456+
if (!result->Get()->Result.Success()) {
439457
InternalError(TStringBuilder()
440458
<< "Error creating temporary directory for session " << SessionId
441459
<< ": " << result->Get()->Result.Issues().ToString(true));

ydb/core/kqp/gateway/kqp_ic_gateway.cpp

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -976,7 +976,11 @@ class TKikimrIcGateway : public IKqpGateway {
976976
return NotImplemented<TGenericResult>();
977977
}
978978

979-
TFuture<TGenericResult> CreateTopic(const TString& cluster, Ydb::Topic::CreateTopicRequest&& request) override {
979+
TFuture<TGenericResult> CreateTopic(const TString& cluster, Ydb::Topic::CreateTopicRequest&& request, bool existingOk) override {
980+
if (existingOk) {
981+
return MakeFuture(ResultFromError<TGenericResult>("IF NOT EXISTS statement is not supported for CREATE TOPIC in yql script"));
982+
}
983+
980984
try {
981985
if (!CheckCluster(cluster)) {
982986
return InvalidCluster<TGenericResult>(cluster);
@@ -988,9 +992,27 @@ class TKikimrIcGateway : public IKqpGateway {
988992
catch (yexception& e) {
989993
return MakeFuture(ResultFromException<TGenericResult>(e));
990994
}
995+
Y_UNUSED(existingOk);
991996
}
992997

993-
TFuture<TGenericResult> AlterTopic(const TString& cluster, Ydb::Topic::AlterTopicRequest&& request) override {
998+
TFuture<NKikimr::NGRpcProxy::V1::TAlterTopicResponse> AlterTopicPrepared(NYql::TAlterTopicSettings&& settings) override {
999+
auto schemaTxPromise = NewPromise<NKikimr::NGRpcProxy::V1::TAlterTopicResponse>();
1000+
auto schemaTxFuture = schemaTxPromise.GetFuture();
1001+
1002+
NKikimr::NGRpcProxy::V1::TAlterTopicRequest request{
1003+
std::move(settings.Request), settings.WorkDir, settings.Name, Database, GetTokenCompat(),
1004+
settings.MissingOk
1005+
};
1006+
IActor* requestHandler = new NKikimr::NGRpcProxy::V1::TAlterTopicActorInternal(std::move(request), std::move(schemaTxPromise), settings.MissingOk);
1007+
RegisterActor(requestHandler);
1008+
return schemaTxFuture;
1009+
}
1010+
1011+
TFuture<TGenericResult> AlterTopic(const TString& cluster, Ydb::Topic::AlterTopicRequest&& request, bool missingOk) override {
1012+
if (missingOk) {
1013+
return MakeFuture(ResultFromError<TGenericResult>("IF EXISTS statement is not supported for ALTER TOPIC in yql script"));
1014+
}
1015+
9941016
try {
9951017
if (!CheckCluster(cluster)) {
9961018
return InvalidCluster<TGenericResult>(cluster);
@@ -1004,7 +1026,11 @@ class TKikimrIcGateway : public IKqpGateway {
10041026
}
10051027
}
10061028

1007-
TFuture<TGenericResult> DropTopic(const TString& cluster, const TString& topic) override {
1029+
TFuture<TGenericResult> DropTopic(const TString& cluster, const TString& topic, bool missingOk) override {
1030+
if (missingOk) {
1031+
return MakeFuture(ResultFromError<TGenericResult>("IF EXISTS statement is not supported for DROP TOPIC in yql script"));
1032+
}
1033+
10081034
try {
10091035
if (!CheckCluster(cluster)) {
10101036
return InvalidCluster<TGenericResult>(cluster);
@@ -1019,6 +1045,7 @@ class TKikimrIcGateway : public IKqpGateway {
10191045
catch (yexception& e) {
10201046
return MakeFuture(ResultFromException<TGenericResult>(e));
10211047
}
1048+
10221049
}
10231050

10241051
TFuture<TGenericResult> CreateReplication(const TString&, const NYql::TCreateReplicationSettings&) override {

0 commit comments

Comments
 (0)