Skip to content

Commit 980a59a

Browse files
authored
Merge 4e2abd8 into cb5d0e3
2 parents cb5d0e3 + 4e2abd8 commit 980a59a

23 files changed

+4869
-2008
lines changed

ydb/core/kafka_proxy/actors/actors.h

+1
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context, const ui
176176
const TActorId& discoveryCacheActor);
177177
NActors::IActor* CreateKafkaProduceActor(const TContext::TPtr context);
178178
NActors::IActor* CreateKafkaReadSessionActor(const TContext::TPtr context, ui64 cookie);
179+
NActors::IActor* CreateKafkaBalancerActor(const TContext::TPtr context, ui64 cookie);
179180
NActors::IActor* CreateKafkaSaslHandshakeActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TSaslHandshakeRequestData>& message);
180181
NActors::IActor* CreateKafkaSaslAuthActor(const TContext::TPtr context, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TMessagePtr<TSaslAuthenticateRequestData>& message);
181182
NActors::IActor* CreateKafkaListOffsetsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TListOffsetsRequestData>& message);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
#include "kafka_balancer_actor.h"
2+
3+
namespace NKafka {
4+
5+
const TString INSERT_NEW_GROUP = R"sql(
6+
--!syntax_v1
7+
DECLARE $ConsumerGroup AS Utf8;
8+
DECLARE $Generation AS Uint64;
9+
DECLARE $State AS Uint64;
10+
DECLARE $Database AS Utf8;
11+
DECLARE $Master AS Utf8;
12+
DECLARE $LastHeartbeat AS Datetime;
13+
DECLARE $ProtocolType AS Utf8;
14+
DECLARE $RebalanceTimeoutMs AS Uint32;
15+
16+
INSERT INTO `%s`
17+
(
18+
consumer_group,
19+
generation,
20+
state,
21+
database,
22+
last_heartbeat_time,
23+
master,
24+
protocol_type,
25+
rebalance_timeout_ms
26+
)
27+
VALUES
28+
(
29+
$ConsumerGroup,
30+
$Generation,
31+
$State,
32+
$Database,
33+
$LastHeartbeat,
34+
$Master,
35+
$ProtocolType,
36+
$RebalanceTimeoutMs
37+
);
38+
)sql";
39+
40+
const TString UPDATE_GROUP = R"sql(
41+
--!syntax_v1
42+
DECLARE $ConsumerGroup AS Utf8;
43+
DECLARE $State AS Uint64;
44+
DECLARE $Generation AS Uint64;
45+
DECLARE $Database AS Utf8;
46+
DECLARE $Master AS Utf8;
47+
DECLARE $LastHeartbeat AS Datetime;
48+
DECLARE $RebalanceTimeoutMs AS Uint32;
49+
50+
UPDATE `%s`
51+
SET
52+
state = $State,
53+
generation = $Generation,
54+
last_heartbeat_time = $LastHeartbeat,
55+
master = $Master,
56+
rebalance_timeout_ms = $RebalanceTimeoutMs
57+
WHERE database = $Database
58+
AND consumer_group = $ConsumerGroup;
59+
)sql";
60+
61+
const TString UPDATE_GROUP_STATE_AND_PROTOCOL = R"sql(
62+
--!syntax_v1
63+
DECLARE $ConsumerGroup AS Utf8;
64+
DECLARE $State AS Uint64;
65+
DECLARE $Database AS Utf8;
66+
DECLARE $Protocol AS Utf8;
67+
DECLARE $LastHeartbeat AS Datetime;
68+
69+
UPDATE `%s`
70+
SET
71+
state = $State,
72+
last_heartbeat_time = $LastHeartbeat,
73+
protocol = $Protocol
74+
WHERE database = $Database
75+
AND consumer_group = $ConsumerGroup;
76+
)sql";
77+
78+
const TString INSERT_MEMBER = R"sql(
79+
--!syntax_v1
80+
DECLARE $ConsumerGroup AS Utf8;
81+
DECLARE $Generation AS Uint64;
82+
DECLARE $MemberId AS Utf8;
83+
DECLARE $WorkerStateProto AS String;
84+
DECLARE $Database AS Utf8;
85+
DECLARE $HeartbeatDeadline AS Datetime;
86+
DECLARE $SessionTimeoutMs AS Uint32;
87+
88+
INSERT INTO `%s`
89+
(
90+
consumer_group,
91+
generation,
92+
member_id,
93+
heartbeat_deadline,
94+
worker_state_proto,
95+
database,
96+
session_timeout_ms
97+
)
98+
VALUES (
99+
$ConsumerGroup,
100+
$Generation,
101+
$MemberId,
102+
$HeartbeatDeadline,
103+
$WorkerStateProto,
104+
$Database,
105+
$SessionTimeoutMs
106+
);
107+
)sql";
108+
109+
const TString UPSERT_ASSIGNMENTS_AND_SET_WORKING_STATE = R"sql(
110+
--!syntax_v1
111+
DECLARE $Assignments AS List<Struct<MemberId: Utf8, Assignment: Bytes>>;
112+
DECLARE $ConsumerGroup AS Utf8;
113+
DECLARE $Database AS Utf8;
114+
DECLARE $Generation AS Uint64;
115+
DECLARE $State AS Uint64;
116+
DECLARE $LastHeartbeat AS Datetime;
117+
118+
UPSERT INTO `%s`
119+
SELECT
120+
item.MemberId AS member_id,
121+
item.Assignment AS assignment,
122+
$ConsumerGroup AS consumer_group,
123+
$Database AS database,
124+
$Generation AS generation
125+
FROM AS_TABLE($Assignments) AS item;
126+
127+
UPDATE `%s`
128+
SET
129+
state = $State,
130+
last_heartbeat_time = $LastHeartbeat
131+
WHERE database = $Database
132+
AND consumer_group = $ConsumerGroup;
133+
)sql";
134+
135+
const TString SELECT_WORKER_STATES = R"sql(
136+
--!syntax_v1
137+
DECLARE $ConsumerGroup AS Utf8;
138+
DECLARE $Generation AS Uint64;
139+
DECLARE $Database AS Utf8;
140+
DECLARE $PaginationMemberId AS Utf8;
141+
DECLARE $Limit AS Uint64;
142+
143+
SELECT worker_state_proto, member_id
144+
FROM `%s`
145+
VIEW PRIMARY KEY
146+
WHERE database = $Database
147+
AND consumer_group = $ConsumerGroup
148+
AND generation = $Generation
149+
AND member_id > $PaginationMemberId
150+
ORDER BY member_id
151+
LIMIT $Limit;
152+
)sql";
153+
154+
const TString CHECK_GROUP_STATE = R"sql(
155+
--!syntax_v1
156+
DECLARE $ConsumerGroup AS Utf8;
157+
DECLARE $Database AS Utf8;
158+
159+
SELECT state, generation, master, last_heartbeat_time, consumer_group, database, protocol, protocol_type, rebalance_timeout_ms
160+
FROM `%s`
161+
VIEW PRIMARY KEY
162+
WHERE database = $Database
163+
AND consumer_group = $ConsumerGroup;
164+
)sql";
165+
166+
const TString FETCH_ASSIGNMENTS = R"sql(
167+
--!syntax_v1
168+
DECLARE $ConsumerGroup AS Utf8;
169+
DECLARE $Generation AS Uint64;
170+
DECLARE $MemberId AS Utf8;
171+
DECLARE $Database AS Utf8;
172+
173+
SELECT assignment
174+
FROM `%s`
175+
VIEW PRIMARY KEY
176+
WHERE database = $Database
177+
AND consumer_group = $ConsumerGroup
178+
AND generation = $Generation
179+
AND member_id = $MemberId;
180+
)sql";
181+
182+
const TString CHECK_DEAD_MEMBERS = R"sql(
183+
--!syntax_v1
184+
DECLARE $ConsumerGroup AS Utf8;
185+
DECLARE $Generation AS Uint64;
186+
DECLARE $Database AS Utf8;
187+
DECLARE $Deadline AS Datetime;
188+
DECLARE $MemberId AS Utf8;
189+
190+
SELECT COUNT(1) as cnt
191+
FROM `%s`
192+
VIEW idx_group_generation_db_hb
193+
WHERE database = $Database
194+
AND consumer_group = $ConsumerGroup
195+
AND generation = $Generation
196+
AND heartbeat_deadline < $Deadline;
197+
198+
SELECT session_timeout_ms
199+
FROM `%s`
200+
VIEW PRIMARY KEY
201+
WHERE database = $Database
202+
AND consumer_group = $ConsumerGroup
203+
AND generation = $Generation
204+
AND member_id = $MemberId;
205+
206+
)sql";
207+
208+
const TString UPDATE_LASTHEARTBEATS = R"sql(
209+
--!syntax_v1
210+
DECLARE $ConsumerGroup AS Utf8;
211+
DECLARE $Generation AS Uint64;
212+
DECLARE $MemberId AS Utf8;
213+
DECLARE $Database AS Utf8;
214+
DECLARE $LastHeartbeat AS Datetime;
215+
DECLARE $HeartbeatDeadline AS Datetime;
216+
DECLARE $UpdateGroupHeartbeat AS Bool;
217+
218+
UPDATE `%s`
219+
SET last_heartbeat_time = $LastHeartbeat
220+
WHERE consumer_group = $ConsumerGroup
221+
AND database = $Database
222+
AND $UpdateGroupHeartbeat = True;
223+
224+
UPDATE `%s`
225+
SET heartbeat_deadline = $HeartbeatDeadline
226+
WHERE consumer_group = $ConsumerGroup
227+
AND generation = $Generation
228+
AND member_id = $MemberId
229+
AND database = $Database;
230+
)sql";
231+
232+
233+
const TString UPDATE_LASTHEARTBEAT_TO_LEAVE_GROUP = R"sql(
234+
--!syntax_v1
235+
DECLARE $ConsumerGroup AS Utf8;
236+
DECLARE $MemberId AS Utf8;
237+
DECLARE $Database AS Utf8;
238+
DECLARE $LastHeartbeat AS Datetime;
239+
240+
UPDATE `%s`
241+
SET heartbeat_deadline = $LastHeartbeat
242+
WHERE consumer_group = $ConsumerGroup
243+
AND member_id = $MemberId
244+
AND database = $Database;
245+
)sql";
246+
247+
const TString CHECK_GROUPS_COUNT = R"sql(
248+
--!syntax_v1
249+
SELECT COUNT(1) as groups_count
250+
FROM `%s`
251+
)sql";
252+
253+
} // namespace NKafka

0 commit comments

Comments
 (0)