Skip to content

Commit 2cf679e

Browse files
committed
fixes after review
1 parent c73d194 commit 2cf679e

6 files changed

+324
-161
lines changed

ydb/core/kafka_proxy/actors/kafka_balance_actor_sql.cpp

+62-39
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ const TString INSERT_NEW_GROUP = R"sql(
1111
DECLARE $Master AS Utf8;
1212
DECLARE $LastHeartbeat AS Datetime;
1313
DECLARE $ProtocolType AS Utf8;
14+
DECLARE $RebalanceTimeoutMs AS Uint32;
1415
1516
INSERT INTO `%s`
1617
(
@@ -20,7 +21,8 @@ const TString INSERT_NEW_GROUP = R"sql(
2021
database,
2122
last_heartbeat_time,
2223
master,
23-
protocol_type
24+
protocol_type,
25+
rebalance_timeout_ms
2426
)
2527
VALUES
2628
(
@@ -30,7 +32,8 @@ const TString INSERT_NEW_GROUP = R"sql(
3032
$Database,
3133
$LastHeartbeat,
3234
$Master,
33-
$ProtocolType
35+
$ProtocolType,
36+
$RebalanceTimeoutMs
3437
);
3538
)sql";
3639

@@ -42,15 +45,17 @@ const TString UPDATE_GROUP = R"sql(
4245
DECLARE $Database AS Utf8;
4346
DECLARE $Master AS Utf8;
4447
DECLARE $LastHeartbeat AS Datetime;
48+
DECLARE $RebalanceTimeoutMs AS Uint32;
4549
4650
UPDATE `%s`
4751
SET
4852
state = $State,
4953
generation = $Generation,
5054
last_heartbeat_time = $LastHeartbeat,
51-
master = $Master
52-
WHERE consumer_group = $ConsumerGroup
53-
AND database = $Database;
55+
master = $Master,
56+
rebalance_timeout_ms = $RebalanceTimeoutMs
57+
WHERE database = $Database
58+
AND consumer_group = $ConsumerGroup;
5459
)sql";
5560

5661
const TString UPDATE_GROUP_STATE_AND_PROTOCOL = R"sql(
@@ -66,8 +71,8 @@ const TString UPDATE_GROUP_STATE_AND_PROTOCOL = R"sql(
6671
state = $State,
6772
last_heartbeat_time = $LastHeartbeat,
6873
protocol = $Protocol
69-
WHERE consumer_group = $ConsumerGroup
70-
AND database = $Database;
74+
WHERE database = $Database
75+
AND consumer_group = $ConsumerGroup;
7176
)sql";
7277

7378
const TString INSERT_MEMBER = R"sql(
@@ -77,24 +82,27 @@ const TString INSERT_MEMBER = R"sql(
7782
DECLARE $MemberId AS Utf8;
7883
DECLARE $WorkerStateProto AS String;
7984
DECLARE $Database AS Utf8;
80-
DECLARE $LastHeartbeat AS Datetime;
85+
DECLARE $HeartbeatDeadline AS Datetime;
86+
DECLARE $SessionTimeoutMs AS Uint32;
8187
8288
INSERT INTO `%s`
8389
(
8490
consumer_group,
8591
generation,
8692
member_id,
87-
last_heartbeat_time,
93+
heartbeat_deadline,
8894
worker_state_proto,
89-
database
95+
database,
96+
session_timeout_ms
9097
)
9198
VALUES (
9299
$ConsumerGroup,
93100
$Generation,
94101
$MemberId,
95-
$LastHeartbeat,
102+
$HeartbeatDeadline,
96103
$WorkerStateProto,
97-
$Database
104+
$Database,
105+
$SessionTimeoutMs
98106
);
99107
)sql";
100108

@@ -120,41 +128,39 @@ const TString UPSERT_ASSIGNMENTS_AND_SET_WORKING_STATE = R"sql(
120128
SET
121129
state = $State,
122130
last_heartbeat_time = $LastHeartbeat
123-
WHERE consumer_group = $ConsumerGroup
124-
AND database = $Database;
131+
WHERE database = $Database
132+
AND consumer_group = $ConsumerGroup;
125133
)sql";
126134

127-
const TString UPDATE_GROUPS_AND_SELECT_WORKER_STATES = R"sql(
135+
const TString SELECT_WORKER_STATES = R"sql(
128136
--!syntax_v1
129137
DECLARE $ConsumerGroup AS Utf8;
130-
DECLARE $State AS Uint64;
131138
DECLARE $Generation AS Uint64;
132139
DECLARE $Database AS Utf8;
133-
DECLARE $LastHeartbeat AS Datetime;
134-
135-
UPDATE `%s`
136-
SET
137-
state = $State,
138-
last_heartbeat_time = $LastHeartbeat
139-
WHERE consumer_group = $ConsumerGroup
140-
AND database = $Database;
140+
DECLARE $PaginationMemberId AS Utf8;
141+
DECLARE $Limit AS Uint64;
141142
142143
SELECT worker_state_proto, member_id
143144
FROM `%s`
144-
WHERE consumer_group = $ConsumerGroup
145+
VIEW PRIMARY KEY
146+
WHERE database = $Database
147+
AND consumer_group = $ConsumerGroup
145148
AND generation = $Generation
146-
AND database = $Database;
149+
AND member_id > $PaginationMemberId
150+
ORDER BY member_id
151+
LIMIT $Limit;
147152
)sql";
148153

149154
const TString CHECK_GROUP_STATE = R"sql(
150155
--!syntax_v1
151156
DECLARE $ConsumerGroup AS Utf8;
152157
DECLARE $Database AS Utf8;
153158
154-
SELECT state, generation, master, last_heartbeat_time, consumer_group, database, protocol, protocol_type
159+
SELECT state, generation, master, last_heartbeat_time, consumer_group, database, protocol, protocol_type, rebalance_timeout_ms
155160
FROM `%s`
156-
WHERE consumer_group = $ConsumerGroup
157-
AND database = $Database;
161+
VIEW PRIMARY KEY
162+
WHERE database = $Database
163+
AND consumer_group = $ConsumerGroup;
158164
)sql";
159165

160166
const TString FETCH_ASSIGNMENTS = R"sql(
@@ -166,10 +172,11 @@ const TString FETCH_ASSIGNMENTS = R"sql(
166172
167173
SELECT assignment
168174
FROM `%s`
169-
WHERE consumer_group = $ConsumerGroup
175+
VIEW PRIMARY KEY
176+
WHERE database = $Database
177+
AND consumer_group = $ConsumerGroup
170178
AND generation = $Generation
171-
AND member_id = $MemberId
172-
AND database = $Database;
179+
AND member_id = $MemberId;
173180
)sql";
174181

175182
const TString CHECK_DEAD_MEMBERS = R"sql(
@@ -178,13 +185,24 @@ const TString CHECK_DEAD_MEMBERS = R"sql(
178185
DECLARE $Generation AS Uint64;
179186
DECLARE $Database AS Utf8;
180187
DECLARE $Deadline AS Datetime;
188+
DECLARE $MemberId AS Utf8;
181189
182190
SELECT COUNT(1) as cnt
183191
FROM `%s`
184-
WHERE consumer_group = $ConsumerGroup
192+
VIEW idx_group_generation_db_hb
193+
WHERE database = $Database
194+
AND consumer_group = $ConsumerGroup
195+
AND generation = $Generation
196+
AND heartbeat_deadline < $Deadline;
197+
198+
SELECT session_timeout_ms
199+
FROM `%s`
200+
VIEW PRIMARY KEY
201+
WHERE database = $Database
202+
AND consumer_group = $ConsumerGroup
185203
AND generation = $Generation
186-
AND database = $Database
187-
AND last_heartbeat_time < $Deadline;
204+
AND member_id = $MemberId;
205+
188206
)sql";
189207

190208
const TString UPDATE_LASTHEARTBEATS = R"sql(
@@ -194,6 +212,7 @@ const TString UPDATE_LASTHEARTBEATS = R"sql(
194212
DECLARE $MemberId AS Utf8;
195213
DECLARE $Database AS Utf8;
196214
DECLARE $LastHeartbeat AS Datetime;
215+
DECLARE $HeartbeatDeadline AS Datetime;
197216
DECLARE $UpdateGroupHeartbeat AS Bool;
198217
199218
UPDATE `%s`
@@ -203,7 +222,7 @@ const TString UPDATE_LASTHEARTBEATS = R"sql(
203222
AND $UpdateGroupHeartbeat = True;
204223
205224
UPDATE `%s`
206-
SET last_heartbeat_time = $LastHeartbeat
225+
SET heartbeat_deadline = $HeartbeatDeadline
207226
WHERE consumer_group = $ConsumerGroup
208227
AND generation = $Generation
209228
AND member_id = $MemberId
@@ -219,12 +238,16 @@ const TString UPDATE_LASTHEARTBEAT_TO_LEAVE_GROUP = R"sql(
219238
DECLARE $LastHeartbeat AS Datetime;
220239
221240
UPDATE `%s`
222-
SET last_heartbeat_time = $LastHeartbeat
241+
SET heartbeat_deadline = $LastHeartbeat
223242
WHERE consumer_group = $ConsumerGroup
224243
AND member_id = $MemberId
225244
AND database = $Database;
226245
)sql";
227246

228-
} // namespace NKafka
247+
const TString CHECK_GROUPS_COUNT = R"sql(
248+
--!syntax_v1
249+
SELECT COUNT(1) as groups_count
250+
FROM `%s`
251+
)sql";
229252

230-
// savnik check max members count
253+
} // namespace NKafka

0 commit comments

Comments
 (0)