Skip to content

Commit a59cadd

Browse files
authored
Merge 48ec906 into e03ee59
2 parents e03ee59 + 48ec906 commit a59cadd

23 files changed

+4567
-2008
lines changed

ydb/core/kafka_proxy/actors/actors.h

+1
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context, const ui
176176
const TActorId& discoveryCacheActor);
177177
NActors::IActor* CreateKafkaProduceActor(const TContext::TPtr context);
178178
NActors::IActor* CreateKafkaReadSessionActor(const TContext::TPtr context, ui64 cookie);
179+
NActors::IActor* CreateKafkaBalancerActor(const TContext::TPtr context, ui64 cookie);
179180
NActors::IActor* CreateKafkaSaslHandshakeActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TSaslHandshakeRequestData>& message);
180181
NActors::IActor* CreateKafkaSaslAuthActor(const TContext::TPtr context, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TMessagePtr<TSaslAuthenticateRequestData>& message);
181182
NActors::IActor* CreateKafkaListOffsetsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TListOffsetsRequestData>& message);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
#include "kafka_balancer_actor.h"
2+
3+
namespace NKafka {
4+
5+
const TString INSERT_NEW_GROUP = R"sql(
6+
--!syntax_v1
7+
DECLARE $ConsumerGroup AS Utf8;
8+
DECLARE $Generation AS Uint64;
9+
DECLARE $State AS Uint64;
10+
DECLARE $Database AS Utf8;
11+
DECLARE $Master AS Utf8;
12+
DECLARE $LastHeartbeat AS Datetime;
13+
DECLARE $ProtocolType AS Utf8;
14+
15+
INSERT INTO `%s`
16+
(
17+
consumer_group,
18+
generation,
19+
state,
20+
database,
21+
last_heartbeat_time,
22+
master,
23+
protocol_type
24+
)
25+
VALUES
26+
(
27+
$ConsumerGroup,
28+
$Generation,
29+
$State,
30+
$Database,
31+
$LastHeartbeat,
32+
$Master,
33+
$ProtocolType
34+
);
35+
)sql";
36+
37+
const TString UPDATE_GROUP = R"sql(
38+
--!syntax_v1
39+
DECLARE $ConsumerGroup AS Utf8;
40+
DECLARE $State AS Uint64;
41+
DECLARE $Generation AS Uint64;
42+
DECLARE $Database AS Utf8;
43+
DECLARE $Master AS Utf8;
44+
DECLARE $LastHeartbeat AS Datetime;
45+
46+
UPDATE `%s`
47+
SET
48+
state = $State,
49+
generation = $Generation,
50+
last_heartbeat_time = $LastHeartbeat,
51+
master = $Master
52+
WHERE consumer_group = $ConsumerGroup
53+
AND database = $Database;
54+
)sql";
55+
56+
const TString UPDATE_GROUP_STATE_AND_PROTOCOL = R"sql(
57+
--!syntax_v1
58+
DECLARE $ConsumerGroup AS Utf8;
59+
DECLARE $State AS Uint64;
60+
DECLARE $Database AS Utf8;
61+
DECLARE $Protocol AS Utf8;
62+
DECLARE $LastHeartbeat AS Datetime;
63+
64+
UPDATE `%s`
65+
SET
66+
state = $State,
67+
last_heartbeat_time = $LastHeartbeat,
68+
protocol = $Protocol
69+
WHERE consumer_group = $ConsumerGroup
70+
AND database = $Database;
71+
)sql";
72+
73+
const TString INSERT_MEMBER = R"sql(
74+
--!syntax_v1
75+
DECLARE $ConsumerGroup AS Utf8;
76+
DECLARE $Generation AS Uint64;
77+
DECLARE $MemberId AS Utf8;
78+
DECLARE $WorkerStateProto AS String;
79+
DECLARE $Database AS Utf8;
80+
DECLARE $LastHeartbeat AS Datetime;
81+
82+
INSERT INTO `%s`
83+
(
84+
consumer_group,
85+
generation,
86+
member_id,
87+
last_heartbeat_time,
88+
worker_state_proto,
89+
database
90+
)
91+
VALUES (
92+
$ConsumerGroup,
93+
$Generation,
94+
$MemberId,
95+
$LastHeartbeat,
96+
$WorkerStateProto,
97+
$Database
98+
);
99+
)sql";
100+
101+
const TString UPSERT_ASSIGNMENTS_AND_SET_WORKING_STATE = R"sql(
102+
--!syntax_v1
103+
DECLARE $Assignments AS List<Struct<MemberId: Utf8, Assignment: Bytes>>;
104+
DECLARE $ConsumerGroup AS Utf8;
105+
DECLARE $Database AS Utf8;
106+
DECLARE $Generation AS Uint64;
107+
DECLARE $State AS Uint64;
108+
DECLARE $LastHeartbeat AS Datetime;
109+
110+
UPSERT INTO `%s`
111+
SELECT
112+
item.MemberId AS member_id,
113+
item.Assignment AS assignment,
114+
$ConsumerGroup AS consumer_group,
115+
$Database AS database,
116+
$Generation AS generation
117+
FROM AS_TABLE($Assignments) AS item;
118+
119+
UPDATE `%s`
120+
SET
121+
state = $State,
122+
last_heartbeat_time = $LastHeartbeat
123+
WHERE consumer_group = $ConsumerGroup
124+
AND database = $Database;
125+
)sql";
126+
127+
const TString UPDATE_GROUPS_AND_SELECT_WORKER_STATES = R"sql(
128+
--!syntax_v1
129+
DECLARE $ConsumerGroup AS Utf8;
130+
DECLARE $State AS Uint64;
131+
DECLARE $Generation AS Uint64;
132+
DECLARE $Database AS Utf8;
133+
DECLARE $LastHeartbeat AS Datetime;
134+
135+
UPDATE `%s`
136+
SET
137+
state = $State,
138+
last_heartbeat_time = $LastHeartbeat
139+
WHERE consumer_group = $ConsumerGroup
140+
AND database = $Database;
141+
142+
SELECT worker_state_proto, member_id
143+
FROM `%s`
144+
WHERE consumer_group = $ConsumerGroup
145+
AND generation = $Generation
146+
AND database = $Database;
147+
)sql";
148+
149+
const TString CHECK_GROUP_STATE = R"sql(
150+
--!syntax_v1
151+
DECLARE $ConsumerGroup AS Utf8;
152+
DECLARE $Database AS Utf8;
153+
154+
SELECT state, generation, master, last_heartbeat_time, consumer_group, database, protocol, protocol_type
155+
FROM `%s`
156+
WHERE consumer_group = $ConsumerGroup
157+
AND database = $Database;
158+
)sql";
159+
160+
const TString FETCH_ASSIGNMENTS = R"sql(
161+
--!syntax_v1
162+
DECLARE $ConsumerGroup AS Utf8;
163+
DECLARE $Generation AS Uint64;
164+
DECLARE $MemberId AS Utf8;
165+
DECLARE $Database AS Utf8;
166+
167+
SELECT assignment
168+
FROM `%s`
169+
WHERE consumer_group = $ConsumerGroup
170+
AND generation = $Generation
171+
AND member_id = $MemberId
172+
AND database = $Database;
173+
)sql";
174+
175+
const TString CHECK_DEAD_MEMBERS = R"sql(
176+
--!syntax_v1
177+
DECLARE $ConsumerGroup AS Utf8;
178+
DECLARE $Generation AS Uint64;
179+
DECLARE $Database AS Utf8;
180+
DECLARE $Deadline AS Datetime;
181+
182+
SELECT COUNT(1) as cnt
183+
FROM `%s`
184+
WHERE consumer_group = $ConsumerGroup
185+
AND generation = $Generation
186+
AND database = $Database
187+
AND last_heartbeat_time < $Deadline;
188+
)sql";
189+
190+
const TString UPDATE_LASTHEARTBEATS = R"sql(
191+
--!syntax_v1
192+
DECLARE $ConsumerGroup AS Utf8;
193+
DECLARE $Generation AS Uint64;
194+
DECLARE $MemberId AS Utf8;
195+
DECLARE $Database AS Utf8;
196+
DECLARE $LastHeartbeat AS Datetime;
197+
DECLARE $UpdateGroupHeartbeat AS Bool;
198+
199+
UPDATE `%s`
200+
SET last_heartbeat_time = $LastHeartbeat
201+
WHERE consumer_group = $ConsumerGroup
202+
AND database = $Database
203+
AND $UpdateGroupHeartbeat = True;
204+
205+
UPDATE `%s`
206+
SET last_heartbeat_time = $LastHeartbeat
207+
WHERE consumer_group = $ConsumerGroup
208+
AND generation = $Generation
209+
AND member_id = $MemberId
210+
AND database = $Database;
211+
)sql";
212+
213+
214+
const TString UPDATE_LASTHEARTBEAT_TO_LEAVE_GROUP = R"sql(
215+
--!syntax_v1
216+
DECLARE $ConsumerGroup AS Utf8;
217+
DECLARE $MemberId AS Utf8;
218+
DECLARE $Database AS Utf8;
219+
DECLARE $LastHeartbeat AS Datetime;
220+
221+
UPDATE `%s`
222+
SET last_heartbeat_time = $LastHeartbeat
223+
WHERE consumer_group = $ConsumerGroup
224+
AND member_id = $MemberId
225+
AND database = $Database;
226+
)sql";
227+
228+
} // namespace NKafka
229+
230+
// savnik check max members count

0 commit comments

Comments
 (0)