Skip to content

Commit 14474cf

Browse files
authored
Merge 010be6b into 506ecae
2 parents 506ecae + 010be6b commit 14474cf

File tree

3 files changed

+15
-7
lines changed

3 files changed

+15
-7
lines changed

ydb/core/kafka_proxy/actors/actors.h

+5
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@
1010

1111
namespace NKafka {
1212

13+
static constexpr int ProxyNodeId = 1;
14+
static constexpr char UnderlayPrefix[] = "u-";
15+
16+
static_assert(sizeof(UnderlayPrefix) == 3);
17+
1318
enum EAuthSteps {
1419
WAIT_HANDSHAKE,
1520
WAIT_AUTH,

ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp

+10-2
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ void TKafkaFindCoordinatorActor::Bootstrap(const NActors::TActorContext& ctx) {
2525

2626
bool withProxy = Context->Config.HasProxy() && !Context->Config.GetProxy().GetHostname().Empty();
2727
if (withProxy) {
28-
SendResponseOkAndDie(Context->Config.GetProxy().GetHostname(), Context->Config.GetProxy().GetPort(), -1, ctx);
28+
SendResponseOkAndDie(Context->Config.GetProxy().GetHostname(), Context->Config.GetProxy().GetPort(), NKafka::ProxyNodeId, ctx);
2929
return;
3030
}
3131

@@ -54,6 +54,8 @@ void TKafkaFindCoordinatorActor::SendResponseOkAndDie(const TString& host, i32 p
5454
response->Port = port;
5555
response->NodeId = nodeId;
5656

57+
KAFKA_LOG_D("FIND_COORDINATOR response. Host#: " << host << ", Port#: " << port << ", NodeId# " << nodeId);
58+
5759
Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode)));
5860
Die(ctx);
5961
}
@@ -71,15 +73,21 @@ void TKafkaFindCoordinatorActor::SendResponseFailAndDie(EKafkaErrors error, cons
7173

7274
response->Coordinators.push_back(coordinator);
7375
}
74-
76+
77+
response->ErrorCode = error;
78+
7579
Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode)));
7680
Die(ctx);
7781
}
7882

7983
void TKafkaFindCoordinatorActor::Handle(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev, const NActors::TActorContext& ctx) {
8084
auto iter = ev->Get()->NodeIdsMapping->find(ctx.SelfID.NodeId());
8185
Y_ABORT_UNLESS(!iter.IsEnd());
86+
8287
auto host = (*ev->Get()->Nodes)[iter->second].Host;
88+
if (host.StartsWith(UnderlayPrefix)) {
89+
host = host.substr(sizeof(UnderlayPrefix) - 1);
90+
}
8391
KAFKA_LOG_D("FIND_COORDINATOR incoming TEvGetAllNodesInfoResponse. Host#: " << host);
8492
SendResponseOkAndDie(host, Context->Config.GetListeningPort(), ctx.SelfID.NodeId(), ctx);
8593
}

ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp

-5
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,6 @@
66
namespace NKafka {
77
using namespace NKikimr::NGRpcProxy::V1;
88

9-
static constexpr int ProxyNodeId = 1;
10-
static constexpr char UnderlayPrefix[] = "u-";
11-
12-
static_assert(sizeof(UnderlayPrefix) == 3);
13-
149
NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context,
1510
const ui64 correlationId,
1611
const TMessagePtr<TMetadataRequestData>& message) {

0 commit comments

Comments
 (0)