Skip to content

Commit 156cd63

Browse files
authored
Fix kafka metadata actor for Topics.size() == 0 and fix read by short topic name (#1287)
* Fix kafka metadata actor for Topics.size() == 0 and fix read by short topic name * Fixes
1 parent 2dc957d commit 156cd63

5 files changed

+71
-24
lines changed

ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp

Lines changed: 51 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,45 @@ NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context,
1919

2020
void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) {
2121
Response->Topics.resize(Message->Topics.size());
22+
Response->ClusterId = "ydb-cluster";
23+
Response->ControllerId = 1;
2224

25+
26+
if (WithProxy) {
27+
AddProxyNodeToBrokers();
28+
}
29+
30+
if (Message->Topics.size() == 0 && !WithProxy) {
31+
AddCurrentNodeToBrokers();
32+
}
33+
34+
if (Message->Topics.size() != 0) {
35+
ProcessTopics();
36+
}
37+
38+
Become(&TKafkaMetadataActor::StateWork);
39+
RespondIfRequired(ctx);
40+
}
41+
42+
void TKafkaMetadataActor::AddCurrentNodeToBrokers() {
43+
PendingResponses++;
44+
Send(NKikimr::NIcNodeCache::CreateICNodesInfoCacheServiceId(), new NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoRequest());
45+
}
46+
47+
void TKafkaMetadataActor::AddProxyNodeToBrokers() {
48+
auto broker = TMetadataResponseData::TMetadataResponseBroker{};
49+
broker.NodeId = ProxyNodeId;
50+
broker.Host = Context->Config.GetProxy().GetHostname();
51+
broker.Port = Context->Config.GetProxy().GetPort();
52+
Response->Brokers.emplace_back(std::move(broker));
53+
}
54+
55+
void TKafkaMetadataActor::ProcessTopics() {
2356
THashMap<TString, TActorId> partitionActors;
2457
for (size_t i = 0; i < Message->Topics.size(); ++i) {
2558
Response->Topics[i] = TMetadataResponseData::TMetadataResponseTopic{};
2659
auto& reqTopic = Message->Topics[i];
2760
Response->Topics[i].Name = reqTopic.Name.value_or("");
28-
Response->ClusterId = "ydb-cluster";
29-
Response->ControllerId = 1;
3061

3162
if (!reqTopic.Name.value_or("")) {
3263
AddTopicError(Response->Topics[i], EKafkaErrors::INVALID_TOPIC_EXCEPTION);
@@ -43,8 +74,21 @@ void TKafkaMetadataActor::Bootstrap(const TActorContext& ctx) {
4374
}
4475
TopicIndexes[child].push_back(i);
4576
}
46-
Become(&TKafkaMetadataActor::StateWork);
77+
}
4778

79+
void TKafkaMetadataActor::HandleNodesResponse(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev, const NActors::TActorContext& ctx) {
80+
auto iter = ev->Get()->NodeIdsMapping->find(ctx.SelfID.NodeId());
81+
Y_ABORT_UNLESS(!iter.IsEnd());
82+
auto host = (*ev->Get()->Nodes)[iter->second].Host;
83+
KAFKA_LOG_D("Incoming TEvGetAllNodesInfoResponse. Host#: " << host);
84+
85+
auto broker = TMetadataResponseData::TMetadataResponseBroker{};
86+
broker.NodeId = ctx.SelfID.NodeId();
87+
broker.Host = host;
88+
broker.Port = Context->Config.GetListeningPort();
89+
Response->Brokers.emplace_back(std::move(broker));
90+
91+
--PendingResponses;
4892
RespondIfRequired(ctx);
4993
}
5094

@@ -69,21 +113,11 @@ void TKafkaMetadataActor::AddTopicError(
69113
}
70114

71115
void TKafkaMetadataActor::AddTopicResponse(TMetadataResponseData::TMetadataResponseTopic& topic, TEvLocationResponse* response) {
72-
bool withProxy = Context->Config.HasProxy() && !Context->Config.GetProxy().GetHostname().Empty();
73-
74116
topic.ErrorCode = NONE_ERROR;
75-
//topic.TopicId = TKafkaUuid(response->SchemeShardId, response->PathId);
76-
if (withProxy) {
77-
auto broker = TMetadataResponseData::TMetadataResponseBroker{};
78-
broker.NodeId = ProxyNodeId;
79-
broker.Host = Context->Config.GetProxy().GetHostname();
80-
broker.Port = Context->Config.GetProxy().GetPort();
81-
Response->Brokers.emplace_back(std::move(broker));
82-
}
83117

84118
topic.Partitions.reserve(response->Partitions.size());
85119
for (const auto& part : response->Partitions) {
86-
auto nodeId = withProxy ? ProxyNodeId : part.NodeId;
120+
auto nodeId = WithProxy ? ProxyNodeId : part.NodeId;
87121

88122
TMetadataResponseData::TMetadataResponseTopic::PartitionsMeta::ItemType responsePartition;
89123
responsePartition.PartitionIndex = part.PartitionId;
@@ -95,7 +129,7 @@ void TKafkaMetadataActor::AddTopicResponse(TMetadataResponseData::TMetadataRespo
95129

96130
topic.Partitions.emplace_back(std::move(responsePartition));
97131

98-
if (!withProxy) {
132+
if (!WithProxy) {
99133
auto ins = AllClusterNodes.insert(part.NodeId);
100134
if (ins.second) {
101135
auto hostname = part.Hostname;
@@ -123,12 +157,12 @@ void TKafkaMetadataActor::HandleResponse(TEvLocationResponse::TPtr ev, const TAc
123157
Y_DEBUG_ABORT_UNLESS(!actorIter->second.empty());
124158

125159
if (actorIter.IsEnd()) {
126-
KAFKA_LOG_CRIT("Metadata actor: got unexpected location response, ignoring. Expect malformed/incompled reply");
160+
KAFKA_LOG_CRIT("Got unexpected location response, ignoring. Expect malformed/incompled reply");
127161
return RespondIfRequired(ctx);
128162
}
129163

130164
if (actorIter->second.empty()) {
131-
KAFKA_LOG_CRIT("Metadata actor: corrupted state (empty actorId in mapping). Ignored location response, expect incomplete reply");
165+
KAFKA_LOG_CRIT("Corrupted state (empty actorId in mapping). Ignored location response, expect incomplete reply");
132166

133167
return RespondIfRequired(ctx);
134168
}

ydb/core/kafka_proxy/actors/kafka_metadata_actor.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
#include "actors.h"
22

3+
#include <ydb/core/kafka_proxy/kafka_events.h>
34
#include <ydb/library/actors/core/actor_bootstrapped.h>
45
#include <ydb/library/aclib/aclib.h>
56
#include <ydb/services/persqueue_v1/actors/events.h>
7+
#include <ydb/services/persqueue_v1/actors/schema_actors.h>
68

79
namespace NKafka {
810

@@ -12,6 +14,7 @@ class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActo
1214
: Context(context)
1315
, CorrelationId(correlationId)
1416
, Message(message)
17+
, WithProxy(context->Config.HasProxy() && !context->Config.GetProxy().GetHostname().Empty())
1518
, Response(new TMetadataResponseData())
1619
{}
1720

@@ -22,14 +25,19 @@ class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActo
2225

2326
TActorId SendTopicRequest(const TMetadataRequestData::TMetadataRequestTopic& topicRequest);
2427
void HandleResponse(TEvLocationResponse::TPtr ev, const NActors::TActorContext& ctx);
28+
void HandleNodesResponse(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev, const NActors::TActorContext& ctx);
2529

2630
void AddTopicResponse(TMetadataResponseData::TMetadataResponseTopic& topic, TEvLocationResponse* response);
2731
void AddTopicError(TMetadataResponseData::TMetadataResponseTopic& topic, EKafkaErrors errorCode);
2832
void RespondIfRequired(const NActors::TActorContext& ctx);
33+
void AddProxyNodeToBrokers();
34+
void AddCurrentNodeToBrokers();
35+
void ProcessTopics();
2936

3037
STATEFN(StateWork) {
3138
switch (ev->GetTypeRewrite()) {
3239
HFunc(TEvLocationResponse, HandleResponse);
40+
HFunc(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse, HandleNodesResponse);
3341
}
3442
}
3543

@@ -39,6 +47,7 @@ class TKafkaMetadataActor: public NActors::TActorBootstrapped<TKafkaMetadataActo
3947
const TContext::TPtr Context;
4048
const ui64 CorrelationId;
4149
const TMessagePtr<TMetadataRequestData> Message;
50+
const bool WithProxy;
4251

4352
ui64 PendingResponses = 0;
4453

ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,9 @@ void TKafkaOffsetCommitActor::Handle(NGRpcProxy::V1::TEvPQProxy::TEvAuthResultOk
130130
void TKafkaOffsetCommitActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx) {
131131
const auto& partitionResult = ev->Get()->Record.GetPartitionResponse();
132132
auto requestInfo = CookieToRequestInfo.find(partitionResult.GetCookie());
133-
requestInfo->second.Done = true;
134-
135133
Y_ABORT_UNLESS(requestInfo != CookieToRequestInfo.end());
134+
135+
requestInfo->second.Done = true;
136136
if (ev->Get()->Record.GetErrorCode() != NPersQueue::NErrorCode::OK) {
137137
KAFKA_LOG_CRIT("Commit offset error. status# " << EErrorCode_Name(ev->Get()->Record.GetErrorCode()) << ", reason# " << ev->Get()->Record.GetErrorReason());
138138
}

ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,7 @@ bool TKafkaReadSessionActor::CheckHeartbeatIsExpired() {
319319
bool TKafkaReadSessionActor::TryFillTopicsToRead(const TMessagePtr<TJoinGroupRequestData> joinGroupRequestData, THashSet<TString>& topics) {
320320
auto supportedProtocolFound = false;
321321
for (auto protocol: joinGroupRequestData->Protocols) {
322+
KAFKA_LOG_D("JOIN_GROUP assign protocol supported by client: " << protocol.Name);
322323
if (protocol.Name == SUPPORTED_ASSIGN_STRATEGY) {
323324
FillTopicsFromJoinGroupMetadata(protocol.Metadata, topics);
324325
supportedProtocolFound = true;
@@ -339,7 +340,7 @@ TConsumerProtocolAssignment TKafkaReadSessionActor::BuildAssignmentAndInformBala
339340
THashSet<ui64> finalPartitionsToRead;
340341

341342
TConsumerProtocolAssignment::TopicPartition topicPartition;
342-
topicPartition.Topic = topicName;
343+
topicPartition.Topic = OriginalTopicNames[topicName];
343344
for (auto part: partitions.ToLock) {
344345
finalPartitionsToRead.emplace(part);
345346
}
@@ -379,7 +380,9 @@ void TKafkaReadSessionActor::FillTopicsFromJoinGroupMetadata(TKafkaBytes& metada
379380

380381
for (auto topic: result.Topics) {
381382
if (topic.has_value()) {
382-
topics.emplace(NormalizePath(Context->DatabasePath, topic.value()));
383+
auto normalizedTopicName = NormalizePath(Context->DatabasePath, topic.value());
384+
OriginalTopicNames[normalizedTopicName] = topic.value();
385+
topics.emplace(normalizedTopicName);
383386
KAFKA_LOG_D("JOIN_GROUP requested topic to read: " << topic);
384387
}
385388
}
@@ -536,9 +539,9 @@ void TKafkaReadSessionActor::HandleLockPartition(TEvPersQueue::TEvLockPartition:
536539
return;
537540
}
538541

539-
const auto name = converterIter->second->GetInternalName();
542+
const auto topicName = converterIter->second->GetInternalName();
540543

541-
auto topicInfoIt = TopicsInfo.find(name);
544+
auto topicInfoIt = TopicsInfo.find(topicName);
542545
if (topicInfoIt == TopicsInfo.end() || (topicInfoIt->second.PipeClient != ActorIdFromProto(record.GetPipeClient()))) {
543546
KAFKA_LOG_I("ignored ev lock topic# " << record.GetTopic()
544547
<< ", partition# " << record.GetPartition()
@@ -549,7 +552,7 @@ void TKafkaReadSessionActor::HandleLockPartition(TEvPersQueue::TEvLockPartition:
549552
TNewPartitionToLockInfo partitionToLock;
550553
partitionToLock.LockOn = ctx.Now() + LOCK_PARTITION_DELAY;
551554
partitionToLock.PartitionId = record.GetPartition();
552-
NewPartitionsToLockOnTime[name].push_back(partitionToLock);
555+
NewPartitionsToLockOnTime[topicName].push_back(partitionToLock);
553556
}
554557

555558
void TKafkaReadSessionActor::HandleReleasePartition(TEvPersQueue::TEvReleasePartition::TPtr& ev, const TActorContext& ctx) {

ydb/core/kafka_proxy/actors/kafka_read_session_actor.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ struct TNextRequestError {
174174
THashMap<TString, NGRpcProxy::TTopicHolder> TopicsInfo; // topic -> info
175175
NPersQueue::TTopicsToConverter TopicsToConverter;
176176
THashSet<TString> TopicsToReadNames;
177+
THashMap<TString, TString> OriginalTopicNames;
177178
THashMap<TString, TPartitionsInfo> TopicPartitions;
178179
THashMap<TString, NPersQueue::TTopicConverterPtr> FullPathToConverter; // PrimaryFullPath -> Converter, for balancer replies matching
179180
THashMap<TString, TVector<TNewPartitionToLockInfo>> NewPartitionsToLockOnTime; // Topic -> PartitionsToLock

0 commit comments

Comments
 (0)