Skip to content

Commit 401b07f

Browse files
committed
work
1 parent 5745078 commit 401b07f

File tree

3 files changed

+74
-83
lines changed

3 files changed

+74
-83
lines changed

ydb/core/kafka_proxy/actors/kafka_balancer_actor.cpp

+62-80
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,6 @@ namespace NKafka {
66
using namespace NKikimr;
77
using namespace NKikimr::NGRpcProxy::V1;
88

9-
static EKafkaErrors KqpStatusToKafkaError(Ydb::StatusIds::StatusCode status) {
10-
// savnik finish it
11-
if (status == Ydb::StatusIds::SUCCESS) {
12-
return EKafkaErrors::NONE_ERROR;
13-
}
14-
return EKafkaErrors::UNKNOWN_SERVER_ERROR;
15-
}
16-
179
static constexpr ui8 MASTER_WAIT_JOINS_DELAY_SECONDS = 5;
1810
static constexpr ui8 WAIT_FOR_MASTER_DELAY_SECONDS = 2;
1911
static constexpr ui8 WAIT_MASTER_MAX_RETRY_COUNT = 5;
@@ -29,6 +21,20 @@ void TKafkaBalancerActor::Bootstrap(const NActors::TActorContext& ctx) {
2921
}
3022
}
3123

24+
// Translates a YDB/KQP status code into the Kafka protocol error code that is
// reported back to the client.
//
// Only Ydb::StatusIds::SUCCESS is mapped explicitly (to NONE_ERROR); every other
// status currently collapses into UNKNOWN_SERVER_ERROR.
static EKafkaErrors KqpStatusToKafkaError(Ydb::StatusIds::StatusCode status) {
25+
// TODO(savnik): finish the mapping for the remaining YDB status codes.
26+
if (status == Ydb::StatusIds::SUCCESS) {
27+
return EKafkaErrors::NONE_ERROR;
28+
}
29+
return EKafkaErrors::UNKNOWN_SERVER_ERROR;
30+
}
31+
32+
// Builds the constant log prefix "TKafkaBalancerActor: " for this actor.
// NOTE(review): presumably consumed by the KAFKA_LOG_* macros so individual
// messages no longer need to repeat the actor name - confirm against the macro
// definitions.
TString TKafkaBalancerActor::LogPrefix() {
33+
TStringBuilder sb;
34+
sb << "TKafkaBalancerActor: ";
35+
return sb;
36+
}
37+
3238
void TKafkaBalancerActor::Handle(NMetadata::NProvider::TEvManagerPrepared::TPtr&, const TActorContext& ctx) {
3339
TablesInited++;
3440
if (TablesInited == TABLES_TO_INIT_COUNT) {
@@ -37,68 +43,28 @@ void TKafkaBalancerActor::Handle(NMetadata::NProvider::TEvManagerPrepared::TPtr&
3743
}
3844

3945
void TKafkaBalancerActor::Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx) {
40-
Cookie = 0;
41-
const TString createSessionError = "Failed to create KQP session";
46+
Cookie = 0; // savnik
47+
4248
if (!Kqp->HandleCreateSessionResponse(ev, ctx)) {
43-
switch (RequestType) {
44-
case JOIN_GROUP:
45-
SendJoinGroupResponseFail(ctx, CorrelationId,
46-
EKafkaErrors::UNKNOWN_SERVER_ERROR,
47-
createSessionError);
48-
break;
49-
case SYNC_GROUP:
50-
SendSyncGroupResponseFail(ctx, CorrelationId,
51-
EKafkaErrors::UNKNOWN_SERVER_ERROR,
52-
createSessionError);
53-
break;
54-
case LEAVE_GROUP:
55-
SendLeaveGroupResponseFail(ctx, CorrelationId,
56-
EKafkaErrors::UNKNOWN_SERVER_ERROR,
57-
createSessionError);
58-
break;
59-
case HEARTBEAT:
60-
SendHeartbeatResponseFail(ctx, CorrelationId,
61-
EKafkaErrors::UNKNOWN_SERVER_ERROR,
62-
createSessionError);
63-
break;
64-
default:
65-
break;
66-
}
49+
SendResponseFail(ctx, EKafkaErrors::UNKNOWN_SERVER_ERROR, "Failed to create KQP session");
6750
PassAway();
6851
return;
6952
}
7053

71-
switch (RequestType) {
72-
case JOIN_GROUP:
73-
HandleJoinGroupResponse(nullptr, ctx);
74-
break;
75-
case SYNC_GROUP:
76-
HandleSyncGroupResponse(nullptr, ctx);
77-
break;
78-
case LEAVE_GROUP:
79-
HandleLeaveGroupResponse(nullptr, ctx);
80-
break;
81-
case HEARTBEAT:
82-
HandleHeartbeatResponse(nullptr, ctx);
83-
break;
84-
default:
85-
KAFKA_LOG_CRIT("Unknown RequestType in TEvCreateSessionResponse");
86-
PassAway();
87-
break;
88-
}
54+
HandleResponse(nullptr, ctx);
8955
}
9056

9157
void TKafkaBalancerActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
92-
const TString kqpQueryError = "KQP query error";
9358
if (ev->Cookie != KqpReqCookie) {
94-
KAFKA_LOG_CRIT("Unexpected cookie in TEvQueryResponse");
59+
KAFKA_LOG_ERROR("Unexpected cookie in TEvQueryResponse");
9560
return;
9661
}
9762

9863
const auto& record = ev->Get()->Record;
9964
auto status = record.GetYdbStatus();
10065
if (status == ::Ydb::StatusIds_StatusCode::StatusIds_StatusCode_ABORTED && CurrentRetryNumber < TX_ABORT_MAX_RETRY_COUNT) {
10166
CurrentRetryNumber++;
67+
KAFKA_LOG_I(TStringBuilder() << "Retry after tx aborted. Num of retry# " << CurrentRetryNumber);
10268
switch (RequestType) {
10369
case JOIN_GROUP:
10470
Register(new TKafkaBalancerActor(Context, Cookie, CorrelationId, JoinGroupRequestData, CurrentRetryNumber));
@@ -123,26 +89,21 @@ void TKafkaBalancerActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const
12389
auto kafkaErr = KqpStatusToKafkaError(status);
12490

12591
if (kafkaErr != EKafkaErrors::NONE_ERROR) {
126-
switch (RequestType) {
127-
case JOIN_GROUP:
128-
SendJoinGroupResponseFail(ctx, CorrelationId, kafkaErr, kqpQueryError);
129-
break;
130-
case SYNC_GROUP:
131-
SendSyncGroupResponseFail(ctx, CorrelationId, kafkaErr, kqpQueryError);
132-
break;
133-
case LEAVE_GROUP:
134-
SendLeaveGroupResponseFail(ctx, CorrelationId, kafkaErr, kqpQueryError);
135-
break;
136-
case HEARTBEAT:
137-
SendHeartbeatResponseFail(ctx, CorrelationId, kafkaErr, kqpQueryError);
138-
break;
139-
default:
140-
break;
141-
}
142-
PassAway();
143-
return;
92+
auto kqpQueryError = TStringBuilder() <<" Kqp error. ";
93+
94+
NYql::TIssues issues;
95+
NYql::IssuesFromMessage(record.GetResponse().GetQueryIssues(), issues);
96+
NYdb::TStatus status(NYdb::EStatus(record.GetYdbStatus()), std::move(issues));
97+
kqpQueryError << status;
98+
99+
SendResponseFail(ctx, kafkaErr, kqpQueryError);
144100
}
145101

102+
HandleResponse(ev, ctx);
103+
}
104+
105+
void TKafkaBalancerActor::HandleResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, const TActorContext& ctx) {
106+
KAFKA_LOG_I(TStringBuilder() << "Handle kqp response. CurrentStep# " << (ui8)CurrentStep);
146107
switch (RequestType) {
147108
case JOIN_GROUP:
148109
HandleJoinGroupResponse(ev, ctx);
@@ -157,12 +118,31 @@ void TKafkaBalancerActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const
157118
HandleHeartbeatResponse(ev, ctx);
158119
break;
159120
default:
160-
KAFKA_LOG_CRIT("Unknown RequestType in TEvCreateSessionResponse");
121+
KAFKA_LOG_ERROR("Unknown RequestType in TEvCreateSessionResponse");
161122
PassAway();
162123
break;
163124
}
164125
}
165126

127+
// Sends a failure response matching the request type this actor is serving.
//
// ctx     - actor context, forwarded to the request-specific sender.
// error   - Kafka error code to put into the response.
// message - failure reason, forwarded to the request-specific sender.
//
// The response is addressed with the actor's CorrelationId. This function only
// sends the response; terminating the actor (PassAway) is left to the caller.
void TKafkaBalancerActor::SendResponseFail(const TActorContext& ctx, EKafkaErrors error, const TString& message) {
128+
switch (RequestType) {
129+
case JOIN_GROUP:
130+
SendJoinGroupResponseFail(ctx, CorrelationId, error, message);
131+
break;
132+
case SYNC_GROUP:
133+
SendSyncGroupResponseFail(ctx, CorrelationId, error, message);
134+
break;
135+
case LEAVE_GROUP:
136+
SendLeaveGroupResponseFail(ctx, CorrelationId, error, message);
137+
break;
138+
case HEARTBEAT:
139+
SendHeartbeatResponseFail(ctx, CorrelationId, error, message);
140+
break;
141+
default:
// No response type is known for this request, so nothing can be sent.
// Log the reason instead of dropping it silently.
KAFKA_LOG_ERROR("Unknown RequestType in SendResponseFail. reason# " << message);
142+
break;
143+
}
144+
}
145+
166146
std::optional<TGroupStatus> TKafkaBalancerActor::ParseCheckStateAndGeneration(
167147
NKqp::TEvKqp::TEvQueryResponse::TPtr ev
168148
) {
@@ -302,8 +282,8 @@ bool TKafkaBalancerActor::ParseWorkerStatesAndChooseProtocol(
302282
}
303283

304284
for (const auto& st : states) {
305-
const auto& protos = st.WorkerState.protocols();
306-
for (const auto& pr : protos) {
285+
const auto& protocols = st.WorkerState.protocols();
286+
for (const auto& pr : protocols) {
307287
if (pr.protocol_name() == chosenProtocol) {
308288
workerStates[st.MemberId] = pr.metadata();
309289
break;
@@ -342,7 +322,7 @@ bool TKafkaBalancerActor::ParseDeadCount(
342322
}
343323

344324
void TKafkaBalancerActor::Die(const TActorContext& ctx) {
345-
KAFKA_LOG_D("TKafkaBalancerActor pass away");
325+
KAFKA_LOG_D("Pass away");
346326
TBase::Die(ctx);
347327
}
348328

@@ -400,6 +380,8 @@ NYdb::TParamsBuilder TKafkaBalancerActor::BuildAssignmentsParams() {
400380

401381
auto& assignmentList = params.AddParam("$Assignments").BeginList();
402382

383+
KAFKA_LOG_D(TStringBuilder() << "Assignments count: " << SyncGroupRequestData->Assignments.size());
384+
403385
for (auto& assignment: SyncGroupRequestData->Assignments) {
404386
assignmentList.AddListItem()
405387
.BeginStruct()
@@ -1117,7 +1099,7 @@ void TKafkaBalancerActor::SendJoinGroupResponseFail(const TActorContext&,
11171099
EKafkaErrors error,
11181100
TString message) {
11191101

1120-
KAFKA_LOG_CRIT("JOIN_GROUP failed. reason# " << message);
1102+
KAFKA_LOG_ERROR("JOIN_GROUP failed. reason# " << message);
11211103
auto response = std::make_shared<TJoinGroupResponseData>();
11221104
response->ErrorCode = error;
11231105
Send(Context->ConnectionId, new TEvKafka::TEvResponse(corellationId, response, error));
@@ -1127,7 +1109,7 @@ void TKafkaBalancerActor::SendSyncGroupResponseFail(const TActorContext&,
11271109
ui64 corellationId,
11281110
EKafkaErrors error,
11291111
TString message) {
1130-
KAFKA_LOG_CRIT("SYNC_GROUP failed. reason# " << message);
1112+
KAFKA_LOG_ERROR("SYNC_GROUP failed. reason# " << message);
11311113
auto response = std::make_shared<TSyncGroupResponseData>();
11321114
response->ErrorCode = error;
11331115
response->Assignment = "";
@@ -1138,7 +1120,7 @@ void TKafkaBalancerActor::SendLeaveGroupResponseFail(const TActorContext&,
11381120
ui64 corellationId,
11391121
EKafkaErrors error,
11401122
TString message) {
1141-
KAFKA_LOG_CRIT("LEAVE_GROUP failed. reason# " << message);
1123+
KAFKA_LOG_ERROR("LEAVE_GROUP failed. reason# " << message);
11421124
auto response = std::make_shared<TLeaveGroupResponseData>();
11431125
response->ErrorCode = error;
11441126
Send(Context->ConnectionId, new TEvKafka::TEvResponse(corellationId, response, error));
@@ -1148,7 +1130,7 @@ void TKafkaBalancerActor::SendHeartbeatResponseFail(const TActorContext&,
11481130
ui64 corellationId,
11491131
EKafkaErrors error,
11501132
TString message) {
1151-
KAFKA_LOG_CRIT("HEARTBEAT failed. reason# " << message);
1133+
KAFKA_LOG_ERROR("HEARTBEAT failed. reason# " << message);
11521134
auto response = std::make_shared<THeartbeatResponseData>();
11531135
response->ErrorCode = error;
11541136
Send(Context->ConnectionId, new TEvKafka::TEvResponse(corellationId, response, error));

ydb/core/kafka_proxy/actors/kafka_balancer_actor.h

+6-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class TKafkaBalancerActor : public NActors::TActorBootstrapped<TKafkaBalancerAct
4848
public:
4949
using TBase = NActors::TActorBootstrapped<TKafkaBalancerActor>;
5050

51-
enum EBalancerStep : ui32 {
51+
enum EBalancerStep : ui8 {
5252
STEP_NONE = 0,
5353

5454
JOIN_TX0_0_BEGIN_TX,
@@ -212,6 +212,8 @@ class TKafkaBalancerActor : public NActors::TActorBootstrapped<TKafkaBalancerAct
212212
}
213213
}
214214

215+
void HandleResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, const TActorContext& ctx);
216+
215217
void Handle(NMetadata::NProvider::TEvManagerPrepared::TPtr&, const TActorContext& ctx);
216218
void Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx);
217219
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx);
@@ -237,6 +239,9 @@ class TKafkaBalancerActor : public NActors::TActorBootstrapped<TKafkaBalancerAct
237239
void SendLeaveGroupResponseFail(const TActorContext&, ui64 corellationId,
238240
EKafkaErrors error, TString message = "");
239241

242+
TString LogPrefix();
243+
void SendResponseFail(const TActorContext& ctx, EKafkaErrors error, const TString& message);
244+
240245
std::optional<TGroupStatus> ParseCheckStateAndGeneration(NKqp::TEvKqp::TEvQueryResponse::TPtr ev);
241246
bool ParseAssignments(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, TString& assignments);
242247
bool ParseWorkerStatesAndChooseProtocol(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, std::unordered_map<TString, TString>& workerStates, TString& chosenProtocol);

ydb/core/kafka_proxy/ut/ut_protocol.cpp

+6-2
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ class TTestServer {
7575
public:
7676
TIpPort Port;
7777

78-
TTestServer(const TString& kafkaApiMode = "1") {
78+
TTestServer(const TString& kafkaApiMode = "1", bool enableNativeKafkaBalancing = false) {
7979
TPortManager portManager;
8080
Port = portManager.GetTcpPort();
8181

@@ -107,6 +107,10 @@ class TTestServer {
107107
appConfig.MutableKafkaProxyConfig()->SetMaxMessageSize(1024);
108108
appConfig.MutableKafkaProxyConfig()->SetMaxInflightSize(2048);
109109

110+
if (enableNativeKafkaBalancing) {
111+
appConfig.MutableKafkaProxyConfig()->SetEnableNativeBalancing(true);
112+
}
113+
110114
appConfig.MutablePQConfig()->MutableQuotingConfig()->SetEnableQuoting(true);
111115
appConfig.MutablePQConfig()->MutableQuotingConfig()->SetQuotaWaitDurationMs(300);
112116
appConfig.MutablePQConfig()->MutableQuotingConfig()->SetPartitionReadQuotaIsTwiceWriteQuota(true);
@@ -2541,7 +2545,7 @@ Y_UNIT_TEST(ProduceScenario) {
25412545
}
25422546

25432547
Y_UNIT_TEST(NativeKafkaBalanceScenario) {
2544-
TInsecureTestServer testServer("1");
2548+
TInsecureTestServer testServer("1", true);
25452549

25462550
TString topicName = "/Root/topic-0";
25472551
ui64 totalPartitions = 24;

0 commit comments

Comments
 (0)