Skip to content

Commit 45dd216

Browse files
authored
html app (#7118)
1 parent 18790fe commit 45dd216

File tree

7 files changed

+173
-59
lines changed

7 files changed

+173
-59
lines changed

ydb/core/persqueue/read_balancer.cpp

Lines changed: 12 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,14 @@ NKikimrPQ::EConsumerScalingSupport DefaultScalingSupport() {
2727
: NKikimrPQ::EConsumerScalingSupport::NOT_SUPPORT;
2828
}
2929

30+
TString EncodeAnchor(const TString& v) {
31+
auto r = Base64Encode(v);
32+
while (r.EndsWith('=')) {
33+
r.resize(r.size() - 1);
34+
}
35+
return r;
36+
}
37+
3038
TPersQueueReadBalancer::TPersQueueReadBalancer(const TActorId &tablet, TTabletStorageInfo *info)
3139
: TActor(&TThis::StateInit)
3240
, TTabletExecutedFlat(info, tablet, new NMiniKQL::TMiniKQLFactory)
@@ -211,7 +219,7 @@ TString TPersQueueReadBalancer::GenerateStat() {
211219
}
212220
for (auto& consumer : balancerStatistcs.Consumers) {
213221
LI() {
214-
str << "<a href=\"#client_" << Base64Encode(consumer.ConsumerName) << "\" data-toggle=\"tab\">" << NPersQueue::ConvertOldConsumerName(consumer.ConsumerName) << "</a>";
222+
str << "<a href=\"#c_" << EncodeAnchor(consumer.ConsumerName) << "\" data-toggle=\"tab\">" << NPersQueue::ConvertOldConsumerName(consumer.ConsumerName) << "</a>";
215223
}
216224
}
217225
}
@@ -221,74 +229,21 @@ TString TPersQueueReadBalancer::GenerateStat() {
221229
TABLEHEAD() {
222230
TABLER() {
223231
TABLEH() {str << "partition";}
224-
TABLEH() {str << "tabletId";}
232+
TABLEH() { str << "tabletId";}
225233
}
226234
}
227235
TABLEBODY() {
228236
for (auto& p : PartitionsInfo) {
229237
TABLER() {
230238
TABLED() { str << p.first;}
231-
TABLED() { str << p.second.TabletId;}
239+
TABLED() { HREF(TStringBuilder() << "?TabletID=" << p.second.TabletId) { str << p.second.TabletId; } }
232240
}
233241
}
234242
}
235243
}
236244
}
237-
for (auto& consumer : balancerStatistcs.Consumers) {
238-
DIV_CLASS_ID("tab-pane fade", "client_" + Base64Encode(consumer.ConsumerName)) {
239-
TABLE_SORTABLE_CLASS("table") {
240-
TABLEHEAD() {
241-
TABLER() {
242-
TABLEH() {str << "partition";}
243-
TABLEH() {str << "tabletId";}
244-
TABLEH() {str << "state";}
245-
TABLEH() {str << "session";}
246-
}
247-
}
248-
TABLEBODY() {
249-
for (auto& partition : consumer.Partitions) {
250-
TABLER() {
251-
TABLED() { str << partition.PartitionId;}
252-
TABLED() { str << partition.TabletId;}
253-
TABLED() { str << partition.State;}
254-
TABLED() { str << partition.Session;}
255-
}
256-
}
257-
}
258-
}
259245

260-
TABLE_SORTABLE_CLASS("table") {
261-
TABLEHEAD() {
262-
TABLER() {
263-
TABLEH() {str << "session";}
264-
TABLEH() {str << "suspended partitions";}
265-
TABLEH() {str << "active partitions";}
266-
TABLEH() {str << "inactive partitions";}
267-
TABLEH() {str << "total partitions";}
268-
}
269-
}
270-
TABLEBODY() {
271-
272-
for (auto& session : balancerStatistcs.Sessions) {
273-
TABLER() {
274-
TABLED() { str << session.Session;}
275-
TABLED() { str << session.SuspendedPartitionCount;}
276-
TABLED() { str << session.ActivePartitionCount;}
277-
TABLED() { str << session.InactivePartitionCount;}
278-
TABLED() { str << session.TotalPartitionCount;}
279-
}
280-
}
281-
282-
TABLER() {
283-
TABLED() { str << "FREE";}
284-
TABLED() { str << 0;}
285-
TABLED() { str << balancerStatistcs.FreePartitions;}
286-
TABLED() { str << balancerStatistcs.FreePartitions;}
287-
}
288-
}
289-
}
290-
}
291-
}
246+
Balancer->RenderApp(str);
292247
}
293248
}
294249
return str.Str();

ydb/core/persqueue/read_balancer.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,8 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
353353

354354
};
355355

356+
TString EncodeAnchor(const TString& value);
357+
356358
NKikimrPQ::EConsumerScalingSupport DefaultScalingSupport();
357359

358360
}

ydb/core/persqueue/read_balancer__balancing.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1094,12 +1094,14 @@ void TConsumer::FinishReading(TEvPersQueue::TEvReadingPartitionFinishedRequest::
10941094
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
10951095
GetPrefix() << "Reading of the partition " << partitionId << " was finished by " << ConsumerName
10961096
<< " but the partition hasn't family");
1097+
return;
10971098
}
10981099

10991100
if (!family->Session) {
11001101
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
11011102
GetPrefix() << "Reading of the partition " << partitionId << " was finished by " << ConsumerName
11021103
<< " but the partition hasn't reading session");
1104+
return;
11031105
}
11041106

11051107
auto& partition = Partitions[partitionId];

ydb/core/persqueue/read_balancer__balancing.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,8 @@ class TBalancer {
351351
void Handle(TEvPQ::TEvBalanceConsumer::TPtr& ev, const TActorContext& ctx);
352352
void Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx);
353353

354+
void RenderApp(TStringStream& str) const;
355+
354356
private:
355357
TString GetPrefix() const;
356358
ui32 NextStep();
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
#include "read_balancer__balancing.h"
2+
3+
#include <library/cpp/monlib/service/pages/templates.h>
4+
5+
#define DEBUG(message)
6+
7+
8+
namespace NKikimr::NPQ::NBalancing {
9+
10+
void TBalancer::RenderApp(TStringStream& str) const {
11+
auto& __stream = str;
12+
13+
for (auto& [consumerName, consumer] : Consumers) {
14+
auto consumerAnchor = "c_" + EncodeAnchor(consumerName);
15+
16+
auto familyAnchor = [&](const size_t familyId) {
17+
return TStringBuilder() << consumerAnchor << "_F" << familyId;
18+
};
19+
auto partitionAnchor = [&](const ui32 partitionId) {
20+
return TStringBuilder() << consumerAnchor << "_P" << partitionId;
21+
};
22+
23+
TAG(TH3) { str << "Families"; }
24+
DIV_CLASS_ID("tab-pane fade", consumerAnchor) {
25+
TABLE_CLASS("table") {
26+
TABLEHEAD() {
27+
TABLER() {
28+
TABLEH() { str << "Id"; }
29+
TABLEH() { str << "Status"; }
30+
TABLEH() { str << "Partitions"; }
31+
TABLEH() { str << "Session"; }
32+
}
33+
}
34+
35+
TABLEBODY() {
36+
for (auto& [familyId, family] : consumer->Families) {
37+
TABLER() {
38+
TABLED() { DIV_CLASS_ID("text-info", familyAnchor(familyId)) { str << familyId; } }
39+
TABLED() { str << family->Status; }
40+
TABLED() {
41+
for (auto partitionId : family->Partitions) {
42+
HREF("#" + partitionAnchor(partitionId)) { str << partitionId; }
43+
str << ", ";
44+
}
45+
}
46+
TABLED() { str << (family->Session ? family->Session->SessionName : ""); }
47+
}
48+
}
49+
}
50+
}
51+
52+
TAG(TH3) { str << "Partitions"; }
53+
TABLE_CLASS("table") {
54+
TABLEHEAD() {
55+
TABLER() {
56+
TABLEH() { str << "Id"; }
57+
TABLEH() { str << "Family"; }
58+
TABLEH() { str << "Status"; };
59+
TABLEH() { str << "Parents"; }
60+
TABLEH() { str << "Commited"; }
61+
TABLEH() { str << "Reading finished"; }
62+
TABLEH() { str << "Scale aware SDK"; }
63+
TABLEH() { str << "Read from end"; }
64+
TABLEH() { str << "Iteration"; }
65+
TABLEH() { str << "P Generation"; }
66+
TABLEH() { str << "P Cookie"; }
67+
}
68+
69+
TABLEBODY() {
70+
for (auto& [partitionId, partition] : consumer->Partitions) {
71+
const auto* family = consumer->FindFamily(partitionId);
72+
const auto* node = consumer->GetPartitionGraph().GetPartition(partitionId);
73+
TString style = node && node->Children.empty() ? "text-success" : "text-muted";
74+
TString status = "Free";
75+
if (family) {
76+
status = partition.IsInactive() ? "Finished" : "Read";
77+
} else if (consumer->IsReadable(partitionId)) {
78+
status = "Ready";
79+
}
80+
81+
TABLER() {
82+
TABLED() { DIV_CLASS_ID(style, partitionAnchor(partitionId)) { str << partitionId; } }
83+
TABLED() {
84+
if (family) {
85+
HREF("#" + familyAnchor(family->Id)) { str << family->Id; }
86+
}
87+
}
88+
TABLED() { str << status; }
89+
TABLED() {
90+
if (node) {
91+
for (auto* parent : node->Parents) {
92+
HREF("#" + partitionAnchor(parent->Id)) { str << parent->Id; }
93+
str << ", ";
94+
}
95+
} else {
96+
str << "error: not found";
97+
}
98+
}
99+
TABLED() { str << partition.Commited; }
100+
TABLED() { str << partition.ReadingFinished; }
101+
TABLED() { str << partition.ScaleAwareSDK; }
102+
TABLED() { str << partition.StartedReadingFromEndOffset; }
103+
TABLED() { str << partition.Iteration; }
104+
TABLED() { str << partition.PartitionGeneration; }
105+
TABLED() { str << partition.PartitionCookie; }
106+
}
107+
}
108+
}
109+
}
110+
}
111+
/*
112+
TABLE_SORTABLE_CLASS("table") {
113+
TABLEHEAD() {
114+
TABLER() {
115+
TABLEH() { str << "session"; }
116+
TABLEH() { str << "suspended partitions"; }
117+
TABLEH() { str << "active partitions"; }
118+
TABLEH() { str << "inactive partitions"; }
119+
TABLEH() { str << "total partitions"; }
120+
}
121+
}
122+
TABLEBODY() {
123+
124+
for (auto& session : balancerStatistcs.Sessions) {
125+
TABLER() {
126+
TABLED() { str << session.Session; }
127+
TABLED() { str << session.SuspendedPartitionCount; }
128+
TABLED() { str << session.ActivePartitionCount; }
129+
TABLED() { str << session.InactivePartitionCount; }
130+
TABLED() { str << session.TotalPartitionCount; }
131+
}
132+
}
133+
134+
TABLER() {
135+
TABLED() { str << "FREE"; }
136+
TABLED() { str << 0; }
137+
TABLED() { str << balancerStatistcs.FreePartitions; }
138+
TABLED() { str << balancerStatistcs.FreePartitions; }
139+
}
140+
}
141+
}
142+
*/
143+
}
144+
}
145+
}
146+
147+
}

ydb/core/persqueue/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ SRCS(
2929
pq_l2_cache.cpp
3030
pq_rl_helpers.cpp
3131
quota_tracker.cpp
32+
read_balancer__balancing_app.cpp
3233
read_balancer__balancing.cpp
3334
read_balancer.cpp
3435
account_read_quoter.cpp

ydb/services/persqueue_v1/actors/partition_actor.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -997,6 +997,10 @@ void TPartitionActor::WaitDataInPartition(const TActorContext& ctx) {
997997
return;
998998
}
999999

1000+
if (ReadingFinishedSent) {
1001+
return;
1002+
}
1003+
10001004
Y_ABORT_UNLESS(InitDone);
10011005
Y_ABORT_UNLESS(PipeClient);
10021006
Y_ABORT_UNLESS(ReadOffset >= EndOffset);
@@ -1069,7 +1073,6 @@ void TPartitionActor::Handle(TEvPersQueue::TEvHasDataInfoResponse::TPtr& ev, con
10691073
childPartitionIds.reserve(record.GetChildPartitionIds().size());
10701074
childPartitionIds.insert(childPartitionIds.end(), record.GetChildPartitionIds().begin(), record.GetChildPartitionIds().end());
10711075

1072-
// TODO Tx
10731076
ctx.Send(ParentId, new TEvPQProxy::TEvReadingFinished(Topic->GetInternalName(), Partition.Partition, FirstRead,
10741077
std::move(adjacentPartitionIds), std::move(childPartitionIds)));
10751078
} else if (FirstRead) {
@@ -1218,7 +1221,9 @@ void TPartitionActor::Handle(TEvPQProxy::TEvDeadlineExceeded::TPtr& ev, const TA
12181221

12191222
void TPartitionActor::HandleWakeup(const TActorContext& ctx) {
12201223
DoWakeup(ctx);
1221-
ctx.Schedule(PREWAIT_DATA, new TEvents::TEvWakeup());
1224+
if (!ReadingFinishedSent) {
1225+
ctx.Schedule(PREWAIT_DATA, new TEvents::TEvWakeup());
1226+
}
12221227
}
12231228

12241229
void TPartitionActor::DoWakeup(const TActorContext& ctx) {

0 commit comments

Comments
 (0)