Skip to content

Commit 378da58

Browse files
authored
Merge eee8f73 into 38b28a9
2 parents 38b28a9 + eee8f73 commit 378da58

23 files changed

+4952
-2008
lines changed

ydb/core/kafka_proxy/actors/actors.h

+1
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context, const ui
176176
const TActorId& discoveryCacheActor);
177177
NActors::IActor* CreateKafkaProduceActor(const TContext::TPtr context);
178178
NActors::IActor* CreateKafkaReadSessionActor(const TContext::TPtr context, ui64 cookie);
179+
NActors::IActor* CreateKafkaBalancerActor(const TContext::TPtr context, ui64 cookie);
179180
NActors::IActor* CreateKafkaSaslHandshakeActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TSaslHandshakeRequestData>& message);
180181
NActors::IActor* CreateKafkaSaslAuthActor(const TContext::TPtr context, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TMessagePtr<TSaslAuthenticateRequestData>& message);
181182
NActors::IActor* CreateKafkaListOffsetsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TListOffsetsRequestData>& message);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
#include "kafka_balancer_actor.h"
2+
3+
namespace NKafka {
4+
5+
const TString INSERT_NEW_GROUP = R"sql(
6+
--!syntax_v1
7+
DECLARE $ConsumerGroup AS Utf8;
8+
DECLARE $Generation AS Uint64;
9+
DECLARE $State AS Uint64;
10+
DECLARE $Database AS Utf8;
11+
DECLARE $Master AS Utf8;
12+
DECLARE $LastHeartbeat AS Datetime;
13+
DECLARE $ProtocolType AS Utf8;
14+
DECLARE $RebalanceTimeoutMs AS Uint32;
15+
16+
INSERT INTO `%s`
17+
(
18+
consumer_group,
19+
generation,
20+
state,
21+
database,
22+
last_heartbeat_time,
23+
master,
24+
protocol_type,
25+
rebalance_timeout_ms
26+
)
27+
VALUES
28+
(
29+
$ConsumerGroup,
30+
$Generation,
31+
$State,
32+
$Database,
33+
$LastHeartbeat,
34+
$Master,
35+
$ProtocolType,
36+
$RebalanceTimeoutMs
37+
);
38+
)sql";
39+
40+
const TString UPDATE_GROUP = R"sql(
41+
--!syntax_v1
42+
DECLARE $ConsumerGroup AS Utf8;
43+
DECLARE $State AS Uint64;
44+
DECLARE $Generation AS Uint64;
45+
DECLARE $Database AS Utf8;
46+
DECLARE $Master AS Utf8;
47+
DECLARE $LastHeartbeat AS Datetime;
48+
DECLARE $RebalanceTimeoutMs AS Uint32;
49+
50+
UPDATE `%s`
51+
SET
52+
state = $State,
53+
generation = $Generation,
54+
last_heartbeat_time = $LastHeartbeat,
55+
master = $Master,
56+
rebalance_timeout_ms = $RebalanceTimeoutMs
57+
WHERE database = $Database
58+
AND consumer_group = $ConsumerGroup;
59+
)sql";
60+
61+
const TString UPDATE_GROUP_STATE = R"sql(
62+
--!syntax_v1
63+
DECLARE $ConsumerGroup AS Utf8;
64+
DECLARE $Database AS Utf8;
65+
DECLARE $State AS Uint64;
66+
67+
UPDATE `%s`
68+
SET
69+
state = $State
70+
WHERE database = $Database
71+
AND consumer_group = $ConsumerGroup;
72+
)sql";
73+
74+
const TString UPDATE_GROUP_STATE_AND_PROTOCOL = R"sql(
75+
--!syntax_v1
76+
DECLARE $ConsumerGroup AS Utf8;
77+
DECLARE $State AS Uint64;
78+
DECLARE $Database AS Utf8;
79+
DECLARE $Protocol AS Utf8;
80+
DECLARE $LastHeartbeat AS Datetime;
81+
82+
UPDATE `%s`
83+
SET
84+
state = $State,
85+
last_heartbeat_time = $LastHeartbeat,
86+
protocol = $Protocol
87+
WHERE database = $Database
88+
AND consumer_group = $ConsumerGroup;
89+
)sql";
90+
91+
const TString INSERT_MEMBER = R"sql(
92+
--!syntax_v1
93+
DECLARE $ConsumerGroup AS Utf8;
94+
DECLARE $Generation AS Uint64;
95+
DECLARE $MemberId AS Utf8;
96+
DECLARE $WorkerStateProto AS String;
97+
DECLARE $Database AS Utf8;
98+
DECLARE $HeartbeatDeadline AS Datetime;
99+
DECLARE $SessionTimeoutMs AS Uint32;
100+
101+
INSERT INTO `%s`
102+
(
103+
consumer_group,
104+
generation,
105+
member_id,
106+
heartbeat_deadline,
107+
worker_state_proto,
108+
database,
109+
session_timeout_ms
110+
)
111+
VALUES (
112+
$ConsumerGroup,
113+
$Generation,
114+
$MemberId,
115+
$HeartbeatDeadline,
116+
$WorkerStateProto,
117+
$Database,
118+
$SessionTimeoutMs
119+
);
120+
)sql";
121+
122+
const TString UPSERT_ASSIGNMENTS_AND_SET_WORKING_STATE = R"sql(
123+
--!syntax_v1
124+
DECLARE $Assignments AS List<Struct<MemberId: Utf8, Assignment: Bytes>>;
125+
DECLARE $ConsumerGroup AS Utf8;
126+
DECLARE $Database AS Utf8;
127+
DECLARE $Generation AS Uint64;
128+
DECLARE $State AS Uint64;
129+
DECLARE $LastHeartbeat AS Datetime;
130+
131+
UPSERT INTO `%s`
132+
SELECT
133+
item.MemberId AS member_id,
134+
item.Assignment AS assignment,
135+
$ConsumerGroup AS consumer_group,
136+
$Database AS database,
137+
$Generation AS generation
138+
FROM AS_TABLE($Assignments) AS item;
139+
140+
UPDATE `%s`
141+
SET
142+
state = $State,
143+
last_heartbeat_time = $LastHeartbeat
144+
WHERE database = $Database
145+
AND consumer_group = $ConsumerGroup;
146+
)sql";
147+
148+
const TString SELECT_WORKER_STATES = R"sql(
149+
--!syntax_v1
150+
DECLARE $ConsumerGroup AS Utf8;
151+
DECLARE $Generation AS Uint64;
152+
DECLARE $Database AS Utf8;
153+
DECLARE $PaginationMemberId AS Utf8;
154+
DECLARE $Limit AS Uint64;
155+
156+
SELECT worker_state_proto, member_id
157+
FROM `%s`
158+
VIEW PRIMARY KEY
159+
WHERE database = $Database
160+
AND consumer_group = $ConsumerGroup
161+
AND generation = $Generation
162+
AND member_id > $PaginationMemberId
163+
ORDER BY member_id
164+
LIMIT $Limit;
165+
)sql";
166+
167+
const TString CHECK_GROUP_STATE = R"sql(
168+
--!syntax_v1
169+
DECLARE $ConsumerGroup AS Utf8;
170+
DECLARE $Database AS Utf8;
171+
172+
SELECT state, generation, master, last_heartbeat_time, consumer_group, database, protocol, protocol_type, rebalance_timeout_ms
173+
FROM `%s`
174+
VIEW PRIMARY KEY
175+
WHERE database = $Database
176+
AND consumer_group = $ConsumerGroup;
177+
)sql";
178+
179+
const TString FETCH_ASSIGNMENTS = R"sql(
180+
--!syntax_v1
181+
DECLARE $ConsumerGroup AS Utf8;
182+
DECLARE $Generation AS Uint64;
183+
DECLARE $MemberId AS Utf8;
184+
DECLARE $Database AS Utf8;
185+
186+
SELECT assignment
187+
FROM `%s`
188+
VIEW PRIMARY KEY
189+
WHERE database = $Database
190+
AND consumer_group = $ConsumerGroup
191+
AND generation = $Generation
192+
AND member_id = $MemberId;
193+
)sql";
194+
195+
const TString CHECK_DEAD_MEMBERS = R"sql(
196+
--!syntax_v1
197+
DECLARE $ConsumerGroup AS Utf8;
198+
DECLARE $Generation AS Uint64;
199+
DECLARE $Database AS Utf8;
200+
DECLARE $MemberId AS Utf8;
201+
202+
SELECT heartbeat_deadline
203+
FROM `%s`
204+
VIEW idx_group_generation_db_hb
205+
WHERE database = $Database
206+
AND consumer_group = $ConsumerGroup
207+
AND generation = $Generation
208+
ORDER BY heartbeat_deadline
209+
LIMIT 1;
210+
211+
SELECT session_timeout_ms
212+
FROM `%s`
213+
VIEW PRIMARY KEY
214+
WHERE database = $Database
215+
AND consumer_group = $ConsumerGroup
216+
AND generation = $Generation
217+
AND member_id = $MemberId;
218+
219+
)sql";
220+
221+
const TString UPDATE_LASTHEARTBEATS = R"sql(
222+
--!syntax_v1
223+
DECLARE $ConsumerGroup AS Utf8;
224+
DECLARE $Generation AS Uint64;
225+
DECLARE $MemberId AS Utf8;
226+
DECLARE $Database AS Utf8;
227+
DECLARE $LastHeartbeat AS Datetime;
228+
DECLARE $HeartbeatDeadline AS Datetime;
229+
DECLARE $UpdateGroupHeartbeat AS Bool;
230+
231+
UPDATE `%s`
232+
SET last_heartbeat_time = $LastHeartbeat
233+
WHERE consumer_group = $ConsumerGroup
234+
AND database = $Database
235+
AND $UpdateGroupHeartbeat = True;
236+
237+
UPDATE `%s`
238+
SET heartbeat_deadline = $HeartbeatDeadline
239+
WHERE consumer_group = $ConsumerGroup
240+
AND generation = $Generation
241+
AND member_id = $MemberId
242+
AND database = $Database;
243+
)sql";
244+
245+
246+
const TString UPDATE_LASTHEARTBEAT_TO_LEAVE_GROUP = R"sql(
247+
--!syntax_v1
248+
DECLARE $ConsumerGroup AS Utf8;
249+
DECLARE $MemberId AS Utf8;
250+
DECLARE $Database AS Utf8;
251+
DECLARE $LastHeartbeat AS Datetime;
252+
253+
UPDATE `%s`
254+
SET heartbeat_deadline = $LastHeartbeat
255+
WHERE consumer_group = $ConsumerGroup
256+
AND member_id = $MemberId
257+
AND database = $Database;
258+
)sql";
259+
260+
const TString CHECK_GROUPS_COUNT = R"sql(
261+
--!syntax_v1
262+
DECLARE $GroupsCountCheckDeadline AS Datetime;
263+
264+
SELECT COUNT(1) as groups_count
265+
FROM `%s`
266+
VIEW idx_last_hb
267+
WHERE last_heartbeat_time > $GroupsCountCheckDeadline;
268+
)sql";
269+
270+
} // namespace NKafka

0 commit comments

Comments
 (0)