Skip to content

Commit e106f83

Browse files
authored
Merge b03fd8b into 1a7266c
2 parents 1a7266c + b03fd8b commit e106f83

File tree

5 files changed

+1221
-0
lines changed

5 files changed

+1221
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
#include "kafka_balancer_actor.h"
2+
3+
namespace NKafka {
4+
5+
// savnik add db
6+
7+
const TString SELECT_STATE_AND_GENERATION = R"(
8+
--!syntax_v1
9+
DECLARE $ConsumerGroup AS Utf8;
10+
11+
SELECT state, generation
12+
FROM kafka_connect_groups
13+
WHERE consumer_group = $ConsumerGroup;
14+
)";
15+
16+
const TString INSERT_NEW_GROUP = R"(
17+
--!syntax_v1
18+
DECLARE $ConsumerGroup AS Utf8;
19+
DECLARE $Generation AS Uint64;
20+
DECLARE $State AS Uint64;
21+
22+
INSERT INTO kafka_connect_groups
23+
(
24+
consumer_group,
25+
generation,
26+
state,
27+
current_generation_start_time
28+
)
29+
VALUES
30+
(
31+
$ConsumerGroup,
32+
$Generation,
33+
$State,
34+
CurrentUtcDateTime()
35+
);
36+
)";
37+
38+
const TString UPDATE_GROUP = R"(
39+
--!syntax_v1
40+
DECLARE $ConsumerGroup AS Utf8;
41+
DECLARE $NewState AS Uint64;
42+
DECLARE $OldGeneration AS Uint64;
43+
44+
UPDATE kafka_connect_groups
45+
SET
46+
state = $NewState,
47+
generation = $OldGeneration + 1
48+
WHERE consumer_group = $ConsumerGroup;
49+
)";
50+
51+
const TString SELECT_MASTER = R"(
52+
--!syntax_v1
53+
DECLARE $ConsumerGroup AS Utf8;
54+
DECLARE $Generation AS Uint64;
55+
56+
SELECT member_id
57+
FROM kafka_connect_members
58+
WHERE consumer_group = $ConsumerGroup
59+
AND generation = $Generation
60+
ORDER BY join_time
61+
LIMIT 1;
62+
)";
63+
64+
const TString INSERT_MEMBER_AND_SELECT_MASTER = R"(
65+
--!syntax_v1
66+
DECLARE $ConsumerGroup AS Utf8;
67+
DECLARE $Generation AS Uint64;
68+
DECLARE $MemberId AS Utf8;
69+
DECLARE $WorkerState AS String;
70+
71+
INSERT INTO kafka_connect_members (
72+
consumer_group,
73+
generation,
74+
member_id,
75+
join_time,
76+
hearbeat_deadline,
77+
worker_state
78+
)
79+
VALUES (
80+
$ConsumerGroup,
81+
$Generation,
82+
$MemberId,
83+
CurrentUtcDateTime(),
84+
CurrentUtcDateTime() + Interval("PT5S"),
85+
$WorkerState
86+
);
87+
88+
SELECT member_id AS master_id
89+
FROM kafka_connect_members
90+
WHERE consumer_group = $ConsumerGroup
91+
AND generation = $Generation
92+
ORDER BY join_time
93+
LIMIT 1;
94+
)";
95+
96+
97+
// savnik Леша говорил про пагинацию
98+
99+
const TString UPSERT_ASSIGNMENTS_AND_SET_WORKING_STATE = R"(
100+
--!syntax_v1
101+
DECLARE $Assignments AS List<Struct<MemberId: Utf8, Assignment: Bytes>>;
102+
DECLARE $ConsumerGroup AS Utf8;
103+
104+
UPSERT INTO kafka_connect_members
105+
SELECT
106+
item.MemberId AS member_id,
107+
item.Assignment AS assignment,
108+
$ConsumerGroup AS consumer_group
109+
FROM AS_TABLE($Assignments) AS item;
110+
111+
UPDATE kafka_connect_groups
112+
SET state = 2
113+
WHERE consumer_group = $ConsumerGroup;
114+
115+
)";
116+
117+
const TString UPDATE_GROUPS_AND_SELECT_WORKER_STATES = R"(
118+
--!syntax_v1
119+
DECLARE $ConsumerGroup AS Utf8;
120+
DECLARE $State AS Uint64;
121+
DECLARE $Generation AS Uint64;
122+
123+
UPDATE kafka_connect_groups
124+
SET state = $State
125+
WHERE consumer_group = $ConsumerGroup;
126+
127+
SELECT worker_state
128+
FROM kafka_connect_members
129+
WHERE consumer_group = $ConsumerGroup
130+
AND generation = $Generation;
131+
)";
132+
133+
const TString CHECK_GROUP_STATE = R"(
134+
--!syntax_v1
135+
DECLARE $ConsumerGroup AS Utf8;
136+
137+
SELECT state, generation
138+
FROM kafka_connect_groups
139+
WHERE consumer_group = $ConsumerGroup;
140+
)";
141+
142+
const TString FETCH_ASSIGNMENT = R"(
143+
--!syntax_v1
144+
DECLARE $ConsumerGroup AS Utf8;
145+
DECLARE $Generation AS Uint64;
146+
DECLARE $MemberId AS Utf8;
147+
148+
SELECT assignment
149+
FROM kafka_connect_members
150+
WHERE consumer_group = $ConsumerGroup
151+
AND generation = $Generation
152+
AND member_id = $MemberId;
153+
)";
154+
155+
} // namespace NKafka

0 commit comments

Comments
 (0)