Fix kafka metadata actor for Topics.size() == 0 and fix read by short topic name #1287

Merged: 2 commits, Jan 29, 2024
Changes from all commits
68 changes: 51 additions & 17 deletions ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
@@ -19,14 +19,45 @@ NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context,

void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) {
Response->Topics.resize(Message->Topics.size());
Response->ClusterId = "ydb-cluster";
Response->ControllerId = 1;


if (WithProxy) {
AddProxyNodeToBrokers();
}

if (Message->Topics.size() == 0 && !WithProxy) {
AddCurrentNodeToBrokers();
}

if (Message->Topics.size() != 0) {
ProcessTopics();
}

Become(&TKafkaMetadataActor::StateWork);
RespondIfRequired(ctx);
}

void TKafkaMetadataActor::AddCurrentNodeToBrokers() {
PendingResponses++;
Send(NKikimr::NIcNodeCache::CreateICNodesInfoCacheServiceId(), new NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoRequest());
}

void TKafkaMetadataActor::AddProxyNodeToBrokers() {
auto broker = TMetadataResponseData::TMetadataResponseBroker{};
broker.NodeId = ProxyNodeId;
broker.Host = Context->Config.GetProxy().GetHostname();
broker.Port = Context->Config.GetProxy().GetPort();
Response->Brokers.emplace_back(std::move(broker));
}

void TKafkaMetadataActor::ProcessTopics() {
THashMap<TString, TActorId> partitionActors;
for (size_t i = 0; i < Message->Topics.size(); ++i) {
Response->Topics[i] = TMetadataResponseData::TMetadataResponseTopic{};
auto& reqTopic = Message->Topics[i];
Response->Topics[i].Name = reqTopic.Name.value_or("");
Response->ClusterId = "ydb-cluster";
Response->ControllerId = 1;

if (!reqTopic.Name.value_or("")) {
AddTopicError(Response->Topics[i], EKafkaErrors::INVALID_TOPIC_EXCEPTION);
@@ -43,8 +74,21 @@ void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) {
}
TopicIndexes[child].push_back(i);
}
Become(&TKafkaMetadataActor::StateWork);
}

void TKafkaMetadataActor::HandleNodesResponse(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev, const NActors::TActorContext& ctx) {
auto iter = ev->Get()->NodeIdsMapping->find(ctx.SelfID.NodeId());
Y_ABORT_UNLESS(!iter.IsEnd());
auto host = (*ev->Get()->Nodes)[iter->second].Host;
KAFKA_LOG_D("Incoming TEvGetAllNodesInfoResponse. Host#: " << host);

auto broker = TMetadataResponseData::TMetadataResponseBroker{};
broker.NodeId = ctx.SelfID.NodeId();
broker.Host = host;
broker.Port = Context->Config.GetListeningPort();
Response->Brokers.emplace_back(std::move(broker));

--PendingResponses;
RespondIfRequired(ctx);
}

@@ -69,21 +113,11 @@ void TKafkaMetadataActor::AddTopicError(
}

void TKafkaMetadataActor::AddTopicResponse(TMetadataResponseData::TMetadataResponseTopic& topic, TEvLocationResponse* response) {
bool withProxy = Context->Config.HasProxy() && !Context->Config.GetProxy().GetHostname().Empty();

topic.ErrorCode = NONE_ERROR;
//topic.TopicId = TKafkaUuid(response->SchemeShardId, response->PathId);
if (withProxy) {
auto broker = TMetadataResponseData::TMetadataResponseBroker{};
broker.NodeId = ProxyNodeId;
broker.Host = Context->Config.GetProxy().GetHostname();
broker.Port = Context->Config.GetProxy().GetPort();
Response->Brokers.emplace_back(std::move(broker));
}

topic.Partitions.reserve(response->Partitions.size());
for (const auto& part : response->Partitions) {
auto nodeId = withProxy ? ProxyNodeId : part.NodeId;
auto nodeId = WithProxy ? ProxyNodeId : part.NodeId;

TMetadataResponseData::TMetadataResponseTopic::PartitionsMeta::ItemType responsePartition;
responsePartition.PartitionIndex = part.PartitionId;
@@ -95,7 +129,7 @@

topic.Partitions.emplace_back(std::move(responsePartition));

if (!withProxy) {
if (!WithProxy) {
auto ins = AllClusterNodes.insert(part.NodeId);
if (ins.second) {
auto hostname = part.Hostname;
@@ -123,12 +157,12 @@ void TKafkaMetadataActor::HandleResponse(TEvLocationResponse::TPtr ev, const TAc
Y_DEBUG_ABORT_UNLESS(!actorIter->second.empty());

if (actorIter.IsEnd()) {
KAFKA_LOG_CRIT("Metadata actor: got unexpected location response, ignoring. Expect malformed/incompled reply");
KAFKA_LOG_CRIT("Got unexpected location response, ignoring. Expect malformed/incompled reply");
return RespondIfRequired(ctx);
}

if (actorIter->second.empty()) {
KAFKA_LOG_CRIT("Metadata actor: corrupted state (empty actorId in mapping). Ignored location response, expect incomplete reply");
KAFKA_LOG_CRIT("Corrupted state (empty actorId in mapping). Ignored location response, expect incomplete reply");

return RespondIfRequired(ctx);
}
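
For context, the change in this file makes the actor advertise at least one broker when a client sends a Metadata request with an empty topic list (the usual bootstrap probe): the proxy if one is configured, otherwise the node serving the request. Below is a minimal standalone sketch of that rule; the types and names are hypothetical, not the actual ydb/core/kafka_proxy code.

```cpp
// Sketch of the broker-advertisement rule introduced by this change
// (hypothetical types; the real actor fills TMetadataResponseData::Brokers).
#include <cstdint>
#include <string>
#include <vector>

struct TBroker {
    int32_t NodeId;
    std::string Host;
    uint16_t Port;
};

// With a proxy configured, advertise the proxy as the single broker.
// Without a proxy and with an empty topic list, advertise the current node,
// so the client still receives at least one broker instead of an empty list.
// When topics are requested, brokers come from the per-topic location replies.
std::vector<TBroker> AdvertisedBrokers(bool withProxy, size_t requestedTopics,
                                       const TBroker& proxy, const TBroker& self) {
    std::vector<TBroker> brokers;
    if (withProxy) {
        brokers.push_back(proxy);
    } else if (requestedTopics == 0) {
        brokers.push_back(self);
    }
    return brokers;
}
```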
9 changes: 9 additions & 0 deletions ydb/core/kafka_proxy/actors/kafka_metadata_actor.h
@@ -1,8 +1,10 @@
#include "actors.h"

#include <ydb/core/kafka_proxy/kafka_events.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/aclib/aclib.h>
#include <ydb/services/persqueue_v1/actors/events.h>
#include <ydb/services/persqueue_v1/actors/schema_actors.h>

namespace NKafka {

@@ -12,6 +14,7 @@ class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActo
: Context(context)
, CorrelationId(correlationId)
, Message(message)
, WithProxy(context->Config.HasProxy() && !context->Config.GetProxy().GetHostname().Empty())
, Response(new TMetadataResponseData())
{}

@@ -22,14 +25,19 @@ class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActo

TActorId SendTopicRequest(const TMetadataRequestData::TMetadataRequestTopic& topicRequest);
void HandleResponse(TEvLocationResponse::TPtr ev, const NActors::TActorContext& ctx);
void HandleNodesResponse(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev, const NActors::TActorContext& ctx);

void AddTopicResponse(TMetadataResponseData::TMetadataResponseTopic& topic, TEvLocationResponse* response);
void AddTopicError(TMetadataResponseData::TMetadataResponseTopic& topic, EKafkaErrors errorCode);
void RespondIfRequired(const NActors::TActorContext& ctx);
void AddProxyNodeToBrokers();
void AddCurrentNodeToBrokers();
void ProcessTopics();

STATEFN(StateWork) {
switch (ev->GetTypeRewrite()) {
HFunc(TEvLocationResponse, HandleResponse);
HFunc(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse, HandleNodesResponse);
}
}

@@ -39,6 +47,7 @@ class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActo
const TContext::TPtr Context;
const ui64 CorrelationId;
const TMessagePtr<TMetadataRequestData> Message;
const bool WithProxy;

ui64 PendingResponses = 0;

4 changes: 2 additions & 2 deletions ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp
@@ -130,9 +130,9 @@ void TKafkaOffsetCommitActor::Handle(NGRpcProxy::V1::TEvPQProxy::TEvAuthResultOk
void TKafkaOffsetCommitActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx) {
const auto& partitionResult = ev->Get()->Record.GetPartitionResponse();
auto requestInfo = CookieToRequestInfo.find(partitionResult.GetCookie());
requestInfo->second.Done = true;

Y_ABORT_UNLESS(requestInfo != CookieToRequestInfo.end());

requestInfo->second.Done = true;
if (ev->Get()->Record.GetErrorCode() != NPersQueue::NErrorCode::OK) {
KAFKA_LOG_CRIT("Commit offset error. status# " << EErrorCode_Name(ev->Get()->Record.GetErrorCode()) << ", reason# " << ev->Get()->Record.GetErrorReason());
}
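
The reorder in this file moves the Y_ABORT_UNLESS check ahead of the first use of the lookup result, so the iterator is validated before it is dereferenced. A generic sketch of the same pattern, with a hypothetical map and request type:

```cpp
// Validate the find() result before touching it (generic form of the fix above).
#include <cassert>
#include <cstdint>
#include <unordered_map>

struct TRequestInfo { bool Done = false; };

void MarkDone(std::unordered_map<uint64_t, TRequestInfo>& cookieToRequest, uint64_t cookie) {
    auto it = cookieToRequest.find(cookie);
    assert(it != cookieToRequest.end());  // check first ...
    it->second.Done = true;               // ... then dereference
}
```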
13 changes: 8 additions & 5 deletions ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
@@ -319,6 +319,7 @@ bool TKafkaReadSessionActor::CheckHeartbeatIsExpired() {
bool TKafkaReadSessionActor::TryFillTopicsToRead(const TMessagePtr<TJoinGroupRequestData> joinGroupRequestData, THashSet<TString>& topics) {
auto supportedProtocolFound = false;
for (auto protocol: joinGroupRequestData->Protocols) {
KAFKA_LOG_D("JOIN_GROUP assign protocol supported by client: " << protocol.Name);
if (protocol.Name == SUPPORTED_ASSIGN_STRATEGY) {
FillTopicsFromJoinGroupMetadata(protocol.Metadata, topics);
supportedProtocolFound = true;
@@ -339,7 +340,7 @@ TConsumerProtocolAssignment TKafkaReadSessionActor::BuildAssignmentAndInformBala
THashSet<ui64> finalPartitionsToRead;

TConsumerProtocolAssignment::TopicPartition topicPartition;
topicPartition.Topic = topicName;
topicPartition.Topic = OriginalTopicNames[topicName];
for (auto part: partitions.ToLock) {
finalPartitionsToRead.emplace(part);
}
@@ -379,7 +380,9 @@ void TKafkaReadSessionActor::FillTopicsFromJoinGroupMetadata(TKafkaBytes& metada

for (auto topic: result.Topics) {
if (topic.has_value()) {
topics.emplace(NormalizePath(Context->DatabasePath, topic.value()));
auto normalizedTopicName = NormalizePath(Context->DatabasePath, topic.value());
OriginalTopicNames[normalizedTopicName] = topic.value();
topics.emplace(normalizedTopicName);
KAFKA_LOG_D("JOIN_GROUP requested topic to read: " << topic);
}
}
@@ -536,9 +539,9 @@ void TKafkaReadSessionActor::HandleLockPartition(TEvPersQueue::TEvLockPartition:
return;
}

const auto name = converterIter->second->GetInternalName();
const auto topicName = converterIter->second->GetInternalName();

auto topicInfoIt = TopicsInfo.find(name);
auto topicInfoIt = TopicsInfo.find(topicName);
if (topicInfoIt == TopicsInfo.end() || (topicInfoIt->second.PipeClient != ActorIdFromProto(record.GetPipeClient()))) {
KAFKA_LOG_I("ignored ev lock topic# " << record.GetTopic()
<< ", partition# " << record.GetPartition()
@@ -549,7 +552,7 @@ void TKafkaReadSessionActor::HandleLockPartition(TEvPersQueue::TEvLockPartition:
TNewPartitionToLockInfo partitionToLock;
partitionToLock.LockOn = ctx.Now() + LOCK_PARTITION_DELAY;
partitionToLock.PartitionId = record.GetPartition();
NewPartitionsToLockOnTime[name].push_back(partitionToLock);
NewPartitionsToLockOnTime[topicName].push_back(partitionToLock);
}

void TKafkaReadSessionActor::HandleReleasePartition(TEvPersQueue::TEvReleasePartition::TPtr& ev, const TActorContext& ctx) {
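
The new OriginalTopicNames map lets the assignment echo the topic name exactly as the client sent it (for example the short name "my-topic"), while the session keeps working with the normalized full path internally. A minimal sketch of that round trip; the Normalize helper below is a stand-in for the real NormalizePath, and the paths are hypothetical:

```cpp
// Remember the client's original topic spelling keyed by the normalized path,
// and echo the original name back when building the assignment.
#include <string>
#include <unordered_map>

std::string Normalize(const std::string& databasePath, const std::string& topic) {
    // A short name like "my-topic" becomes "/Root/my-topic"; full paths pass through.
    return (!topic.empty() && topic.front() == '/') ? topic : databasePath + "/" + topic;
}

int main() {
    std::unordered_map<std::string, std::string> originalTopicNames;
    const std::string databasePath = "/Root";
    const std::string requested = "my-topic";      // what the client asked to read

    const auto normalized = Normalize(databasePath, requested);
    originalTopicNames[normalized] = requested;    // keep the client's spelling

    // The assignment returns the original name, not the internal path.
    return originalTopicNames[normalized] == requested ? 0 : 1;
}
```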
1 change: 1 addition & 0 deletions ydb/core/kafka_proxy/actors/kafka_read_session_actor.h
@@ -174,6 +174,7 @@ struct TNextRequestError {
THashMap<TString, NGRpcProxy::TTopicHolder> TopicsInfo; // topic -> info
NPersQueue::TTopicsToConverter TopicsToConverter;
THashSet<TString> TopicsToReadNames;
THashMap<TString, TString> OriginalTopicNames;
THashMap<TString, TPartitionsInfo> TopicPartitions;
THashMap<TString, NPersQueue::TTopicConverterPtr> FullPathToConverter; // PrimaryFullPath -> Converter, for balancer replies matching
THashMap<TString, TVector<TNewPartitionToLockInfo>> NewPartitionsToLockOnTime; // Topic -> PartitionsToLock