Skip to content

Commit b306d7a

Browse files
committed
improve rebalances
1 parent 58d4f41 commit b306d7a

5 files changed

+288
-109
lines changed

ydb/core/kafka_proxy/actors/kafka_balance_actor_sql.cpp

+44-23
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,8 @@ const TString INSERT_NEW_GROUP = R"sql(
99
DECLARE $State AS Uint64;
1010
DECLARE $Database AS Utf8;
1111
DECLARE $Master AS Utf8;
12-
DECLARE $LastHeartbeat AS Datetime;
12+
DECLARE $LastMasterHeartbeat AS Datetime;
1313
DECLARE $ProtocolType AS Utf8;
14-
DECLARE $RebalanceTimeoutMs AS Uint32;
1514
1615
INSERT INTO `%s`
1716
(
@@ -21,19 +20,17 @@ const TString INSERT_NEW_GROUP = R"sql(
2120
database,
2221
last_heartbeat_time,
2322
master,
24-
protocol_type,
25-
rebalance_timeout_ms
23+
protocol_type
2624
)
2725
VALUES
2826
(
2927
$ConsumerGroup,
3028
$Generation,
3129
$State,
3230
$Database,
33-
$LastHeartbeat,
31+
$LastMasterHeartbeat,
3432
$Master,
35-
$ProtocolType,
36-
$RebalanceTimeoutMs
33+
$ProtocolType
3734
);
3835
)sql";
3936

@@ -44,16 +41,14 @@ const TString UPDATE_GROUP = R"sql(
4441
DECLARE $Generation AS Uint64;
4542
DECLARE $Database AS Utf8;
4643
DECLARE $Master AS Utf8;
47-
DECLARE $LastHeartbeat AS Datetime;
48-
DECLARE $RebalanceTimeoutMs AS Uint32;
44+
DECLARE $LastMasterHeartbeat AS Datetime;
4945
5046
UPDATE `%s`
5147
SET
5248
state = $State,
5349
generation = $Generation,
54-
last_heartbeat_time = $LastHeartbeat,
55-
master = $Master,
56-
rebalance_timeout_ms = $RebalanceTimeoutMs
50+
last_heartbeat_time = $LastMasterHeartbeat,
51+
master = $Master
5752
WHERE database = $Database
5853
AND consumer_group = $ConsumerGroup;
5954
)sql";
@@ -77,12 +72,12 @@ const TString UPDATE_GROUP_STATE_AND_PROTOCOL = R"sql(
7772
DECLARE $State AS Uint64;
7873
DECLARE $Database AS Utf8;
7974
DECLARE $Protocol AS Utf8;
80-
DECLARE $LastHeartbeat AS Datetime;
75+
DECLARE $LastMasterHeartbeat AS Datetime;
8176
8277
UPDATE `%s`
8378
SET
8479
state = $State,
85-
last_heartbeat_time = $LastHeartbeat,
80+
last_heartbeat_time = $LastMasterHeartbeat,
8681
protocol = $Protocol
8782
WHERE database = $Database
8883
AND consumer_group = $ConsumerGroup;
@@ -93,6 +88,7 @@ const TString INSERT_MEMBER = R"sql(
9388
DECLARE $ConsumerGroup AS Utf8;
9489
DECLARE $Generation AS Uint64;
9590
DECLARE $MemberId AS Utf8;
91+
DECLARE $InstanceId AS Utf8;
9692
DECLARE $WorkerStateProto AS String;
9793
DECLARE $Database AS Utf8;
9894
DECLARE $HeartbeatDeadline AS Datetime;
@@ -103,6 +99,7 @@ const TString INSERT_MEMBER = R"sql(
10399
consumer_group,
104100
generation,
105101
member_id,
102+
instance_id,
106103
heartbeat_deadline,
107104
worker_state_proto,
108105
database,
@@ -112,6 +109,7 @@ const TString INSERT_MEMBER = R"sql(
112109
$ConsumerGroup,
113110
$Generation,
114111
$MemberId,
112+
$InstanceId,
115113
$HeartbeatDeadline,
116114
$WorkerStateProto,
117115
$Database,
@@ -126,7 +124,7 @@ const TString UPSERT_ASSIGNMENTS_AND_SET_WORKING_STATE = R"sql(
126124
DECLARE $Database AS Utf8;
127125
DECLARE $Generation AS Uint64;
128126
DECLARE $State AS Uint64;
129-
DECLARE $LastHeartbeat AS Datetime;
127+
DECLARE $LastMasterHeartbeat AS Datetime;
130128
131129
UPSERT INTO `%s`
132130
SELECT
@@ -140,11 +138,32 @@ const TString UPSERT_ASSIGNMENTS_AND_SET_WORKING_STATE = R"sql(
140138
UPDATE `%s`
141139
SET
142140
state = $State,
143-
last_heartbeat_time = $LastHeartbeat
141+
last_heartbeat_time = $LastMasterHeartbeat,
142+
last_success_generation = $Generation
144143
WHERE database = $Database
145144
AND consumer_group = $ConsumerGroup;
146145
)sql";
147146

147+
const TString SELECT_ALIVE_MEMBERS = R"sql(
148+
--!syntax_v1
149+
DECLARE $ConsumerGroup AS Utf8;
150+
DECLARE $Generation AS Uint64;
151+
DECLARE $Database AS Utf8;
152+
DECLARE $PaginationMemberId AS Utf8;
153+
DECLARE $Limit AS Uint64;
154+
155+
SELECT member_id, instance_id, rebalance_timeout_ms
156+
FROM `%s`
157+
VIEW PRIMARY KEY
158+
WHERE database = $Database
159+
AND consumer_group = $ConsumerGroup
160+
AND generation = $Generation
161+
AND member_id > $PaginationMemberId
162+
AND (leaved IS NULL OR leaved = False)
163+
ORDER BY member_id
164+
LIMIT $Limit;
165+
)sql";
166+
148167
const TString SELECT_WORKER_STATES = R"sql(
149168
--!syntax_v1
150169
DECLARE $ConsumerGroup AS Utf8;
@@ -153,7 +172,7 @@ const TString SELECT_WORKER_STATES = R"sql(
153172
DECLARE $PaginationMemberId AS Utf8;
154173
DECLARE $Limit AS Uint64;
155174
156-
SELECT worker_state_proto, member_id
175+
SELECT worker_state_proto, member_id, instance_id
157176
FROM `%s`
158177
VIEW PRIMARY KEY
159178
WHERE database = $Database
@@ -169,7 +188,7 @@ const TString CHECK_GROUP_STATE = R"sql(
169188
DECLARE $ConsumerGroup AS Utf8;
170189
DECLARE $Database AS Utf8;
171190
172-
SELECT state, generation, master, last_heartbeat_time, consumer_group, database, protocol, protocol_type, rebalance_timeout_ms
191+
SELECT state, generation, master, last_heartbeat_time, consumer_group, database, protocol, protocol_type, last_success_generation
173192
FROM `%s`
174193
VIEW PRIMARY KEY
175194
WHERE database = $Database
@@ -218,20 +237,21 @@ const TString CHECK_DEAD_MEMBERS = R"sql(
218237
219238
)sql";
220239

221-
const TString UPDATE_LASTHEARTBEATS = R"sql(
240+
const TString UPDATE_LAST_HEARTBEATS = R"sql(
222241
--!syntax_v1
223242
DECLARE $ConsumerGroup AS Utf8;
224243
DECLARE $Generation AS Uint64;
225244
DECLARE $MemberId AS Utf8;
226245
DECLARE $Database AS Utf8;
227-
DECLARE $LastHeartbeat AS Datetime;
246+
DECLARE $LastMasterHeartbeat AS Datetime;
228247
DECLARE $HeartbeatDeadline AS Datetime;
229248
DECLARE $UpdateGroupHeartbeat AS Bool;
230249
231250
UPDATE `%s`
232-
SET last_heartbeat_time = $LastHeartbeat
251+
SET last_heartbeat_time = $LastMasterHeartbeat
233252
WHERE consumer_group = $ConsumerGroup
234253
AND database = $Database
254+
AND generation = $Generation
235255
AND $UpdateGroupHeartbeat = True;
236256
237257
UPDATE `%s`
@@ -248,10 +268,11 @@ const TString UPDATE_LASTHEARTBEAT_TO_LEAVE_GROUP = R"sql(
248268
DECLARE $ConsumerGroup AS Utf8;
249269
DECLARE $MemberId AS Utf8;
250270
DECLARE $Database AS Utf8;
251-
DECLARE $LastHeartbeat AS Datetime;
271+
DECLARE $LastMasterHeartbeat AS Datetime;
252272
253273
UPDATE `%s`
254-
SET heartbeat_deadline = $LastHeartbeat
274+
SET heartbeat_deadline = $LastMasterHeartbeat,
275+
leaved = True
255276
WHERE consumer_group = $ConsumerGroup
256277
AND member_id = $MemberId
257278
AND database = $Database;

0 commit comments

Comments
 (0)