Skip to content

Commit 4a0d6ae

Browse files
authored
Merge fd35bc3 into b5b10f6
2 parents b5b10f6 + fd35bc3 commit 4a0d6ae

24 files changed

+5219
-2009
lines changed

ydb/core/grpc_services/rpc_login.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,15 @@ class TLoginRPC : public TRpcRequestActor<TLoginRPC, TEvLoginRequest, true> {
105105
}
106106
}
107107

108+
void HandleDestroyed(TEvTabletPipe::TEvClientDestroyed::TPtr&) {
109+
ReplyErrorAndPassAway(Ydb::StatusIds::UNAVAILABLE, "SchemeShard is unavailable");
110+
}
111+
108112
STATEFN(StateWork) {
109113
switch (ev->GetTypeRewrite()) {
110114
hFunc(TEvents::TEvUndelivered, HandleUndelivered);
111115
hFunc(TEvTabletPipe::TEvClientConnected, HandleConnect);
116+
hFunc(TEvTabletPipe::TEvClientDestroyed, HandleDestroyed);
112117
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNavigate);
113118
hFunc(TEvSchemeShard::TEvLoginResult, HandleResult);
114119
hFunc(TEvLdapAuthProvider::TEvAuthenticateResponse, Handle);

ydb/core/kafka_proxy/actors/actors.h

+1
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context, const ui
176176
const TActorId& discoveryCacheActor);
177177
NActors::IActor* CreateKafkaProduceActor(const TContext::TPtr context);
178178
NActors::IActor* CreateKafkaReadSessionActor(const TContext::TPtr context, ui64 cookie);
179+
NActors::IActor* CreateKafkaBalancerActor(const TContext::TPtr context, ui64 cookie);
179180
NActors::IActor* CreateKafkaSaslHandshakeActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TSaslHandshakeRequestData>& message);
180181
NActors::IActor* CreateKafkaSaslAuthActor(const TContext::TPtr context, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TMessagePtr<TSaslAuthenticateRequestData>& message);
181182
NActors::IActor* CreateKafkaListOffsetsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TListOffsetsRequestData>& message);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,356 @@
1+
#include "kafka_balancer_actor.h"
2+
3+
namespace NKafka {
4+
5+
const TString INSERT_NEW_GROUP = R"sql(
6+
--!syntax_v1
7+
DECLARE $ConsumerGroup AS Utf8;
8+
DECLARE $Generation AS Uint64;
9+
DECLARE $State AS Uint64;
10+
DECLARE $Database AS Utf8;
11+
DECLARE $Master AS Utf8;
12+
DECLARE $LastMasterHeartbeat AS Datetime;
13+
DECLARE $ProtocolType AS Utf8;
14+
15+
INSERT INTO `%s`
16+
(
17+
consumer_group,
18+
generation,
19+
state,
20+
database,
21+
last_heartbeat_time,
22+
master,
23+
protocol_type
24+
)
25+
VALUES
26+
(
27+
$ConsumerGroup,
28+
$Generation,
29+
$State,
30+
$Database,
31+
$LastMasterHeartbeat,
32+
$Master,
33+
$ProtocolType
34+
);
35+
)sql";
36+
37+
const TString UPDATE_GROUP = R"sql(
38+
--!syntax_v1
39+
DECLARE $ConsumerGroup AS Utf8;
40+
DECLARE $State AS Uint64;
41+
DECLARE $Generation AS Uint64;
42+
DECLARE $Database AS Utf8;
43+
DECLARE $Master AS Utf8;
44+
DECLARE $LastMasterHeartbeat AS Datetime;
45+
46+
UPDATE `%s`
47+
SET
48+
state = $State,
49+
generation = $Generation,
50+
last_heartbeat_time = $LastMasterHeartbeat,
51+
master = $Master
52+
WHERE database = $Database
53+
AND consumer_group = $ConsumerGroup;
54+
)sql";
55+
56+
const TString UPDATE_GROUP_STATE = R"sql(
57+
--!syntax_v1
58+
DECLARE $ConsumerGroup AS Utf8;
59+
DECLARE $Database AS Utf8;
60+
DECLARE $State AS Uint64;
61+
DECLARE $Generation AS Uint64;
62+
63+
UPDATE `%s`
64+
SET
65+
state = $State
66+
WHERE database = $Database
67+
AND consumer_group = $ConsumerGroup
68+
AND generation = $Generation;
69+
)sql";
70+
71+
const TString UPDATE_GROUP_STATE_AND_PROTOCOL = R"sql(
72+
--!syntax_v1
73+
DECLARE $ConsumerGroup AS Utf8;
74+
DECLARE $State AS Uint64;
75+
DECLARE $Database AS Utf8;
76+
DECLARE $Protocol AS Utf8;
77+
DECLARE $LastMasterHeartbeat AS Datetime;
78+
79+
UPDATE `%s`
80+
SET
81+
state = $State,
82+
last_heartbeat_time = $LastMasterHeartbeat,
83+
protocol = $Protocol
84+
WHERE database = $Database
85+
AND consumer_group = $ConsumerGroup;
86+
)sql";
87+
88+
const TString INSERT_MEMBER = R"sql(
89+
--!syntax_v1
90+
DECLARE $ConsumerGroup AS Utf8;
91+
DECLARE $Generation AS Uint64;
92+
DECLARE $MemberId AS Utf8;
93+
DECLARE $InstanceId AS Utf8;
94+
DECLARE $WorkerStateProto AS String;
95+
DECLARE $Database AS Utf8;
96+
DECLARE $HeartbeatDeadline AS Datetime;
97+
DECLARE $SessionTimeoutMs AS Uint32;
98+
DECLARE $RebalanceTimeoutMs AS Uint32;
99+
100+
INSERT INTO `%s`
101+
(
102+
consumer_group,
103+
generation,
104+
member_id,
105+
instance_id,
106+
heartbeat_deadline,
107+
worker_state_proto,
108+
database,
109+
session_timeout_ms,
110+
rebalance_timeout_ms
111+
)
112+
VALUES (
113+
$ConsumerGroup,
114+
$Generation,
115+
$MemberId,
116+
$InstanceId,
117+
$HeartbeatDeadline,
118+
$WorkerStateProto,
119+
$Database,
120+
$SessionTimeoutMs,
121+
$RebalanceTimeoutMs
122+
);
123+
)sql";
124+
125+
const TString UPSERT_ASSIGNMENTS_AND_SET_WORKING_STATE = R"sql(
126+
--!syntax_v1
127+
DECLARE $Assignments AS List<Struct<MemberId: Utf8, Assignment: Bytes>>;
128+
DECLARE $ConsumerGroup AS Utf8;
129+
DECLARE $Database AS Utf8;
130+
DECLARE $Generation AS Uint64;
131+
DECLARE $State AS Uint64;
132+
DECLARE $LastMasterHeartbeat AS Datetime;
133+
134+
UPSERT INTO `%s`
135+
SELECT
136+
item.MemberId AS member_id,
137+
item.Assignment AS assignment,
138+
$ConsumerGroup AS consumer_group,
139+
$Database AS database,
140+
$Generation AS generation
141+
FROM AS_TABLE($Assignments) AS item;
142+
143+
UPDATE `%s`
144+
SET
145+
state = $State,
146+
last_heartbeat_time = $LastMasterHeartbeat,
147+
last_success_generation = $Generation
148+
WHERE database = $Database
149+
AND consumer_group = $ConsumerGroup;
150+
)sql";
151+
152+
const TString SELECT_ALIVE_MEMBERS = R"sql(
153+
--!syntax_v1
154+
DECLARE $ConsumerGroup AS Utf8;
155+
DECLARE $Generation AS Uint64;
156+
DECLARE $Database AS Utf8;
157+
DECLARE $PaginationMemberId AS Utf8;
158+
DECLARE $Limit AS Uint64;
159+
160+
SELECT member_id, instance_id, rebalance_timeout_ms
161+
FROM `%s`
162+
VIEW PRIMARY KEY
163+
WHERE database = $Database
164+
AND consumer_group = $ConsumerGroup
165+
AND generation = $Generation
166+
AND member_id > $PaginationMemberId
167+
AND (leaved IS NULL OR leaved = False)
168+
ORDER BY member_id
169+
LIMIT $Limit;
170+
)sql";
171+
172+
const TString SELECT_WORKER_STATES = R"sql(
173+
--!syntax_v1
174+
DECLARE $ConsumerGroup AS Utf8;
175+
DECLARE $Generation AS Uint64;
176+
DECLARE $Database AS Utf8;
177+
DECLARE $PaginationMemberId AS Utf8;
178+
DECLARE $Limit AS Uint64;
179+
180+
SELECT worker_state_proto, member_id, instance_id
181+
FROM `%s`
182+
VIEW PRIMARY KEY
183+
WHERE database = $Database
184+
AND consumer_group = $ConsumerGroup
185+
AND generation = $Generation
186+
AND member_id > $PaginationMemberId
187+
ORDER BY member_id
188+
LIMIT $Limit;
189+
)sql";
190+
191+
const TString CHECK_GROUP_STATE = R"sql(
192+
--!syntax_v1
193+
DECLARE $ConsumerGroup AS Utf8;
194+
DECLARE $Database AS Utf8;
195+
196+
SELECT state, generation, master, last_heartbeat_time, consumer_group, database, protocol, protocol_type, last_success_generation
197+
FROM `%s`
198+
VIEW PRIMARY KEY
199+
WHERE database = $Database
200+
AND consumer_group = $ConsumerGroup;
201+
)sql";
202+
203+
const TString FETCH_ASSIGNMENTS = R"sql(
204+
--!syntax_v1
205+
DECLARE $ConsumerGroup AS Utf8;
206+
DECLARE $Generation AS Uint64;
207+
DECLARE $MemberId AS Utf8;
208+
DECLARE $Database AS Utf8;
209+
210+
SELECT assignment
211+
FROM `%s`
212+
VIEW PRIMARY KEY
213+
WHERE database = $Database
214+
AND consumer_group = $ConsumerGroup
215+
AND generation = $Generation
216+
AND member_id = $MemberId;
217+
)sql";
218+
219+
const TString CHECK_DEAD_MEMBERS = R"sql(
220+
--!syntax_v1
221+
DECLARE $ConsumerGroup AS Utf8;
222+
DECLARE $Generation AS Uint64;
223+
DECLARE $Database AS Utf8;
224+
DECLARE $MemberId AS Utf8;
225+
DECLARE $Now AS Datetime;
226+
227+
SELECT COUNT(1) deads_cnt
228+
FROM `%s`
229+
VIEW idx_group_generation_db_hb
230+
WHERE database = $Database
231+
AND consumer_group = $ConsumerGroup
232+
AND generation = $Generation
233+
AND heartbeat_deadline < $Now;
234+
235+
SELECT session_timeout_ms
236+
FROM `%s`
237+
VIEW PRIMARY KEY
238+
WHERE database = $Database
239+
AND consumer_group = $ConsumerGroup
240+
AND generation = $Generation
241+
AND member_id = $MemberId;
242+
243+
)sql";
244+
245+
const TString UPDATE_LAST_MEMBER_AND_GROUP_HEARTBEATS = R"sql(
246+
--!syntax_v1
247+
DECLARE $ConsumerGroup AS Utf8;
248+
DECLARE $Generation AS Uint64;
249+
DECLARE $MemberId AS Utf8;
250+
DECLARE $Database AS Utf8;
251+
DECLARE $LastMasterHeartbeat AS Datetime;
252+
DECLARE $HeartbeatDeadline AS Datetime;
253+
DECLARE $UpdateGroupHeartbeat AS Bool;
254+
255+
UPDATE `%s`
256+
SET last_heartbeat_time = $LastMasterHeartbeat
257+
WHERE consumer_group = $ConsumerGroup
258+
AND database = $Database
259+
AND generation = $Generation
260+
AND $UpdateGroupHeartbeat = True;
261+
262+
UPDATE `%s`
263+
SET heartbeat_deadline = $HeartbeatDeadline
264+
WHERE consumer_group = $ConsumerGroup
265+
AND generation = $Generation
266+
AND member_id = $MemberId
267+
AND database = $Database;
268+
)sql";
269+
270+
const TString UPDATE_LAST_MEMBER_HEARTBEAT = R"sql(
271+
--!syntax_v1
272+
DECLARE $ConsumerGroup AS Utf8;
273+
DECLARE $Generation AS Uint64;
274+
DECLARE $MemberId AS Utf8;
275+
DECLARE $Database AS Utf8;
276+
DECLARE $HeartbeatDeadline AS Datetime;
277+
278+
UPDATE `%s`
279+
SET heartbeat_deadline = $HeartbeatDeadline
280+
WHERE consumer_group = $ConsumerGroup
281+
AND generation = $Generation
282+
AND member_id = $MemberId
283+
AND database = $Database;
284+
)sql";
285+
286+
const TString CHECK_MASTER_ALIVE = R"sql(
287+
--!syntax_v1
288+
DECLARE $ConsumerGroup AS Utf8;
289+
DECLARE $MasterId AS Utf8;
290+
DECLARE $Database AS Utf8;
291+
DECLARE $Generation AS Uint64;
292+
DECLARE $Now AS Datetime;
293+
294+
SELECT COUNT(1) allive,
295+
FROM `%s`
296+
VIEW PRIMARY KEY
297+
WHERE database = $Database
298+
AND consumer_group = $ConsumerGroup
299+
AND generation = $Generation
300+
AND member_id = $MasterId
301+
AND heartbeat_deadline > $Now;
302+
)sql";
303+
304+
const TString GET_GENERATION_BY_MEMBER = R"sql(
305+
--!syntax_v1
306+
DECLARE $ConsumerGroup AS Utf8;
307+
DECLARE $MemberId AS Utf8;
308+
DECLARE $Database AS Utf8;
309+
310+
SELECT generation
311+
FROM `%s`
312+
VIEW PRIMARY KEY
313+
WHERE database = $Database
314+
AND consumer_group = $ConsumerGroup
315+
AND member_id = $MemberId
316+
ORDER BY generation DESC
317+
LIMIT 1;
318+
)sql";
319+
320+
const TString UPDATE_LAST_HEARTBEAT_AND_STATE_TO_LEAVE_GROUP = R"sql(
321+
--!syntax_v1
322+
DECLARE $ConsumerGroup AS Utf8;
323+
DECLARE $MemberId AS Utf8;
324+
DECLARE $Database AS Utf8;
325+
DECLARE $Generation AS Uint64;
326+
DECLARE $LastMasterHeartbeat AS Datetime;
327+
DECLARE $State AS Uint64;
328+
DECLARE $UpdateState AS Bool;
329+
330+
UPDATE `%s`
331+
SET heartbeat_deadline = $LastMasterHeartbeat,
332+
leaved = True
333+
WHERE database = $Database
334+
AND consumer_group = $ConsumerGroup
335+
AND generation = $Generation
336+
AND member_id = $MemberId;
337+
338+
UPDATE `%s`
339+
SET
340+
state = $State
341+
WHERE database = $Database
342+
AND consumer_group = $ConsumerGroup
343+
AND $UpdateState = True;
344+
)sql";
345+
346+
const TString CHECK_GROUPS_COUNT = R"sql(
347+
--!syntax_v1
348+
DECLARE $GroupsCountCheckDeadline AS Datetime;
349+
350+
SELECT COUNT(1) as groups_count
351+
FROM `%s`
352+
VIEW idx_last_hb
353+
WHERE last_heartbeat_time > $GroupsCountCheckDeadline;
354+
)sql";
355+
356+
} // namespace NKafka

0 commit comments

Comments
 (0)