From 95346b445b4c87b91f51153c2d949c2c637302d8 Mon Sep 17 00:00:00 2001 From: Sergey Veselov Date: Wed, 27 Dec 2023 15:30:59 +0300 Subject: [PATCH] LOGBROKER-8840: add CreateTopics endpoint to KafkaAPI --- ydb/core/kafka_proxy/actors/actors.h | 1 + .../actors/kafka_create_topics_actor.cpp | 553 ++++++++++++++++++ .../actors/kafka_create_topics_actor.h | 39 ++ ydb/core/kafka_proxy/kafka_connection.cpp | 8 + ydb/core/kafka_proxy/kafka_constants.h | 8 + ydb/core/kafka_proxy/kafka_events.h | 18 + ydb/core/kafka_proxy/kafka_messages.cpp | 480 +++++++++++++++ ydb/core/kafka_proxy/kafka_messages.h | 540 +++++++++++++++++ ydb/core/kafka_proxy/ut/ut_protocol.cpp | 258 ++++++++ ydb/core/kafka_proxy/ya.make | 2 + 10 files changed, 1907 insertions(+) create mode 100644 ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp create mode 100644 ydb/core/kafka_proxy/actors/kafka_create_topics_actor.h create mode 100644 ydb/core/kafka_proxy/kafka_constants.h diff --git a/ydb/core/kafka_proxy/actors/actors.h b/ydb/core/kafka_proxy/actors/actors.h index 3bb167ec50ef..d7de16925140 100644 --- a/ydb/core/kafka_proxy/actors/actors.h +++ b/ydb/core/kafka_proxy/actors/actors.h @@ -158,5 +158,6 @@ NActors::IActor* CreateKafkaFetchActor(const TContext::TPtr context, const ui64 NActors::IActor* CreateKafkaFindCoordinatorActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr& message); NActors::IActor* CreateKafkaOffsetCommitActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr& message); NActors::IActor* CreateKafkaOffsetFetchActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr& message); +NActors::IActor* CreateKafkaCreateTopicsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr& message); } // namespace NKafka diff --git a/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp new file mode 100644 index 000000000000..5702f844a28e --- /dev/null +++ b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp @@ -0,0 +1,553 @@ +#include "kafka_create_topics_actor.h" + +#include + +#include + +#include + + +namespace NKafka { + +class TKafkaCreateTopicRequest : public NKikimr::NGRpcService::IRequestOpCtx { +public: + using TRequest = TKafkaCreateTopicRequest; + + TKafkaCreateTopicRequest( + TIntrusiveConstPtr userToken, + TString topicPath, + TString databaseName, + const std::function sendResultCallback) + : UserToken(userToken) + , TopicPath(topicPath) + , DatabaseName(databaseName) + , SendResultCallback(sendResultCallback) + { + }; + + const TString path() const { + return TopicPath; + } + + TMaybe GetTraceId() const override { + return Nothing(); + } + + const TMaybe GetDatabaseName() const override { + return DatabaseName; + } + + const TIntrusiveConstPtr& GetInternalToken() const override { + return UserToken; + } + + const TString& GetSerializedToken() const override { + return UserToken->GetSerializedToken(); + } + + bool IsClientLost() const override { + return false; + }; + + virtual const google::protobuf::Message* GetRequest() const override { + return nullptr; + }; + + const TMaybe GetRequestType() const override { + return Nothing(); + }; + + void SetFinishAction(std::function&& cb) override { + Y_UNUSED(cb); + }; + + google::protobuf::Arena* GetArena() override { + return nullptr; + }; + + bool HasClientCapability(const TString& capability) const override { + Y_UNUSED(capability); + return false; + }; + + void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) override { + processYdbStatusCode(status); + }; + + void ReplyWithRpcStatus(grpc::StatusCode code, const TString& msg = "", const TString& details = "") override { + Y_UNUSED(code); + Y_UNUSED(msg); + Y_UNUSED(details); + } + + TString GetPeerName() const override { + return ""; + } + + TInstant GetDeadline() const override { + return TInstant(); + } + + const TMaybe GetPeerMetaValues(const TString&) const override { + return Nothing(); + } + + TVector FindClientCert() const override { + return TVector(); + } + + TMaybe GetRlPath() const override { + return Nothing(); + } + + void RaiseIssue(const NYql::TIssue& issue) override{ + ReplyMessage = issue.GetMessage(); + Y_UNUSED(issue); + } + void RaiseIssues(const NYql::TIssues& issues) override { + Y_UNUSED(issues); + }; + const TString& GetRequestName() const override { + return DummyString; + }; + void SetDiskQuotaExceeded(bool disk) override { + Y_UNUSED(disk); + }; + + bool GetDiskQuotaExceeded() const override { + return false; + }; + + void AddAuditLogPart(const TStringBuf& name, const TString& value) override { + Y_UNUSED(name); + Y_UNUSED(value); + }; + + const NKikimr::NGRpcService::TAuditLogParts& GetAuditLogParts() const override { + return DummyAuditLogParts; + }; + + google::protobuf::Message* GetRequestMut() override { + return nullptr; + }; + + void SetRuHeader(ui64 ru) override { + Y_UNUSED(ru); + }; + + void AddServerHint(const TString& hint) override { + Y_UNUSED(hint); + }; + + void SetCostInfo(float consumed_units) override { + Y_UNUSED(consumed_units); + }; + + void SetStreamingNotify(NYdbGrpc::IRequestContextBase::TOnNextReply&& cb) override { + Y_UNUSED(cb); + }; + + void FinishStream(ui32 status) override { + Y_UNUSED(status); + }; + + void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) override { + Y_UNUSED(in); + Y_UNUSED(status); + }; + + void Reply(NProtoBuf::Message* resp, ui32 status = 0) override { + Y_UNUSED(resp); + Y_UNUSED(status); + }; + + void SendOperation(const Ydb::Operations::Operation& operation) override { + Y_UNUSED(operation); + }; + + NWilson::TTraceId GetWilsonTraceId() const override { + return {}; + } + + void SendResult(const google::protobuf::Message& result, Ydb::StatusIds::StatusCode status) override { + Y_UNUSED(result); + processYdbStatusCode(status); + }; + + void SendResult( + const google::protobuf::Message& result, + Ydb::StatusIds::StatusCode status, + const google::protobuf::RepeatedPtrField& message) override { + + Y_UNUSED(result); + Y_UNUSED(message); + processYdbStatusCode(status); + }; + + void SendResult( + Ydb::StatusIds::StatusCode status, + const google::protobuf::RepeatedPtrField& message) override { + + Y_UNUSED(message); + processYdbStatusCode(status); + }; + + const Ydb::Operations::OperationParams& operation_params() const { + return DummyParams; + } + + static TKafkaCreateTopicRequest* GetProtoRequest(std::shared_ptr request) { + return static_cast(&(*request)); + } + +protected: + void FinishRequest() override { + }; + +private: + const Ydb::Operations::OperationParams DummyParams; + const TIntrusiveConstPtr UserToken; + const TString DummyString; + const NKikimr::NGRpcService::TAuditLogParts DummyAuditLogParts; + const TString TopicPath; + const TString DatabaseName; + const std::function SendResultCallback; + TString ReplyMessage; + + void processYdbStatusCode(Ydb::StatusIds::StatusCode& status) { + switch (status) { + case Ydb::StatusIds::StatusCode::StatusIds_StatusCode_SUCCESS: + SendResultCallback(TEvKafka::TEvCreateTopicsResponse::EStatus::OK, ReplyMessage); + break; + case Ydb::StatusIds::StatusCode::StatusIds_StatusCode_BAD_REQUEST: + SendResultCallback(TEvKafka::TEvCreateTopicsResponse::EStatus::BAD_REQUEST, ReplyMessage); + break; + default: + SendResultCallback(TEvKafka::TEvCreateTopicsResponse::EStatus::ERROR, ReplyMessage); + } + } +}; + +class TCreateTopicActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase { + using TBase = NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase; +public: + + TCreateTopicActor( + TActorId requester, + TIntrusiveConstPtr userToken, + TString topicPath, + TString databaseName, + ui32 partitionsNumber, + std::optional retentionMs, + std::optional retentionBytes) + : TBase(new TKafkaCreateTopicRequest( + userToken, + topicPath, + databaseName, + [this](TEvKafka::TEvCreateTopicsResponse::EStatus status, TString& message) { + this->SendResult(status, message); + }) + ) + , Requester(requester) + , TopicPath(topicPath) + , PartionsNumber(partitionsNumber) + , RetentionMs(retentionMs) + , RetentionBytes(retentionBytes) + { + KAFKA_LOG_D(LogMessage(databaseName)); + }; + + ~TCreateTopicActor() = default; + + void SendResult(TEvKafka::TEvCreateTopicsResponse::EStatus status, TString& message) { + THolder response(new TEvKafka::TEvCreateTopicsResponse()); + response->Status = status; + response->TopicPath = TopicPath; + response->Message = message; + Send(Requester, response.Release()); + Send(SelfId(), new TEvents::TEvPoison()); + } + + void FillProposeRequest( + NKikimr::TEvTxUserProxy::TEvProposeTransaction &proposal, + const TActorContext &ctx, + const TString &workingDir, + const TString &name + ) { + NKikimrSchemeOp::TModifyScheme& modifyScheme(*proposal.Record.MutableTransaction()->MutableModifyScheme()); + modifyScheme.SetWorkingDir(workingDir); + + auto pqDescr = modifyScheme.MutableCreatePersQueueGroup(); + pqDescr->SetPartitionPerTablet(1); + + Ydb::Topic::CreateTopicRequest topicRequest; + topicRequest.mutable_partitioning_settings()->set_min_active_partitions(PartionsNumber); + if (RetentionMs.has_value()) { + topicRequest.mutable_retention_period()->set_seconds(RetentionMs.value() / 1000); + } + if (RetentionBytes.has_value()) { + topicRequest.set_retention_storage_mb(RetentionBytes.value() / 1000'000); + } + topicRequest.mutable_supported_codecs()->add_codecs(Ydb::Topic::CODEC_RAW); + + TString error; + TYdbPqCodes codes = NKikimr::NGRpcProxy::V1::FillProposeRequestImpl( + name, + topicRequest, + modifyScheme, + ctx, + error, + workingDir, + proposal.Record.GetDatabaseName() + ); + if (codes.YdbCode != Ydb::StatusIds::SUCCESS) { + return ReplyWithError(codes.YdbCode, codes.PQCode, error, ctx); + } + }; + + void Bootstrap(const NActors::TActorContext& ctx) { + TBase::Bootstrap(ctx); + SendProposeRequest(ctx); + Become(&TCreateTopicActor::StateWork); + }; + + void StateWork(TAutoPtr& ev) { + switch (ev->GetTypeRewrite()) { + hFunc(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySetResult, TActorBase::Handle); + default: TBase::StateWork(ev); + } + } + + void HandleCacheNavigateResponse(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev){ Y_UNUSED(ev); } + +private: + const TActorId Requester; + const TString TopicPath; + const std::shared_ptr SerializedToken; + const ui32 PartionsNumber; + std::optional RetentionMs; + std::optional RetentionBytes; + + TStringBuilder LogMessage(TString& databaseName) { + TStringBuilder stringBuilder = TStringBuilder() + << "Create Topic actor. DatabaseName: " << databaseName + << ". TopicPath: " << TopicPath + << ". PartitionsNumber: " << PartionsNumber; + if (RetentionMs.has_value()) { + stringBuilder << ". RetentionMs: " << RetentionMs.value(); + } + if (RetentionBytes.has_value()) { + stringBuilder << ". RetentionBytes: " << RetentionBytes.value(); + } + return stringBuilder; + } +}; + +NActors::IActor* CreateKafkaCreateTopicsActor( + const TContext::TPtr context, + const ui64 correlationId, + const TMessagePtr& message +) { + return new TKafkaCreateTopicsActor(context, correlationId, message); +} + +void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) { + KAFKA_LOG_D(InputLogMessage()); + + if (Message->ValidateOnly) { + ProcessValidateOnly(ctx); + return; + } + + std::unordered_set topicNames; + for (auto& topic : Message->Topics) { + auto& topicName = topic.Name.value(); + if (topicNames.contains(topicName)) { + DuplicateTopicNames.insert(topicName); + } else { + topicNames.insert(topicName); + } + } + + for (auto& topic : Message->Topics) { + auto& topicName = topic.Name.value(); + + if (DuplicateTopicNames.contains(topicName)) { + continue; + } + + if (topicName == "") { + auto result = MakeHolder(); + result->Status = TEvKafka::TEvCreateTopicsResponse::EStatus::BAD_REQUEST; + result->Message = "Empty topic name"; + this->TopicNamesToResponses[topicName] = TAutoPtr(result.Release()); + continue; + } + + std::optional retentionMs; + std::optional retentionBytes; + + auto parseRetention = [this, topic]( + TCreateTopicsRequestData::TCreatableTopic::TCreateableTopicConfig& config, + std::optional& retention) -> bool { + try { + retention = std::stoul(config.Value.value()); + return true; + } catch(std::invalid_argument) { + auto result = MakeHolder(); + result->Status = TEvKafka::TEvCreateTopicsResponse::EStatus::INVALID_CONFIG; + result->Message = "Provided retention value is not a number"; + this->TopicNamesToResponses[topic.Name.value()] = TAutoPtr(result.Release()); + return false; + } + }; + + auto processConfig = [&topic, &retentionMs, &retentionBytes, &parseRetention]() -> bool { + for (auto& config : topic.Configs) { + bool result = true; + if (config.Name.value() == RETENTION_BYTES_CONFIG_NAME) { + result = parseRetention(config, retentionBytes); + } else if (config.Name.value() == RETENTION_MS_CONFIG_NAME) { + result = parseRetention(config, retentionMs); + } + if (!result) { + return false; + } + } + return true; + }; + + if (!processConfig()) { + continue; + } + + TopicNamesToRetentions[topicName] = std::pair, std::optional>( + retentionMs, + retentionBytes + ); + + ctx.Register(new TCreateTopicActor( + SelfId(), + Context->UserToken, + topic.Name.value(), + Context->DatabasePath, + topic.NumPartitions, + retentionMs, + retentionBytes + )); + + InflyTopics++; + } + + if (InflyTopics > 0) { + Become(&TKafkaCreateTopicsActor::StateWork); + } else { + Reply(ctx); + } +}; + +void TKafkaCreateTopicsActor::Handle(const TEvKafka::TEvCreateTopicsResponse::TPtr& ev, const TActorContext& ctx) { + auto eventPtr = ev->Release(); + KAFKA_LOG_D(TStringBuilder() << "Create topics actor. Topic's " << eventPtr->TopicPath << " response received." << std::to_string(eventPtr->Status)); + TopicNamesToResponses[eventPtr->TopicPath] = eventPtr; + InflyTopics--; + if (InflyTopics == 0) { + Reply(ctx); + } +}; + +void TKafkaCreateTopicsActor::Reply(const TActorContext& ctx) { + TCreateTopicsResponseData::TPtr response = std::make_shared(); + EKafkaErrors responseStatus = NONE_ERROR; + + for (auto& requestTopic : Message->Topics) { + auto topicName = requestTopic.Name.value(); + + TCreateTopicsResponseData::TCreatableTopicResult responseTopic; + responseTopic.Name = topicName; + + if (TopicNamesToResponses.contains(topicName)) { + responseTopic.ErrorMessage = TopicNamesToResponses[topicName]->Message; + } + + auto addConfigIfRequired = [this, &topicName, &responseTopic](std::optional configValue, TString configName) { + if (configValue.has_value()) { + TCreateTopicsResponseData::TCreatableTopicResult::TCreatableTopicConfigs config; + config.Name = configName; + config.Value = std::to_string(TopicNamesToRetentions[topicName].first.value()); + config.IsSensitive = false; + config.ReadOnly = false; + responseTopic.Configs.push_back(config); + } + }; + + auto setError= [&responseTopic, &responseStatus](EKafkaErrors status) { + responseTopic.ErrorCode = status; + responseStatus = status; + }; + + if (DuplicateTopicNames.contains(topicName)) { + setError(DUPLICATE_RESOURCE); + } else { + switch (TopicNamesToResponses[topicName]->Status) { + case TEvKafka::TEvCreateTopicsResponse::OK: + responseTopic.ErrorCode = NONE_ERROR; + addConfigIfRequired(TopicNamesToRetentions[topicName].first, RETENTION_MS_CONFIG_NAME); + addConfigIfRequired(TopicNamesToRetentions[topicName].second, RETENTION_BYTES_CONFIG_NAME); + break; + case TEvKafka::TEvCreateTopicsResponse::BAD_REQUEST: + setError(INVALID_REQUEST); + break; + case TEvKafka::TEvCreateTopicsResponse::ERROR: + setError(UNKNOWN_SERVER_ERROR); + break; + case TEvKafka::TEvCreateTopicsResponse::INVALID_CONFIG: + setError(INVALID_CONFIG); + break; + } + } + response->Topics.push_back(responseTopic); + } + + Send(Context->ConnectionId, + new TEvKafka::TEvResponse(CorrelationId, response, responseStatus)); + + Die(ctx); +}; + +TStringBuilder TKafkaCreateTopicsActor::InputLogMessage() { + TStringBuilder stringBuilder; + stringBuilder << "Create topics actor: New request. ValidateOnly:" << (Message->ValidateOnly != 0) << " Topics: ["; + + bool isFirst = true; + for (auto& requestTopic : Message->Topics) { + if (isFirst) { + isFirst = false; + } else { + stringBuilder << ","; + } + stringBuilder << " " << requestTopic.Name.value(); + } + stringBuilder << " ]"; + return stringBuilder; +}; + + +void TKafkaCreateTopicsActor::ProcessValidateOnly(const NActors::TActorContext& ctx) { + TCreateTopicsResponseData::TPtr response = std::make_shared(); + + for (auto& requestTopic : Message->Topics) { + auto topicName = requestTopic.Name.value(); + + TCreateTopicsResponseData::TCreatableTopicResult responseTopic; + responseTopic.Name = topicName; + responseTopic.NumPartitions = requestTopic.NumPartitions; + responseTopic.ErrorCode = NONE_ERROR; + response->Topics.push_back(responseTopic); + } + + Send(Context->ConnectionId, + new TEvKafka::TEvResponse(CorrelationId, response, NONE_ERROR)); + Die(ctx); +}; +} diff --git a/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.h b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.h new file mode 100644 index 000000000000..afa1657311cd --- /dev/null +++ b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.h @@ -0,0 +1,39 @@ +#include "actors.h" +#include + +#include + +namespace NKafka { + +class TKafkaCreateTopicsActor: public NActors::TActorBootstrapped { +public: + TKafkaCreateTopicsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr& message) + : Context(context) + , CorrelationId(correlationId) + , Message(message) { + } + + void Bootstrap(const NActors::TActorContext& ctx); + void Handle(const TEvKafka::TEvCreateTopicsResponse::TPtr& ev, const TActorContext& ctx); + void Reply(const TActorContext& ctx); + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + HFunc(TEvKafka::TEvCreateTopicsResponse, Handle); + } + } + +private: + const TContext::TPtr Context; + const ui64 CorrelationId; + const TMessagePtr Message; + std::unordered_set DuplicateTopicNames; + ui32 InflyTopics = 0; + std::unordered_map> TopicNamesToResponses; + std::unordered_map, std::optional>> TopicNamesToRetentions; + + TStringBuilder InputLogMessage(); + void ProcessValidateOnly(const NActors::TActorContext& ctx); +}; + +} // NKafka diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp index f799270c35b8..39276e817592 100644 --- a/ydb/core/kafka_proxy/kafka_connection.cpp +++ b/ydb/core/kafka_proxy/kafka_connection.cpp @@ -302,6 +302,10 @@ class TKafkaConnection: public TActorBootstrapped, public TNet Register(CreateKafkaOffsetCommitActor(Context, header->CorrelationId, message)); } + void HandleMessage(const TRequestHeaderData* header, const TMessagePtr& message) { + Register(CreateKafkaCreateTopicsActor(Context, header->CorrelationId, message)); + } + template TMessagePtr Cast(std::shared_ptr& request) { return TMessagePtr(request->Buffer, request->Message); @@ -382,6 +386,10 @@ class TKafkaConnection: public TActorBootstrapped, public TNet HandleMessage(&Request->Header, Cast(Request)); break; + case CREATE_TOPICS: + HandleMessage(&Request->Header, Cast(Request)); + break; + default: KAFKA_LOG_ERROR("Unsupported message: ApiKey=" << Request->Header.RequestApiKey); PassAway(); diff --git a/ydb/core/kafka_proxy/kafka_constants.h b/ydb/core/kafka_proxy/kafka_constants.h new file mode 100644 index 000000000000..88fcfc13c819 --- /dev/null +++ b/ydb/core/kafka_proxy/kafka_constants.h @@ -0,0 +1,8 @@ +#pragma once + +#include + +namespace NKafka { + static const TString RETENTION_MS_CONFIG_NAME = "retention.ms"; + static const TString RETENTION_BYTES_CONFIG_NAME = "retention.bytes"; +} diff --git a/ydb/core/kafka_proxy/kafka_events.h b/ydb/core/kafka_proxy/kafka_events.h index e0ddb88a2bcd..c649a2bd3812 100644 --- a/ydb/core/kafka_proxy/kafka_events.h +++ b/ydb/core/kafka_proxy/kafka_events.h @@ -28,6 +28,7 @@ struct TEvKafka { EvLeaveGroupRequest, EvKillReadSession, EvCommitedOffsetsResponse, + EvCreateTopicsResponse, EvResponse = EvRequest + 256, EvInternalEvents = EvResponse + 256, EvEnd @@ -224,6 +225,23 @@ struct TEvCommitedOffsetsResponse : public NActors::TEventLocal>> PartitionIdToOffsets; }; +struct TEvCreateTopicsResponse : public NActors::TEventLocal + , public NKikimr::NGRpcProxy::V1::TEvPQProxy::TLocalResponseBase +{ + enum EStatus { + OK, + ERROR, + BAD_REQUEST, + INVALID_CONFIG, + }; + + TEvCreateTopicsResponse() + {} + + TString TopicPath; + EStatus Status; + TString Message; +}; }; } // namespace NKafka diff --git a/ydb/core/kafka_proxy/kafka_messages.cpp b/ydb/core/kafka_proxy/kafka_messages.cpp index a101fcb59cc0..01b9c8c00c7e 100644 --- a/ydb/core/kafka_proxy/kafka_messages.cpp +++ b/ydb/core/kafka_proxy/kafka_messages.cpp @@ -21,6 +21,7 @@ const std::unordered_map EApiKeyNames = { {EApiKey::SYNC_GROUP, "SYNC_GROUP"}, {EApiKey::SASL_HANDSHAKE, "SASL_HANDSHAKE"}, {EApiKey::API_VERSIONS, "API_VERSIONS"}, + {EApiKey::CREATE_TOPICS, "CREATE_TOPICS"}, {EApiKey::INIT_PRODUCER_ID, "INIT_PRODUCER_ID"}, {EApiKey::SASL_AUTHENTICATE, "SASL_AUTHENTICATE"}, }; @@ -54,6 +55,8 @@ std::unique_ptr CreateRequest(i16 apiKey) { return std::make_unique(); case API_VERSIONS: return std::make_unique(); + case CREATE_TOPICS: + return std::make_unique(); case INIT_PRODUCER_ID: return std::make_unique(); case SASL_AUTHENTICATE: @@ -91,6 +94,8 @@ std::unique_ptr CreateResponse(i16 apiKey) { return std::make_unique(); case API_VERSIONS: return std::make_unique(); + case CREATE_TOPICS: + return std::make_unique(); case INIT_PRODUCER_ID: return std::make_unique(); case SASL_AUTHENTICATE: @@ -176,6 +181,12 @@ TKafkaVersion RequestHeaderVersion(i16 apiKey, TKafkaVersion _version) { } else { return 1; } + case CREATE_TOPICS: + if (_version >= 5) { + return 2; + } else { + return 1; + } case INIT_PRODUCER_ID: if (_version >= 2) { return 2; @@ -268,6 +279,12 @@ TKafkaVersion ResponseHeaderVersion(i16 apiKey, TKafkaVersion _version) { // ApiVersionsResponse always includes a v0 header. // See KIP-511 for details. return 0; + case CREATE_TOPICS: + if (_version >= 5) { + return 1; + } else { + return 0; + } case INIT_PRODUCER_ID: if (_version >= 2) { return 1; @@ -4936,6 +4953,469 @@ i32 TApiVersionsResponseData::TFinalizedFeatureKey::Size(TKafkaVersion _version) } +// +// TCreateTopicsRequestData +// +const TCreateTopicsRequestData::TimeoutMsMeta::Type TCreateTopicsRequestData::TimeoutMsMeta::Default = 60000; +const TCreateTopicsRequestData::ValidateOnlyMeta::Type TCreateTopicsRequestData::ValidateOnlyMeta::Default = false; + +TCreateTopicsRequestData::TCreateTopicsRequestData() + : TimeoutMs(TimeoutMsMeta::Default) + , ValidateOnly(ValidateOnlyMeta::Default) +{} + +void TCreateTopicsRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TCreateTopicsRequestData"; + } + NPrivate::Read(_readable, _version, Topics); + NPrivate::Read(_readable, _version, TimeoutMs); + NPrivate::Read(_readable, _version, ValidateOnly); + + if (NPrivate::VersionCheck(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint(); + ui32 _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TCreateTopicsRequestData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TCreateTopicsRequestData"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write(_collector, _writable, _version, Topics); + NPrivate::Write(_collector, _writable, _version, TimeoutMs); + NPrivate::Write(_collector, _writable, _version, ValidateOnly); + + if (NPrivate::VersionCheck(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TCreateTopicsRequestData::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size(_collector, _version, Topics); + NPrivate::Size(_collector, _version, TimeoutMs); + NPrivate::Size(_collector, _version, ValidateOnly); + + if (NPrivate::VersionCheck(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TCreateTopicsRequestData::TCreatableTopic +// +const TCreateTopicsRequestData::TCreatableTopic::NameMeta::Type TCreateTopicsRequestData::TCreatableTopic::NameMeta::Default = {""}; +const TCreateTopicsRequestData::TCreatableTopic::NumPartitionsMeta::Type TCreateTopicsRequestData::TCreatableTopic::NumPartitionsMeta::Default = 0; +const TCreateTopicsRequestData::TCreatableTopic::ReplicationFactorMeta::Type TCreateTopicsRequestData::TCreatableTopic::ReplicationFactorMeta::Default = 0; + +TCreateTopicsRequestData::TCreatableTopic::TCreatableTopic() + : Name(NameMeta::Default) + , NumPartitions(NumPartitionsMeta::Default) + , ReplicationFactor(ReplicationFactorMeta::Default) +{} + +void TCreateTopicsRequestData::TCreatableTopic::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TCreateTopicsRequestData::TCreatableTopic"; + } + NPrivate::Read(_readable, _version, Name); + NPrivate::Read(_readable, _version, NumPartitions); + NPrivate::Read(_readable, _version, ReplicationFactor); + NPrivate::Read(_readable, _version, Assignments); + NPrivate::Read(_readable, _version, Configs); + + if (NPrivate::VersionCheck(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint(); + ui32 _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TCreateTopicsRequestData::TCreatableTopic::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TCreateTopicsRequestData::TCreatableTopic"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write(_collector, _writable, _version, Name); + NPrivate::Write(_collector, _writable, _version, NumPartitions); + NPrivate::Write(_collector, _writable, _version, ReplicationFactor); + NPrivate::Write(_collector, _writable, _version, Assignments); + NPrivate::Write(_collector, _writable, _version, Configs); + + if (NPrivate::VersionCheck(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TCreateTopicsRequestData::TCreatableTopic::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size(_collector, _version, Name); + NPrivate::Size(_collector, _version, NumPartitions); + NPrivate::Size(_collector, _version, ReplicationFactor); + NPrivate::Size(_collector, _version, Assignments); + NPrivate::Size(_collector, _version, Configs); + + if (NPrivate::VersionCheck(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TCreateTopicsRequestData::TCreatableTopic::TCreatableReplicaAssignment +// +const TCreateTopicsRequestData::TCreatableTopic::TCreatableReplicaAssignment::PartitionIndexMeta::Type TCreateTopicsRequestData::TCreatableTopic::TCreatableReplicaAssignment::PartitionIndexMeta::Default = 0; + +TCreateTopicsRequestData::TCreatableTopic::TCreatableReplicaAssignment::TCreatableReplicaAssignment() + : PartitionIndex(PartitionIndexMeta::Default) +{} + +void TCreateTopicsRequestData::TCreatableTopic::TCreatableReplicaAssignment::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TCreateTopicsRequestData::TCreatableTopic::TCreatableReplicaAssignment"; + } + NPrivate::Read(_readable, _version, PartitionIndex); + NPrivate::Read(_readable, _version, BrokerIds); + + if (NPrivate::VersionCheck(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint(); + ui32 _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TCreateTopicsRequestData::TCreatableTopic::TCreatableReplicaAssignment::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TCreateTopicsRequestData::TCreatableTopic::TCreatableReplicaAssignment"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write(_collector, _writable, _version, PartitionIndex); + NPrivate::Write(_collector, _writable, _version, BrokerIds); + + if (NPrivate::VersionCheck(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TCreateTopicsRequestData::TCreatableTopic::TCreatableReplicaAssignment::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size(_collector, _version, PartitionIndex); + NPrivate::Size(_collector, _version, BrokerIds); + + if (NPrivate::VersionCheck(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TCreateTopicsRequestData::TCreatableTopic::TCreateableTopicConfig +// +const TCreateTopicsRequestData::TCreatableTopic::TCreateableTopicConfig::NameMeta::Type TCreateTopicsRequestData::TCreatableTopic::TCreateableTopicConfig::NameMeta::Default = {""}; +const TCreateTopicsRequestData::TCreatableTopic::TCreateableTopicConfig::ValueMeta::Type TCreateTopicsRequestData::TCreatableTopic::TCreateableTopicConfig::ValueMeta::Default = {""}; + +TCreateTopicsRequestData::TCreatableTopic::TCreateableTopicConfig::TCreateableTopicConfig() + : Name(NameMeta::Default) + , Value(ValueMeta::Default) +{} + +void TCreateTopicsRequestData::TCreatableTopic::TCreateableTopicConfig::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TCreateTopicsRequestData::TCreatableTopic::TCreateableTopicConfig"; + } + NPrivate::Read(_readable, _version, Name); + NPrivate::Read(_readable, _version, Value); + + if (NPrivate::VersionCheck(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint(); + ui32 _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TCreateTopicsRequestData::TCreatableTopic::TCreateableTopicConfig::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TCreateTopicsRequestData::TCreatableTopic::TCreateableTopicConfig"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write(_collector, _writable, _version, Name); + NPrivate::Write(_collector, _writable, _version, Value); + + if (NPrivate::VersionCheck(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TCreateTopicsRequestData::TCreatableTopic::TCreateableTopicConfig::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size(_collector, _version, Name); + NPrivate::Size(_collector, _version, Value); + + if (NPrivate::VersionCheck(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TCreateTopicsResponseData +// +const TCreateTopicsResponseData::ThrottleTimeMsMeta::Type TCreateTopicsResponseData::ThrottleTimeMsMeta::Default = 0; + +TCreateTopicsResponseData::TCreateTopicsResponseData() + : ThrottleTimeMs(ThrottleTimeMsMeta::Default) +{} + +void TCreateTopicsResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TCreateTopicsResponseData"; + } + NPrivate::Read(_readable, _version, ThrottleTimeMs); + NPrivate::Read(_readable, _version, Topics); + + if (NPrivate::VersionCheck(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint(); + ui32 _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TCreateTopicsResponseData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TCreateTopicsResponseData"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write(_collector, _writable, _version, ThrottleTimeMs); + NPrivate::Write(_collector, _writable, _version, Topics); + + if (NPrivate::VersionCheck(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TCreateTopicsResponseData::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size(_collector, _version, ThrottleTimeMs); + NPrivate::Size(_collector, _version, Topics); + + if (NPrivate::VersionCheck(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TCreateTopicsResponseData::TCreatableTopicResult +// +const TCreateTopicsResponseData::TCreatableTopicResult::NameMeta::Type TCreateTopicsResponseData::TCreatableTopicResult::NameMeta::Default = {""}; +const TCreateTopicsResponseData::TCreatableTopicResult::TopicIdMeta::Type TCreateTopicsResponseData::TCreatableTopicResult::TopicIdMeta::Default = TKafkaUuid(0, 0); +const TCreateTopicsResponseData::TCreatableTopicResult::ErrorCodeMeta::Type TCreateTopicsResponseData::TCreatableTopicResult::ErrorCodeMeta::Default = 0; +const TCreateTopicsResponseData::TCreatableTopicResult::ErrorMessageMeta::Type TCreateTopicsResponseData::TCreatableTopicResult::ErrorMessageMeta::Default = {""}; +const TCreateTopicsResponseData::TCreatableTopicResult::TopicConfigErrorCodeMeta::Type TCreateTopicsResponseData::TCreatableTopicResult::TopicConfigErrorCodeMeta::Default = 0; +const TCreateTopicsResponseData::TCreatableTopicResult::NumPartitionsMeta::Type TCreateTopicsResponseData::TCreatableTopicResult::NumPartitionsMeta::Default = -1; +const TCreateTopicsResponseData::TCreatableTopicResult::ReplicationFactorMeta::Type TCreateTopicsResponseData::TCreatableTopicResult::ReplicationFactorMeta::Default = -1; + +TCreateTopicsResponseData::TCreatableTopicResult::TCreatableTopicResult() + : Name(NameMeta::Default) + , TopicId(TopicIdMeta::Default) + , ErrorCode(ErrorCodeMeta::Default) + , ErrorMessage(ErrorMessageMeta::Default) + , TopicConfigErrorCode(TopicConfigErrorCodeMeta::Default) + , NumPartitions(NumPartitionsMeta::Default) + , ReplicationFactor(ReplicationFactorMeta::Default) +{} + +void TCreateTopicsResponseData::TCreatableTopicResult::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TCreateTopicsResponseData::TCreatableTopicResult"; + } + NPrivate::Read(_readable, _version, Name); + NPrivate::Read(_readable, _version, TopicId); + NPrivate::Read(_readable, _version, ErrorCode); + NPrivate::Read(_readable, _version, ErrorMessage); + NPrivate::Read(_readable, _version, TopicConfigErrorCode); + NPrivate::Read(_readable, _version, NumPartitions); + NPrivate::Read(_readable, _version, ReplicationFactor); + NPrivate::Read(_readable, _version, Configs); + + if (NPrivate::VersionCheck(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint(); + ui32 _size = _readable.readUnsignedVarint(); + switch (_tag) { + case TopicConfigErrorCodeMeta::Tag: + NPrivate::ReadTag(_readable, _version, TopicConfigErrorCode); + break; + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TCreateTopicsResponseData::TCreatableTopicResult::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TCreateTopicsResponseData::TCreatableTopicResult"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write(_collector, _writable, _version, Name); + NPrivate::Write(_collector, _writable, _version, TopicId); + NPrivate::Write(_collector, _writable, _version, ErrorCode); + NPrivate::Write(_collector, _writable, _version, ErrorMessage); + NPrivate::Write(_collector, _writable, _version, TopicConfigErrorCode); + NPrivate::Write(_collector, _writable, _version, NumPartitions); + NPrivate::Write(_collector, _writable, _version, ReplicationFactor); + NPrivate::Write(_collector, _writable, _version, Configs); + + if (NPrivate::VersionCheck(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + NPrivate::WriteTag(_writable, _version, TopicConfigErrorCode); + } +} + +i32 TCreateTopicsResponseData::TCreatableTopicResult::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size(_collector, _version, Name); + NPrivate::Size(_collector, _version, TopicId); + NPrivate::Size(_collector, _version, ErrorCode); + NPrivate::Size(_collector, _version, ErrorMessage); + NPrivate::Size(_collector, _version, TopicConfigErrorCode); + NPrivate::Size(_collector, _version, NumPartitions); + NPrivate::Size(_collector, _version, ReplicationFactor); + NPrivate::Size(_collector, _version, Configs); + + if (NPrivate::VersionCheck(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TCreateTopicsResponseData::TCreatableTopicResult::TCreatableTopicConfigs +// +const TCreateTopicsResponseData::TCreatableTopicResult::TCreatableTopicConfigs::NameMeta::Type TCreateTopicsResponseData::TCreatableTopicResult::TCreatableTopicConfigs::NameMeta::Default = {""}; +const TCreateTopicsResponseData::TCreatableTopicResult::TCreatableTopicConfigs::ValueMeta::Type TCreateTopicsResponseData::TCreatableTopicResult::TCreatableTopicConfigs::ValueMeta::Default = {""}; +const TCreateTopicsResponseData::TCreatableTopicResult::TCreatableTopicConfigs::ReadOnlyMeta::Type TCreateTopicsResponseData::TCreatableTopicResult::TCreatableTopicConfigs::ReadOnlyMeta::Default = false; +const TCreateTopicsResponseData::TCreatableTopicResult::TCreatableTopicConfigs::ConfigSourceMeta::Type TCreateTopicsResponseData::TCreatableTopicResult::TCreatableTopicConfigs::ConfigSourceMeta::Default = -1; +const TCreateTopicsResponseData::TCreatableTopicResult::TCreatableTopicConfigs::IsSensitiveMeta::Type TCreateTopicsResponseData::TCreatableTopicResult::TCreatableTopicConfigs::IsSensitiveMeta::Default = false; + +TCreateTopicsResponseData::TCreatableTopicResult::TCreatableTopicConfigs::TCreatableTopicConfigs() + : Name(NameMeta::Default) + , Value(ValueMeta::Default) + , ReadOnly(ReadOnlyMeta::Default) + , ConfigSource(ConfigSourceMeta::Default) + , IsSensitive(IsSensitiveMeta::Default) +{} + +void TCreateTopicsResponseData::TCreatableTopicResult::TCreatableTopicConfigs::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TCreateTopicsResponseData::TCreatableTopicResult::TCreatableTopicConfigs"; + } + NPrivate::Read(_readable, _version, Name); + NPrivate::Read(_readable, _version, Value); + NPrivate::Read(_readable, _version, ReadOnly); + NPrivate::Read(_readable, _version, ConfigSource); + NPrivate::Read(_readable, _version, IsSensitive); + + if (NPrivate::VersionCheck(_version)) { + ui32 _numTaggedFields = _readable.readUnsignedVarint(); + for (ui32 _i = 0; _i < _numTaggedFields; ++_i) { + ui32 _tag = _readable.readUnsignedVarint(); + ui32 _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TCreateTopicsResponseData::TCreatableTopicResult::TCreatableTopicConfigs::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TCreateTopicsResponseData::TCreatableTopicResult::TCreatableTopicConfigs"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write(_collector, _writable, _version, Name); + NPrivate::Write(_collector, _writable, _version, Value); + NPrivate::Write(_collector, _writable, _version, ReadOnly); + NPrivate::Write(_collector, _writable, _version, ConfigSource); + NPrivate::Write(_collector, _writable, _version, IsSensitive); + + if (NPrivate::VersionCheck(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TCreateTopicsResponseData::TCreatableTopicResult::TCreatableTopicConfigs::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size(_collector, _version, Name); + NPrivate::Size(_collector, _version, Value); + NPrivate::Size(_collector, _version, ReadOnly); + NPrivate::Size(_collector, _version, ConfigSource); + NPrivate::Size(_collector, _version, IsSensitive); + + if (NPrivate::VersionCheck(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + // // TInitProducerIdRequestData // diff --git a/ydb/core/kafka_proxy/kafka_messages.h b/ydb/core/kafka_proxy/kafka_messages.h index d091051ce062..bbe56aabea42 100644 --- a/ydb/core/kafka_proxy/kafka_messages.h +++ b/ydb/core/kafka_proxy/kafka_messages.h @@ -29,6 +29,7 @@ enum EApiKey { SYNC_GROUP = 14, // [ZK_BROKER, BROKER] SASL_HANDSHAKE = 17, // [ZK_BROKER, BROKER, CONTROLLER] API_VERSIONS = 18, // [ZK_BROKER, BROKER, CONTROLLER] + CREATE_TOPICS = 19, // [ZK_BROKER, BROKER, CONTROLLER] INIT_PRODUCER_ID = 22, // [ZK_BROKER, BROKER] SASL_AUTHENTICATE = 36, // [ZK_BROKER, BROKER, CONTROLLER] }; @@ -5365,6 +5366,545 @@ class TApiVersionsResponseData : public TApiMessage { }; +class TCreateTopicsRequestData : public TApiMessage { +public: + typedef std::shared_ptr TPtr; + + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 7}; + static constexpr TKafkaVersions FlexibleVersions = {5, Max()}; + }; + + TCreateTopicsRequestData(); + ~TCreateTopicsRequestData() = default; + + class TCreatableTopic : public TMessage { + public: + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 7}; + static constexpr TKafkaVersions FlexibleVersions = {5, Max()}; + }; + + TCreatableTopic(); + ~TCreatableTopic() = default; + + class TCreatableReplicaAssignment : public TMessage { + public: + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 7}; + static constexpr TKafkaVersions FlexibleVersions = {5, Max()}; + }; + + TCreatableReplicaAssignment(); + ~TCreatableReplicaAssignment() = default; + + struct PartitionIndexMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "partitionIndex"; + static constexpr const char* About = "The partition index."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {5, Max()}; + }; + PartitionIndexMeta::Type PartitionIndex; + + struct BrokerIdsMeta { + using ItemType = TKafkaInt32; + using ItemTypeDesc = NPrivate::TKafkaIntDesc; + using Type = std::vector; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "brokerIds"; + static constexpr const char* About = "The brokers to place the partition on."; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {5, Max()}; + }; + BrokerIdsMeta::Type BrokerIds; + + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TCreatableReplicaAssignment& other) const = default; + }; + + class TCreateableTopicConfig : public TMessage { + public: + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 7}; + static constexpr TKafkaVersions FlexibleVersions = {5, Max()}; + }; + + TCreateableTopicConfig(); + ~TCreateableTopicConfig() = default; + + struct NameMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "name"; + static constexpr const char* About = "The configuration name."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {5, Max()}; + }; + NameMeta::Type Name; + + struct ValueMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "value"; + static constexpr const char* About = "The configuration value."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsAlways; + static constexpr TKafkaVersions FlexibleVersions = {5, Max()}; + }; + ValueMeta::Type Value; + + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TCreateableTopicConfig& other) const = default; + }; + + struct NameMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "name"; + static constexpr const char* About = "The topic name."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {5, Max()}; + }; + NameMeta::Type Name; + + struct NumPartitionsMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "numPartitions"; + static constexpr const char* About = "The number of partitions to create in the topic, or -1 if we are either specifying a manual partition assignment or using the default partitions."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {5, Max()}; + }; + NumPartitionsMeta::Type NumPartitions; + + struct ReplicationFactorMeta { + using Type = TKafkaInt16; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "replicationFactor"; + static constexpr const char* About = "The number of replicas to create for each partition in the topic, or -1 if we are either specifying a manual partition assignment or using the default replication factor."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {5, Max()}; + }; + ReplicationFactorMeta::Type ReplicationFactor; + + struct AssignmentsMeta { + using ItemType = TCreatableReplicaAssignment; + using ItemTypeDesc = NPrivate::TKafkaStructDesc; + using Type = std::vector; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "assignments"; + static constexpr const char* About = "The manual partition assignment, or the empty array if we are using automatic assignment."; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {5, Max()}; + }; + AssignmentsMeta::Type Assignments; + + struct ConfigsMeta { + using ItemType = TCreateableTopicConfig; + using ItemTypeDesc = NPrivate::TKafkaStructDesc; + using Type = std::vector; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "configs"; + static constexpr const char* About = "The custom topic configurations to set."; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {5, Max()}; + }; + ConfigsMeta::Type Configs; + + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TCreatableTopic& other) const = default; + }; + + struct TopicsMeta { + using ItemType = TCreatableTopic; + using ItemTypeDesc = NPrivate::TKafkaStructDesc; + using Type = std::vector; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "topics"; + static constexpr const char* About = "The topics to create."; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {5, Max()}; + }; + TopicsMeta::Type Topics; + + struct TimeoutMsMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "timeoutMs"; + static constexpr const char* About = "How long to wait in milliseconds before timing out the request."; + static const Type Default; // = 60000; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {5, Max()}; + }; + TimeoutMsMeta::Type TimeoutMs; + + struct ValidateOnlyMeta { + using Type = TKafkaBool; + using TypeDesc = NPrivate::TKafkaBoolDesc; + + static constexpr const char* Name = "validateOnly"; + static constexpr const char* About = "If true, check that the topics can be created as specified, but don't create anything."; + static const Type Default; // = false; + + static constexpr TKafkaVersions PresentVersions = {1, Max()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {5, Max()}; + }; + ValidateOnlyMeta::Type ValidateOnly; + + i16 ApiKey() const override { return CREATE_TOPICS; }; + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TCreateTopicsRequestData& other) const = default; +}; + + +class TCreateTopicsResponseData : public TApiMessage { +public: + typedef std::shared_ptr TPtr; + + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 7}; + static constexpr TKafkaVersions FlexibleVersions = {5, Max()}; + }; + + TCreateTopicsResponseData(); + ~TCreateTopicsResponseData() = default; + + class TCreatableTopicResult : public TMessage { + public: + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 7}; + static constexpr TKafkaVersions FlexibleVersions = {5, Max()}; + }; + + TCreatableTopicResult(); + ~TCreatableTopicResult() = default; + + class TCreatableTopicConfigs : public TMessage { + public: + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {5, 7}; + static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; + }; + + TCreatableTopicConfigs(); + ~TCreatableTopicConfigs() = default; + + struct NameMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "name"; + static constexpr const char* About = "The configuration name."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; + }; + NameMeta::Type Name; + + struct ValueMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "value"; + static constexpr const char* About = "The configuration value."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsAlways; + static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; + }; + ValueMeta::Type Value; + + struct ReadOnlyMeta { + using Type = TKafkaBool; + using TypeDesc = NPrivate::TKafkaBoolDesc; + + static constexpr const char* Name = "readOnly"; + static constexpr const char* About = "True if the configuration is read-only."; + static const Type Default; // = false; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; + }; + ReadOnlyMeta::Type ReadOnly; + + struct ConfigSourceMeta { + using Type = TKafkaInt8; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "configSource"; + static constexpr const char* About = "The configuration source."; + static const Type Default; // = -1; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; + }; + ConfigSourceMeta::Type ConfigSource; + + struct IsSensitiveMeta { + using Type = TKafkaBool; + using TypeDesc = NPrivate::TKafkaBoolDesc; + + static constexpr const char* Name = "isSensitive"; + static constexpr const char* About = "True if this configuration is sensitive."; + static const Type Default; // = false; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; + }; + IsSensitiveMeta::Type IsSensitive; + + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TCreatableTopicConfigs& other) const = default; + }; + + struct NameMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "name"; + static constexpr const char* About = "The topic name."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {5, Max()}; + }; + NameMeta::Type Name; + + struct TopicIdMeta { + using Type = TKafkaUuid; + using TypeDesc = NPrivate::TKafkaUuidDesc; + + static constexpr const char* Name = "topicId"; + static constexpr const char* About = "The unique topic ID"; + static const Type Default; // = TKafkaUuid(0, 0); + + static constexpr TKafkaVersions PresentVersions = {7, Max()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; + }; + TopicIdMeta::Type TopicId; + + struct ErrorCodeMeta { + using Type = TKafkaInt16; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "errorCode"; + static constexpr const char* About = "The error code, or 0 if there was no error."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {5, Max()}; + }; + ErrorCodeMeta::Type ErrorCode; + + struct ErrorMessageMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "errorMessage"; + static constexpr const char* About = "The error message, or null if there was no error."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = {1, Max()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsAlways; + static constexpr TKafkaVersions FlexibleVersions = {5, Max()}; + }; + ErrorMessageMeta::Type ErrorMessage; + + struct TopicConfigErrorCodeMeta { + using Type = TKafkaInt16; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "topicConfigErrorCode"; + static constexpr const char* About = "Optional topic config error returned if configs are not returned in the response."; + static constexpr const TKafkaInt32 Tag = 0; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = {5, Max()}; + static constexpr TKafkaVersions TaggedVersions = VersionsAlways; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; + }; + TopicConfigErrorCodeMeta::Type TopicConfigErrorCode; + + struct NumPartitionsMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "numPartitions"; + static constexpr const char* About = "Number of partitions of the topic."; + static const Type Default; // = -1; + + static constexpr TKafkaVersions PresentVersions = {5, Max()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; + }; + NumPartitionsMeta::Type NumPartitions; + + struct ReplicationFactorMeta { + using Type = TKafkaInt16; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "replicationFactor"; + static constexpr const char* About = "Replication factor of the topic."; + static const Type Default; // = -1; + + static constexpr TKafkaVersions PresentVersions = {5, Max()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; + }; + ReplicationFactorMeta::Type ReplicationFactor; + + struct ConfigsMeta { + using ItemType = TCreatableTopicConfigs; + using ItemTypeDesc = NPrivate::TKafkaStructDesc; + using Type = std::vector; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "configs"; + static constexpr const char* About = "Configuration of the topic."; + + static constexpr TKafkaVersions PresentVersions = {5, Max()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsAlways; + static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; + }; + ConfigsMeta::Type Configs; + + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TCreatableTopicResult& other) const = default; + }; + + struct ThrottleTimeMsMeta { + using Type = TKafkaInt32; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "throttleTimeMs"; + static constexpr const char* About = "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = {2, Max()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {5, Max()}; + }; + ThrottleTimeMsMeta::Type ThrottleTimeMs; + + struct TopicsMeta { + using ItemType = TCreatableTopicResult; + using ItemTypeDesc = NPrivate::TKafkaStructDesc; + using Type = std::vector; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "topics"; + static constexpr const char* About = "Results for each topic we tried to create."; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {5, Max()}; + }; + TopicsMeta::Type Topics; + + i16 ApiKey() const override { return CREATE_TOPICS; }; + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TCreateTopicsResponseData& other) const = default; +}; + + class TInitProducerIdRequestData : public TApiMessage { public: typedef std::shared_ptr TPtr; diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp index 382650aa2108..f01d6951a469 100644 --- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp +++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp @@ -288,6 +288,25 @@ std::vector Read(std::shared_ptr< return result; } +struct TopicToCreate { + TopicToCreate( + TString name, + ui32 partionsNumber, + std::optional retentionMs = std::nullopt, + std::optional retentionBytes = std::nullopt) + : Name(name) + , PartitionsNumber(partionsNumber) + , RetentionMs(retentionMs) + , RetentionBytes(retentionBytes) + { + } + + TString Name; + ui32 PartitionsNumber; + std::optional RetentionMs; + std::optional RetentionBytes; +}; + class TTestClient { public: TTestClient(ui16 port, const TString clientName = "TestClient") @@ -561,6 +580,36 @@ class TTestClient { return WriteAndRead(header, request); } + TMessagePtr CreateTopics(std::vector topicsToCreate, bool validateOnly = false) { + Cerr << ">>>>> TCreateTopicsRequestData\n"; + + TRequestHeaderData header = Header(NKafka::EApiKey::CREATE_TOPICS, 7); + TCreateTopicsRequestData request; + request.ValidateOnly = validateOnly; + + for (auto& topicToCreate : topicsToCreate) { + NKafka::TCreateTopicsRequestData::TCreatableTopic topic; + topic.Name = topicToCreate.Name; + topic.NumPartitions = topicToCreate.PartitionsNumber; + + auto addConfig = [&topic](std::optional configValue, TString configName) { + if (configValue.has_value()) { + NKafka::TCreateTopicsRequestData::TCreatableTopic::TCreateableTopicConfig config; + config.Name = configName; + config.Value = configValue.value(); + topic.Configs.push_back(config); + } + }; + + addConfig(topicToCreate.RetentionMs, "retention.ms"); + addConfig(topicToCreate.RetentionBytes, "retention.bytes"); + + request.Topics.push_back(topic); + } + + return WriteAndRead(header, request); + } + void UnknownApiKey() { Cerr << ">>>>> Unknown apiKey\n"; @@ -1373,6 +1422,215 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { } } // Y_UNIT_TEST(OffsetFetchScenario) + Y_UNIT_TEST(CreateTopicsScenario) { + TInsecureTestServer testServer("2"); + + // TString key = "record-key"; + // TString value = "record-value"; + // TString headerKey = "header-key"; + // TString headerValue = "header-value"; + + NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); + + TTestClient client(testServer.Port); + + { + auto msg = client.ApiVersions(); + + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 15u); + } + + { + auto msg = client.SaslHandshake(); + + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u); + UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN"); + } + + { + auto msg = client.SaslAuthenticate("ouruser@/Root", "ourUserPassword"); + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); + } + + auto describeTopicSettings = NTopic::TDescribeTopicSettings().IncludeStats(true); + { + // Creation of two topics + auto msg = client.CreateTopics({ + TopicToCreate("topic-999-test", 12), + TopicToCreate("topic-998-test", 13) + }); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Name.value(), "topic-999-test"); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[1].Name.value(), "topic-998-test"); + + auto result999 = pqClient.DescribeTopic("/Root/topic-999-test", describeTopicSettings).GetValueSync(); + UNIT_ASSERT(result999.IsSuccess()); + UNIT_ASSERT_EQUAL(result999.GetTopicDescription().GetPartitions().size(), 12); + + auto result998 = pqClient.DescribeTopic("/Root/topic-998-test", describeTopicSettings).GetValueSync(); + UNIT_ASSERT(result998.IsSuccess()); + UNIT_ASSERT_EQUAL(result998.GetTopicDescription().GetPartitions().size(), 13); + } + + { + // Duplicate topics + auto msg = client.CreateTopics({ + TopicToCreate("topic-997-test", 1), + TopicToCreate("topic-997-test", 1) + }); + + UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Name.value(), "topic-997-test"); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].ErrorCode, DUPLICATE_RESOURCE); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[1].Name.value(), "topic-997-test"); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[1].ErrorCode, DUPLICATE_RESOURCE); + + auto describeTopicSettings = NTopic::TDescribeTopicSettings().IncludeStats(true); + auto result = pqClient.DescribeTopic("/Root/topic-997-test", describeTopicSettings).GetValueSync(); + UNIT_ASSERT(!result.IsSuccess()); + } + + { + // One OK, two duplicate topics + auto msg = client.CreateTopics({ + TopicToCreate("topic-996-test", 1), + TopicToCreate("topic-995-test", 1), + TopicToCreate("topic-995-test", 1) + }); + + UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 3); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Name.value(), "topic-996-test"); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].ErrorCode, NONE_ERROR); + auto result996 = pqClient.DescribeTopic("/Root/topic-996-test", describeTopicSettings).GetValueSync(); + UNIT_ASSERT(result996.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[1].Name.value(), "topic-995-test"); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[1].ErrorCode, DUPLICATE_RESOURCE); + + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[2].Name.value(), "topic-995-test"); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[2].ErrorCode, DUPLICATE_RESOURCE); + + auto result995 = pqClient.DescribeTopic("/Root/topic-995-test", describeTopicSettings).GetValueSync(); + UNIT_ASSERT(!result995.IsSuccess()); + } + + { + // Existing topic + client.CreateTopics({ TopicToCreate("topic-994-test", 1) }); + auto result = pqClient.DescribeTopic("/Root/topic-994-test", describeTopicSettings).GetValueSync(); + UNIT_ASSERT(result.IsSuccess()); + + auto msg = client.CreateTopics({ TopicToCreate("topic-994-test", 1) }); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Name.value(), "topic-994-test"); + } + + { + // Set valid retention + ui64 retentionMs = 168 * 60 * 60 * 1000; + ui64 retentionBytes = 51'200'000'000ul; + + auto msg = client.CreateTopics({ TopicToCreate("topic-993-test", 1, std::to_string(retentionMs), std::to_string(retentionBytes))}); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Name.value(), "topic-993-test"); + + auto result993 = pqClient.DescribeTopic("/Root/topic-993-test", describeTopicSettings).GetValueSync(); + UNIT_ASSERT(result993.IsSuccess()); + UNIT_ASSERT_VALUES_EQUAL(result993.GetTopicDescription().GetRetentionPeriod().MilliSeconds(), retentionMs); + UNIT_ASSERT_VALUES_EQUAL(result993.GetTopicDescription().GetRetentionStorageMb(), retentionBytes / 1'000'000); + } + + { + // retention.ms is not number + auto msg = client.CreateTopics({ TopicToCreate("topic-992-test", 1, "not_a_number", "42")}); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Name.value(), "topic-992-test"); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].ErrorCode, INVALID_CONFIG); + + auto result992 = pqClient.DescribeTopic("/Root/topic-992-test", describeTopicSettings).GetValueSync(); + UNIT_ASSERT(!result992.IsSuccess()); + } + + { + // retention.bytes is not number + auto msg = client.CreateTopics({ TopicToCreate("topic-991-test", 1, "42", "not_a_number")}); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Name.value(), "topic-991-test"); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].ErrorCode, INVALID_CONFIG); + + auto result992 = pqClient.DescribeTopic("/Root/topic-992-test", describeTopicSettings).GetValueSync(); + UNIT_ASSERT(!result992.IsSuccess()); + } + + { + // Empty topic name + auto msg = client.CreateTopics({ TopicToCreate("", 1)}); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].ErrorCode, INVALID_REQUEST); + } + + { + // Wrong topic name + auto msg = client.CreateTopics({ TopicToCreate("//////", 1)}); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].ErrorCode, INVALID_REQUEST); + } + + { + // Wrong topic name + auto msg = client.CreateTopics({ TopicToCreate("/Root/", 1)}); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].ErrorCode, INVALID_REQUEST); + } + + { + // Wrong topic name + auto msg = client.CreateTopics({ TopicToCreate("/Root//", 1)}); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].ErrorCode, INVALID_REQUEST); + } + + { + // Set invalid retention + ui64 retentionMs = 13 * 60 * 60 * 1000; + ui64 retentionBytes = 11'000'000'000ul; + + auto msg = client.CreateTopics({ TopicToCreate("topic-990-test", 1, std::to_string(retentionMs), std::to_string(retentionBytes))}); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Name.value(), "topic-990-test"); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].ErrorCode, INVALID_REQUEST); + + auto result992 = pqClient.DescribeTopic("/Root/topic-990-test", describeTopicSettings).GetValueSync(); + UNIT_ASSERT(!result992.IsSuccess()); + } + + { + // Set only ms retention + ui64 retentionMs = 168 * 60 * 60 * 1000; + auto msg = client.CreateTopics({ TopicToCreate("topic-989-test", 1, std::to_string(retentionMs)) }); + + UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Name.value(), "topic-989-test"); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].ErrorCode, INVALID_REQUEST); + + auto result993 = pqClient.DescribeTopic("/Root/topic-989-test", describeTopicSettings).GetValueSync(); + UNIT_ASSERT(!result993.IsSuccess()); + } + + { + // Validation only + auto msg = client.CreateTopics({ TopicToCreate("topic-988-test", 1)}, true); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Name.value(), "topic-988-test"); + + auto result993 = pqClient.DescribeTopic("/Root/topic-988-test", describeTopicSettings).GetValueSync(); + UNIT_ASSERT(!result993.IsSuccess()); + } + + } // Y_UNIT_TEST(CreateTopicsScenario) + Y_UNIT_TEST(LoginWithApiKey) { TInsecureTestServer testServer; diff --git a/ydb/core/kafka_proxy/ya.make b/ydb/core/kafka_proxy/ya.make index 97d387e48930..a74745d4d814 100644 --- a/ydb/core/kafka_proxy/ya.make +++ b/ydb/core/kafka_proxy/ya.make @@ -15,8 +15,10 @@ SRCS( actors/kafka_read_session_actor.cpp actors/kafka_offset_fetch_actor.cpp actors/kafka_offset_commit_actor.cpp + actors/kafka_create_topics_actor.cpp kafka_connection.cpp kafka_connection.h + kafka_constants.h kafka_listener.h kafka.h kafka_log.h