Skip to content

Commit ccba17a

Browse files
committed
move db and timestamps to variables
1 parent 48b56a4 commit ccba17a

File tree

3 files changed

+81
-74
lines changed

3 files changed

+81
-74
lines changed

ydb/core/kafka_proxy/actors/kafka_balance_actor_sql.cpp

+54-48
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,16 @@
22

33
namespace NKafka {
44

5-
const TString INSERT_NEW_GROUP = R"(
5+
const TString INSERT_NEW_GROUP = R"sql(
66
--!syntax_v1
77
DECLARE $ConsumerGroup AS Utf8;
88
DECLARE $Generation AS Uint64;
99
DECLARE $State AS Uint64;
1010
DECLARE $Database AS Utf8;
1111
DECLARE $Master AS Utf8;
12+
DECLARE $LastHeartbeat AS Datetime;
1213
13-
INSERT INTO `/Root/.metadata/kafka_consumer_groups`
14+
INSERT INTO `%s`
1415
(
1516
consumer_group,
1617
generation,
@@ -25,54 +26,58 @@ const TString INSERT_NEW_GROUP = R"(
2526
$Generation,
2627
$State,
2728
$Database,
28-
CurrentUtcDateTime(),
29+
$LastHeartbeat,
2930
$Master
3031
);
31-
)";
32+
)sql";
3233

33-
const TString UPDATE_GROUP = R"(
34+
const TString UPDATE_GROUP = R"sql(
3435
--!syntax_v1
3536
DECLARE $ConsumerGroup AS Utf8;
3637
DECLARE $State AS Uint64;
3738
DECLARE $Generation AS Uint64;
3839
DECLARE $Database AS Utf8;
3940
DECLARE $Master AS Utf8;
41+
DECLARE $LastHeartbeat AS Datetime;
4042
41-
UPDATE `/Root/.metadata/kafka_consumer_groups`
43+
UPDATE `%s`
4244
SET
4345
state = $State,
4446
generation = $Generation,
45-
last_heartbeat_time = CurrentUtcDateTime(),
47+
last_heartbeat_time = $LastHeartbeat,
4648
master = $Master
4749
WHERE consumer_group = $ConsumerGroup
4850
AND database = $Database;
49-
)";
51+
)sql";
5052

51-
const TString UPDATE_GROUP_STATE_AND_PROTOCOL = R"(
53+
const TString UPDATE_GROUP_STATE_AND_PROTOCOL = R"sql(
5254
--!syntax_v1
5355
DECLARE $ConsumerGroup AS Utf8;
5456
DECLARE $State AS Uint64;
5557
DECLARE $Database AS Utf8;
5658
DECLARE $Protocol AS Utf8;
59+
DECLARE $LastHeartbeat AS Datetime;
5760
58-
UPDATE `/Root/.metadata/kafka_consumer_groups`
61+
UPDATE `%s`
5962
SET
6063
state = $State,
61-
last_heartbeat_time = CurrentUtcDateTime(),
64+
last_heartbeat_time = $LastHeartbeat,
6265
protocol = $Protocol
6366
WHERE consumer_group = $ConsumerGroup
6467
AND database = $Database;
65-
)";
68+
)sql";
6669

67-
const TString INSERT_MEMBER = R"(
70+
const TString INSERT_MEMBER = R"sql(
6871
--!syntax_v1
6972
DECLARE $ConsumerGroup AS Utf8;
7073
DECLARE $Generation AS Uint64;
7174
DECLARE $MemberId AS Utf8;
7275
DECLARE $WorkerStateProto AS String;
7376
DECLARE $Database AS Utf8;
77+
DECLARE $LastHeartbeat AS Datetime;
7478
75-
INSERT INTO `/Root/.metadata/kafka_consumer_members` (
79+
INSERT INTO `%s`
80+
(
7681
consumer_group,
7782
generation,
7883
member_id,
@@ -84,21 +89,22 @@ const TString INSERT_MEMBER = R"(
8489
$ConsumerGroup,
8590
$Generation,
8691
$MemberId,
87-
CurrentUtcDateTime(),
92+
$LastHeartbeat,
8893
$WorkerStateProto,
8994
$Database
9095
);
91-
)";
96+
)sql";
9297

93-
const TString UPSERT_ASSIGNMENTS_AND_SET_WORKING_STATE = R"(
98+
const TString UPSERT_ASSIGNMENTS_AND_SET_WORKING_STATE = R"sql(
9499
--!syntax_v1
95100
DECLARE $Assignments AS List<Struct<MemberId: Utf8, Assignment: Bytes>>;
96101
DECLARE $ConsumerGroup AS Utf8;
97102
DECLARE $Database AS Utf8;
98103
DECLARE $Generation AS Uint64;
99104
DECLARE $State AS Uint64;
105+
DECLARE $LastHeartbeat AS Datetime;
100106
101-
UPSERT INTO `/Root/.metadata/kafka_consumer_members`
107+
UPSERT INTO `%s`
102108
SELECT
103109
item.MemberId AS member_id,
104110
item.Assignment AS assignment,
@@ -107,115 +113,115 @@ const TString UPSERT_ASSIGNMENTS_AND_SET_WORKING_STATE = R"(
107113
$Generation AS generation
108114
FROM AS_TABLE($Assignments) AS item;
109115
110-
UPDATE `/Root/.metadata/kafka_consumer_groups`
116+
UPDATE `%s`
111117
SET
112118
state = $State,
113-
last_heartbeat_time = CurrentUtcDateTime()
119+
last_heartbeat_time = $LastHeartbeat
114120
WHERE consumer_group = $ConsumerGroup
115121
AND database = $Database;
116-
)";
122+
)sql";
117123

118-
const TString UPDATE_GROUPS_AND_SELECT_WORKER_STATES = R"(
124+
const TString UPDATE_GROUPS_AND_SELECT_WORKER_STATES = R"sql(
119125
--!syntax_v1
120126
DECLARE $ConsumerGroup AS Utf8;
121127
DECLARE $State AS Uint64;
122128
DECLARE $Generation AS Uint64;
123129
DECLARE $Database AS Utf8;
130+
DECLARE $LastHeartbeat AS Datetime;
124131
125-
UPDATE `/Root/.metadata/kafka_consumer_groups`
132+
UPDATE `%s`
126133
SET
127134
state = $State,
128-
last_heartbeat_time = CurrentUtcDateTime()
135+
last_heartbeat_time = $LastHeartbeat
129136
WHERE consumer_group = $ConsumerGroup
130137
AND database = $Database;
131138
132139
SELECT worker_state_proto, member_id
133-
FROM `/Root/.metadata/kafka_consumer_members`
140+
FROM `%s`
134141
WHERE consumer_group = $ConsumerGroup
135142
AND generation = $Generation
136143
AND database = $Database;
137-
)";
144+
)sql";
138145

139-
const TString CHECK_GROUP_STATE = R"(
146+
const TString CHECK_GROUP_STATE = R"sql(
140147
--!syntax_v1
141148
DECLARE $ConsumerGroup AS Utf8;
142149
DECLARE $Database AS Utf8;
143150
144151
SELECT state, generation, master, last_heartbeat_time, consumer_group, database
145-
FROM `/Root/.metadata/kafka_consumer_groups`
152+
FROM `%s`
146153
WHERE consumer_group = $ConsumerGroup
147154
AND database = $Database;
148-
)";
155+
)sql";
149156

150-
const TString FETCH_ASSIGNMENTS = R"(
157+
const TString FETCH_ASSIGNMENTS = R"sql(
151158
--!syntax_v1
152159
DECLARE $ConsumerGroup AS Utf8;
153160
DECLARE $Generation AS Uint64;
154161
DECLARE $MemberId AS Utf8;
155162
DECLARE $Database AS Utf8;
156163
157164
SELECT assignment
158-
FROM `/Root/.metadata/kafka_consumer_members`
165+
FROM `%s`
159166
WHERE consumer_group = $ConsumerGroup
160167
AND generation = $Generation
161168
AND member_id = $MemberId
162169
AND database = $Database;
163-
)";
170+
)sql";
164171

165-
const TString CHECK_DEAD_MEMBERS = R"(
172+
const TString CHECK_DEAD_MEMBERS = R"sql(
166173
--!syntax_v1
167174
DECLARE $ConsumerGroup AS Utf8;
168175
DECLARE $Generation AS Uint64;
169176
DECLARE $Database AS Utf8;
170177
DECLARE $Deadline AS Datetime;
171178
172179
SELECT COUNT(1) as cnt
173-
FROM `/Root/.metadata/kafka_consumer_members`
180+
FROM `%s`
174181
WHERE consumer_group = $ConsumerGroup
175182
AND generation = $Generation
176183
AND database = $Database
177184
AND last_heartbeat_time < $Deadline;
178-
)";
185+
)sql";
179186

180-
const TString UPDATE_TTLS = R"(
187+
const TString UPDATE_TTLS = R"sql(
181188
--!syntax_v1
182189
DECLARE $ConsumerGroup AS Utf8;
183190
DECLARE $Generation AS Uint64;
184191
DECLARE $MemberId AS Utf8;
185192
DECLARE $Database AS Utf8;
186-
DECLARE $HeartbeatDeadline AS Datetime;
193+
DECLARE $LastHeartbeat AS Datetime;
187194
DECLARE $UpdateGroupHeartbeat AS Bool;
188195
189-
UPDATE `/Root/.metadata/kafka_consumer_groups`
190-
SET last_heartbeat_time = CurrentUtcDateTime()
196+
UPDATE `%s`
197+
SET last_heartbeat_time = $LastHeartbeat
191198
WHERE consumer_group = $ConsumerGroup
192199
AND database = $Database
193200
AND $UpdateGroupHeartbeat = True;
194201
195-
UPDATE `/Root/.metadata/kafka_consumer_members`
196-
SET last_heartbeat_time = $HeartbeatDeadline
202+
UPDATE `%s`
203+
SET last_heartbeat_time = $LastHeartbeat
197204
WHERE consumer_group = $ConsumerGroup
198205
AND generation = $Generation
199206
AND member_id = $MemberId
200207
AND database = $Database;
201-
)";
208+
)sql";
202209

203210

204-
const TString UPDATE_TTL_LEAVE_GROUP = R"(
211+
const TString UPDATE_TTL_LEAVE_GROUP = R"sql(
205212
--!syntax_v1
206213
DECLARE $ConsumerGroup AS Utf8;
207214
DECLARE $MemberId AS Utf8;
208215
DECLARE $Database AS Utf8;
216+
DECLARE $LastHeartbeat AS Datetime;
209217
210-
UPDATE `/Root/.metadata/kafka_consumer_members`
211-
SET last_heartbeat_time = CurrentUtcDateTime() - Interval("PT1H")
218+
UPDATE `%s`
219+
SET last_heartbeat_time = $LastHeartbeat
212220
WHERE consumer_group = $ConsumerGroup
213221
AND member_id = $MemberId
214222
AND database = $Database;
215-
)";
216-
223+
)sql";
217224

218225
} // namespace NKafka
219226

220-
221227
// savnik check max members count

0 commit comments

Comments
 (0)