Skip to content

Commit f80ee10

Browse files
authored
Merge b306d7a into 347babc
2 parents 347babc + b306d7a commit f80ee10

23 files changed

+5131
-2008
lines changed

ydb/core/kafka_proxy/actors/actors.h

+1
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context, const ui
176176
const TActorId& discoveryCacheActor);
177177
NActors::IActor* CreateKafkaProduceActor(const TContext::TPtr context);
178178
NActors::IActor* CreateKafkaReadSessionActor(const TContext::TPtr context, ui64 cookie);
179+
NActors::IActor* CreateKafkaBalancerActor(const TContext::TPtr context, ui64 cookie);
179180
NActors::IActor* CreateKafkaSaslHandshakeActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TSaslHandshakeRequestData>& message);
180181
NActors::IActor* CreateKafkaSaslAuthActor(const TContext::TPtr context, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TMessagePtr<TSaslAuthenticateRequestData>& message);
181182
NActors::IActor* CreateKafkaListOffsetsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TListOffsetsRequestData>& message);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
#include "kafka_balancer_actor.h"
2+
3+
namespace NKafka {
4+
5+
const TString INSERT_NEW_GROUP = R"sql(
6+
--!syntax_v1
7+
DECLARE $ConsumerGroup AS Utf8;
8+
DECLARE $Generation AS Uint64;
9+
DECLARE $State AS Uint64;
10+
DECLARE $Database AS Utf8;
11+
DECLARE $Master AS Utf8;
12+
DECLARE $LastMasterHeartbeat AS Datetime;
13+
DECLARE $ProtocolType AS Utf8;
14+
15+
INSERT INTO `%s`
16+
(
17+
consumer_group,
18+
generation,
19+
state,
20+
database,
21+
last_heartbeat_time,
22+
master,
23+
protocol_type
24+
)
25+
VALUES
26+
(
27+
$ConsumerGroup,
28+
$Generation,
29+
$State,
30+
$Database,
31+
$LastMasterHeartbeat,
32+
$Master,
33+
$ProtocolType
34+
);
35+
)sql";
36+
37+
const TString UPDATE_GROUP = R"sql(
38+
--!syntax_v1
39+
DECLARE $ConsumerGroup AS Utf8;
40+
DECLARE $State AS Uint64;
41+
DECLARE $Generation AS Uint64;
42+
DECLARE $Database AS Utf8;
43+
DECLARE $Master AS Utf8;
44+
DECLARE $LastMasterHeartbeat AS Datetime;
45+
46+
UPDATE `%s`
47+
SET
48+
state = $State,
49+
generation = $Generation,
50+
last_heartbeat_time = $LastMasterHeartbeat,
51+
master = $Master
52+
WHERE database = $Database
53+
AND consumer_group = $ConsumerGroup;
54+
)sql";
55+
56+
const TString UPDATE_GROUP_STATE = R"sql(
57+
--!syntax_v1
58+
DECLARE $ConsumerGroup AS Utf8;
59+
DECLARE $Database AS Utf8;
60+
DECLARE $State AS Uint64;
61+
62+
UPDATE `%s`
63+
SET
64+
state = $State
65+
WHERE database = $Database
66+
AND consumer_group = $ConsumerGroup;
67+
)sql";
68+
69+
const TString UPDATE_GROUP_STATE_AND_PROTOCOL = R"sql(
70+
--!syntax_v1
71+
DECLARE $ConsumerGroup AS Utf8;
72+
DECLARE $State AS Uint64;
73+
DECLARE $Database AS Utf8;
74+
DECLARE $Protocol AS Utf8;
75+
DECLARE $LastMasterHeartbeat AS Datetime;
76+
77+
UPDATE `%s`
78+
SET
79+
state = $State,
80+
last_heartbeat_time = $LastMasterHeartbeat,
81+
protocol = $Protocol
82+
WHERE database = $Database
83+
AND consumer_group = $ConsumerGroup;
84+
)sql";
85+
86+
const TString INSERT_MEMBER = R"sql(
87+
--!syntax_v1
88+
DECLARE $ConsumerGroup AS Utf8;
89+
DECLARE $Generation AS Uint64;
90+
DECLARE $MemberId AS Utf8;
91+
DECLARE $InstanceId AS Utf8;
92+
DECLARE $WorkerStateProto AS String;
93+
DECLARE $Database AS Utf8;
94+
DECLARE $HeartbeatDeadline AS Datetime;
95+
DECLARE $SessionTimeoutMs AS Uint32;
96+
97+
INSERT INTO `%s`
98+
(
99+
consumer_group,
100+
generation,
101+
member_id,
102+
instance_id,
103+
heartbeat_deadline,
104+
worker_state_proto,
105+
database,
106+
session_timeout_ms
107+
)
108+
VALUES (
109+
$ConsumerGroup,
110+
$Generation,
111+
$MemberId,
112+
$InstanceId,
113+
$HeartbeatDeadline,
114+
$WorkerStateProto,
115+
$Database,
116+
$SessionTimeoutMs
117+
);
118+
)sql";
119+
120+
const TString UPSERT_ASSIGNMENTS_AND_SET_WORKING_STATE = R"sql(
121+
--!syntax_v1
122+
DECLARE $Assignments AS List<Struct<MemberId: Utf8, Assignment: Bytes>>;
123+
DECLARE $ConsumerGroup AS Utf8;
124+
DECLARE $Database AS Utf8;
125+
DECLARE $Generation AS Uint64;
126+
DECLARE $State AS Uint64;
127+
DECLARE $LastMasterHeartbeat AS Datetime;
128+
129+
UPSERT INTO `%s`
130+
SELECT
131+
item.MemberId AS member_id,
132+
item.Assignment AS assignment,
133+
$ConsumerGroup AS consumer_group,
134+
$Database AS database,
135+
$Generation AS generation
136+
FROM AS_TABLE($Assignments) AS item;
137+
138+
UPDATE `%s`
139+
SET
140+
state = $State,
141+
last_heartbeat_time = $LastMasterHeartbeat,
142+
last_success_generation = $Generation
143+
WHERE database = $Database
144+
AND consumer_group = $ConsumerGroup;
145+
)sql";
146+
147+
const TString SELECT_ALIVE_MEMBERS = R"sql(
148+
--!syntax_v1
149+
DECLARE $ConsumerGroup AS Utf8;
150+
DECLARE $Generation AS Uint64;
151+
DECLARE $Database AS Utf8;
152+
DECLARE $PaginationMemberId AS Utf8;
153+
DECLARE $Limit AS Uint64;
154+
155+
SELECT member_id, instance_id, rebalance_timeout_ms
156+
FROM `%s`
157+
VIEW PRIMARY KEY
158+
WHERE database = $Database
159+
AND consumer_group = $ConsumerGroup
160+
AND generation = $Generation
161+
AND member_id > $PaginationMemberId
162+
AND (leaved IS NULL OR leaved = False)
163+
ORDER BY member_id
164+
LIMIT $Limit;
165+
)sql";
166+
167+
const TString SELECT_WORKER_STATES = R"sql(
168+
--!syntax_v1
169+
DECLARE $ConsumerGroup AS Utf8;
170+
DECLARE $Generation AS Uint64;
171+
DECLARE $Database AS Utf8;
172+
DECLARE $PaginationMemberId AS Utf8;
173+
DECLARE $Limit AS Uint64;
174+
175+
SELECT worker_state_proto, member_id, instance_id
176+
FROM `%s`
177+
VIEW PRIMARY KEY
178+
WHERE database = $Database
179+
AND consumer_group = $ConsumerGroup
180+
AND generation = $Generation
181+
AND member_id > $PaginationMemberId
182+
ORDER BY member_id
183+
LIMIT $Limit;
184+
)sql";
185+
186+
const TString CHECK_GROUP_STATE = R"sql(
187+
--!syntax_v1
188+
DECLARE $ConsumerGroup AS Utf8;
189+
DECLARE $Database AS Utf8;
190+
191+
SELECT state, generation, master, last_heartbeat_time, consumer_group, database, protocol, protocol_type, last_success_generation
192+
FROM `%s`
193+
VIEW PRIMARY KEY
194+
WHERE database = $Database
195+
AND consumer_group = $ConsumerGroup;
196+
)sql";
197+
198+
const TString FETCH_ASSIGNMENTS = R"sql(
199+
--!syntax_v1
200+
DECLARE $ConsumerGroup AS Utf8;
201+
DECLARE $Generation AS Uint64;
202+
DECLARE $MemberId AS Utf8;
203+
DECLARE $Database AS Utf8;
204+
205+
SELECT assignment
206+
FROM `%s`
207+
VIEW PRIMARY KEY
208+
WHERE database = $Database
209+
AND consumer_group = $ConsumerGroup
210+
AND generation = $Generation
211+
AND member_id = $MemberId;
212+
)sql";
213+
214+
const TString CHECK_DEAD_MEMBERS = R"sql(
215+
--!syntax_v1
216+
DECLARE $ConsumerGroup AS Utf8;
217+
DECLARE $Generation AS Uint64;
218+
DECLARE $Database AS Utf8;
219+
DECLARE $MemberId AS Utf8;
220+
221+
SELECT heartbeat_deadline
222+
FROM `%s`
223+
VIEW idx_group_generation_db_hb
224+
WHERE database = $Database
225+
AND consumer_group = $ConsumerGroup
226+
AND generation = $Generation
227+
ORDER BY heartbeat_deadline
228+
LIMIT 1;
229+
230+
SELECT session_timeout_ms
231+
FROM `%s`
232+
VIEW PRIMARY KEY
233+
WHERE database = $Database
234+
AND consumer_group = $ConsumerGroup
235+
AND generation = $Generation
236+
AND member_id = $MemberId;
237+
238+
)sql";
239+
240+
const TString UPDATE_LAST_HEARTBEATS = R"sql(
241+
--!syntax_v1
242+
DECLARE $ConsumerGroup AS Utf8;
243+
DECLARE $Generation AS Uint64;
244+
DECLARE $MemberId AS Utf8;
245+
DECLARE $Database AS Utf8;
246+
DECLARE $LastMasterHeartbeat AS Datetime;
247+
DECLARE $HeartbeatDeadline AS Datetime;
248+
DECLARE $UpdateGroupHeartbeat AS Bool;
249+
250+
UPDATE `%s`
251+
SET last_heartbeat_time = $LastMasterHeartbeat
252+
WHERE consumer_group = $ConsumerGroup
253+
AND database = $Database
254+
AND generation = $Generation
255+
AND $UpdateGroupHeartbeat = True;
256+
257+
UPDATE `%s`
258+
SET heartbeat_deadline = $HeartbeatDeadline
259+
WHERE consumer_group = $ConsumerGroup
260+
AND generation = $Generation
261+
AND member_id = $MemberId
262+
AND database = $Database;
263+
)sql";
264+
265+
266+
const TString UPDATE_LASTHEARTBEAT_TO_LEAVE_GROUP = R"sql(
267+
--!syntax_v1
268+
DECLARE $ConsumerGroup AS Utf8;
269+
DECLARE $MemberId AS Utf8;
270+
DECLARE $Database AS Utf8;
271+
DECLARE $LastMasterHeartbeat AS Datetime;
272+
273+
UPDATE `%s`
274+
SET heartbeat_deadline = $LastMasterHeartbeat,
275+
leaved = True
276+
WHERE consumer_group = $ConsumerGroup
277+
AND member_id = $MemberId
278+
AND database = $Database;
279+
)sql";
280+
281+
const TString CHECK_GROUPS_COUNT = R"sql(
282+
--!syntax_v1
283+
DECLARE $GroupsCountCheckDeadline AS Datetime;
284+
285+
SELECT COUNT(1) as groups_count
286+
FROM `%s`
287+
VIEW idx_last_hb
288+
WHERE last_heartbeat_time > $GroupsCountCheckDeadline;
289+
)sql";
290+
291+
} // namespace NKafka

0 commit comments

Comments
 (0)