Skip to content

Commit 9389405

Browse files
author
tesseract
committed
Topic storage size metrics #2
1 parent e067ade commit 9389405

16 files changed

+109
-20
lines changed

ydb/core/persqueue/partition.cpp

+37-9
Original file line number | Diff line number | Diff line change
@@ -181,9 +181,23 @@ ui64 TPartition::MeteringDataSize(const TActorContext& ctx) const {
181181
return size;
182182
}
183183

184+
// Returns the reserve size of this partition, obtained by passing the
// partition's Config to TopicPartitionReserveSize.
ui64 TPartition::ReserveSize() const {
    const ui64 reserved = TopicPartitionReserveSize(Config);
    return reserved;
}
187+
188+
// Returns the storage size attributed to this partition: the larger of the
// metered data size and the reserve size, so the result is never below the
// reserve.
ui64 TPartition::StorageSize(const TActorContext& ctx) const {
    const ui64 metered = MeteringDataSize(ctx);
    const ui64 reserved = ReserveSize();
    return metered < reserved ? reserved : metered;
}
191+
192+
// Returns how much of the reserve is occupied: the smaller of the metered
// data size and the reserve size, so the result never exceeds the reserve.
ui64 TPartition::UsedReserveSize(const TActorContext& ctx) const {
    const ui64 metered = MeteringDataSize(ctx);
    const ui64 reserved = ReserveSize();
    return reserved < metered ? reserved : metered;
}
195+
196+
184197
// Returns the storage consumed since the previous call, expressed in
// megabyte-seconds: current metered data size multiplied by the time elapsed
// since LastUsedStorageMeterTimestamp. As a side effect the timestamp is
// advanced to the current time, so each call accounts for a fresh interval.
//
// This span is a diff view: the lines following the `185-`/`186-` markers are
// the pre-commit statements, and the lines following `198+`..`200+` are their
// replacements, which read ctx.Now() once and reuse the value.
ui64 TPartition::GetUsedStorage(const TActorContext& ctx) {
185-
auto duration = ctx.Now() - LastUsedStorageMeterTimestamp;
186-
LastUsedStorageMeterTimestamp = ctx.Now();
198+
// Single clock read: the interval end and the new timestamp are the same instant.
const auto now = ctx.Now();
199+
const auto duration = now - LastUsedStorageMeterTimestamp;
200+
LastUsedStorageMeterTimestamp = now;
187201
ui64 size = MeteringDataSize(ctx);
188202
// bytes * ms -> /1000 gives bytes*seconds -> /1_MB gives MB*seconds (integer division, truncating).
// NOTE(review): the product size * milliseconds is formed before dividing;
// presumably both are 64-bit unsigned - confirm it cannot overflow for very
// large partitions combined with long intervals between calls.
return size * duration.MilliSeconds() / 1000 / 1_MB; // mb*seconds
189203
}
@@ -203,7 +217,7 @@ void TPartition::HandleWakeup(const TActorContext& ctx) {
203217

204218
ProcessHasDataRequests(ctx);
205219

206-
auto now = ctx.Now();
220+
const auto now = ctx.Now();
207221
for (auto& userInfo : UsersInfoStorage->GetAll()) {
208222
userInfo.second.UpdateReadingTimeAndState(now);
209223
for (auto& avg : userInfo.second.AvgReadBytes) {
@@ -237,8 +251,8 @@ void TPartition::HandleWakeup(const TActorContext& ctx) {
237251
}
238252

239253
if (haveChanges) {
240-
WriteCycleStartTime = ctx.Now();
241-
WriteStartTime = ctx.Now();
254+
WriteCycleStartTime = now;
255+
WriteStartTime = now;
242256
TopicQuotaWaitTimeForCurrentBlob = TDuration::Zero();
243257
WritesTotal.Inc();
244258
Become(&TThis::StateWrite);
@@ -1178,11 +1192,25 @@ bool TPartition::UpdateCounters(const TActorContext& ctx) {
11781192
}
11791193
}
11801194

1181-
ui64 partSize = Size();
1182-
if (partSize != PartitionCountersLabeled->GetCounters()[METRIC_TOTAL_PART_SIZE].Get()) {
1195+
ui64 storageSize = StorageSize(ctx);
1196+
if (storageSize != PartitionCountersLabeled->GetCounters()[METRIC_TOTAL_PART_SIZE].Get()) {
11831197
haveChanges = true;
1184-
PartitionCountersLabeled->GetCounters()[METRIC_MAX_PART_SIZE].Set(partSize);
1185-
PartitionCountersLabeled->GetCounters()[METRIC_TOTAL_PART_SIZE].Set(partSize);
1198+
PartitionCountersLabeled->GetCounters()[METRIC_MAX_PART_SIZE].Set(storageSize);
1199+
PartitionCountersLabeled->GetCounters()[METRIC_TOTAL_PART_SIZE].Set(storageSize);
1200+
}
1201+
1202+
if (NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY == Config.GetMeteringMode()) {
1203+
ui64 reserveSize = ReserveSize();
1204+
if (reserveSize != PartitionCountersLabeled->GetCounters()[METRIC_RESERVE_LIMIT_BYTES].Get()) {
1205+
haveChanges = true;
1206+
PartitionCountersLabeled->GetCounters()[METRIC_RESERVE_LIMIT_BYTES].Set(reserveSize);
1207+
}
1208+
1209+
ui64 reserveUsed = UsedReserveSize(ctx);
1210+
if (reserveUsed != PartitionCountersLabeled->GetCounters()[METRIC_RESERVE_USED_BYTES].Get()) {
1211+
haveChanges = true;
1212+
PartitionCountersLabeled->GetCounters()[METRIC_RESERVE_USED_BYTES].Set(reserveUsed);
1213+
}
11861214
}
11871215

11881216
ui64 ts = (WriteTimestamp.MilliSeconds() < MIN_TIMESTAMP_MS) ? Max<i64>() : WriteTimestamp.MilliSeconds();

ydb/core/persqueue/partition.h

+3-8
Original file line number | Diff line number | Diff line change
@@ -338,14 +338,9 @@ class TPartition : public TActorBootstrapped<TPartition> {
338338
}
339339

340340
ui64 MeteringDataSize(const TActorContext& ctx) const;
341-
342-
ui64 UsedReserveSize(const TActorContext& ctx) const {
343-
return std::min<ui64>(MeteringDataSize(ctx), ReserveSize());
344-
}
345-
346-
ui64 ReserveSize() const {
347-
return TopicPartitionReserveSize(Config);
348-
}
341+
ui64 ReserveSize() const;
342+
ui64 StorageSize(const TActorContext& ctx) const;
343+
ui64 UsedReserveSize(const TActorContext& ctx) const;
349344

350345

351346
//Bootstrap sends kvRead

ydb/core/persqueue/ut/common/pq_ut_common.cpp

+1
Original file line number | Diff line number | Diff line change
@@ -67,6 +67,7 @@ void PQTabletPrepare(const TTabletPreparationParameters& parameters,
6767
tabletConfig->SetLocalDC(parameters.localDC);
6868
tabletConfig->AddReadRules("user");
6969
tabletConfig->AddReadFromTimestampsMs(parameters.readFromTimestampsMs);
70+
tabletConfig->SetMeteringMode(parameters.meteringMode);
7071
auto config = tabletConfig->MutablePartitionConfig();
7172
if (parameters.speed > 0) {
7273
config->SetWriteSpeedInBytesPerSecond(parameters.speed);

ydb/core/persqueue/ut/common/pq_ut_common.h

+1
Original file line number | Diff line number | Diff line change
@@ -260,6 +260,7 @@ struct TTabletPreparationParameters {
260260
TString databaseId{"PQ"};
261261
TString databasePath{"/Root/PQ"};
262262
TString account{"federationAccount"};
263+
::NKikimrPQ::TPQTabletConfig_EMeteringMode meteringMode = NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY;
263264
};
264265
void PQTabletPrepare(
265266
const TTabletPreparationParameters& parameters,

ydb/core/persqueue/ut/counters_ut.cpp

+4-1
Original file line number | Diff line number | Diff line change
@@ -149,6 +149,9 @@ void CompareJsons(const TString& inputStr, const TString& referenceStr) {
149149
NJson::TJsonValue inputJson;
150150
UNIT_ASSERT(NJson::ReadJsonTree(TStringBuf(inputStr), &inputJson));
151151

152+
Cerr << "Expected: " << referenceStr << Endl;
153+
Cerr << "Result: " << inputStr << Endl;
154+
152155
// Run time of test differs as well as counters below.
153156
// We set it to 5000 and then compare with reference string.
154157
auto getByPath = [](const NJson::TJsonValue& msg, TStringBuf path) {
@@ -261,7 +264,7 @@ Y_UNIT_TEST(PartitionFirstClass) {
261264
return TTestActorRuntime::DefaultObserverFunc(runtime, event);
262265
});
263266

264-
PQTabletPrepare({}, {{"client", true}}, tc);
267+
PQTabletPrepare({.deleteTime=3600, .meteringMode = NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS}, {{"client", true}}, tc);
265268
TFakeSchemeShardState::TPtr state{new TFakeSchemeShardState()};
266269
ui64 ssId = 325;
267270
BootFakeSchemeShard(*tc.Runtime, ssId, state);

ydb/core/persqueue/ut/resources/counters_labeled.json

+36
Original file line number | Diff line number | Diff line change
@@ -1812,6 +1812,24 @@
18121812
},
18131813
"value": 540
18141814
},
1815+
{
1816+
"kind": "GAUGE",
1817+
"labels": {
1818+
"user_counters": "PersQueue",
1819+
"topic": "rt3.dc1--asdfgs--topic",
1820+
"sensor": "PQ/ReserveLimitBytes"
1821+
},
1822+
"value": 0
1823+
},
1824+
{
1825+
"kind": "GAUGE",
1826+
"labels": {
1827+
"user_counters": "PersQueue",
1828+
"topic": "rt3.dc1--asdfgs--topic",
1829+
"sensor": "PQ/ReserveUsedBytes"
1830+
},
1831+
"value": 0
1832+
},
18151833
{
18161834
"kind": "GAUGE",
18171835
"labels": {
@@ -2118,6 +2136,24 @@
21182136
},
21192137
"value": 540
21202138
},
2139+
{
2140+
"kind": "GAUGE",
2141+
"labels": {
2142+
"user_counters": "PersQueue",
2143+
"topic": "total",
2144+
"sensor": "PQ/ReserveLimitBytes"
2145+
},
2146+
"value": 0
2147+
},
2148+
{
2149+
"kind": "GAUGE",
2150+
"labels": {
2151+
"user_counters": "PersQueue",
2152+
"topic": "total",
2153+
"sensor": "PQ/ReserveUsedBytes"
2154+
},
2155+
"value": 0
2156+
},
21212157
{
21222158
"kind": "GAUGE",
21232159
"labels": {

ydb/core/persqueue/ut/resources/counters_topics.html

+2
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,8 @@
2424
name=topic.partition.write.speed_limit_bytes_per_second: 50000000
2525
name=topic.partition.write.throttled_nanoseconds_max: 0
2626
name=topic.producers_count: 3
27+
name=topic.reserve.limit_bytes: 0
28+
name=topic.reserve.used_bytes: 0
2729
name=topic.storage_bytes: 747
2830

2931
consumer=client:

ydb/core/protos/counters_pq.proto

+3
Original file line number | Diff line number | Diff line change
@@ -228,4 +228,7 @@ enum EPartitionLabeledCounters {
228228
METRIC_MIN_SID_LIFETIME = 33 [(LabeledCounterOpts) = {Name: "SourceIdMinLifetimeMs" AggrFunc : EAF_MIN SVName: ""}];
229229

230230
METRIC_PARTITIONS_TOTAL = 34 [(LabeledCounterOpts) = {Name: "PartitionsTotal" AggrFunc : EAF_MAX SVName: "topic.partition.total_count"}];
231+
232+
METRIC_RESERVE_LIMIT_BYTES = 35 [(LabeledCounterOpts) = {Name: "ReserveLimitBytes" AggrFunc : EAF_SUM SVName: "topic.reserve.limit_bytes"}];
233+
METRIC_RESERVE_USED_BYTES = 36 [(LabeledCounterOpts) = {Name: "ReserveUsedBytes" AggrFunc : EAF_SUM SVName: "topic.reserve.used_bytes"}];
231234
}

ydb/core/protos/counters_schemeshard.proto

+1
Original file line number | Diff line number | Diff line change
@@ -185,6 +185,7 @@ enum ESimpleCounters {
185185
COUNTER_IN_FLIGHT_OPS_TxAlterExternalDataSource = 150 [(CounterOpts) = {Name: "InFlightOps/AlterExternalDataSource"}];
186186

187187
COUNTER_PQ_STATS_QUEUE_SIZE = 151 [(CounterOpts) = {Name: "PQStatsQueueSize"}];
188+
COUNTER_DISK_SPACE_TOPICS_TOTAL_BYTES = 152 [(CounterOpts) = {Name: "DiskSpaceTopicsTotalBytes"}];
188189
}
189190

190191
enum ECumulativeCounters {

ydb/core/tablet/tablet_counters_aggregator.cpp

+12-2
Original file line number | Diff line number | Diff line change
@@ -760,6 +760,8 @@ class TTabletMon {
760760
TCounterPtr DatashardSizeBytes;
761761
TCounterPtr ResourcesStorageUsedBytes;
762762
TCounterPtr ResourcesStorageLimitBytes;
763+
TCounterPtr ResourcesStorageTableUsedBytes;
764+
TCounterPtr ResourcesStorageTopicUsedBytes;
763765
TCounterPtr ResourcesStreamUsedShards;
764766
TCounterPtr ResourcesStreamLimitShards;
765767
//TCounterPtr ResourcesStreamUsedShardsPercents;
@@ -786,6 +788,7 @@ class TTabletMon {
786788
THistogramPtr ConsumedCpuHistogram;
787789

788790
TCounterPtr DiskSpaceTablesTotalBytes;
791+
TCounterPtr DiskSpaceTopicsTotalBytes;
789792
TCounterPtr DiskSpaceSoftQuotaBytes;
790793

791794
TCounterPtr StreamShardsCount;
@@ -827,6 +830,10 @@ class TTabletMon {
827830
"resources.storage.used_bytes", false);
828831
ResourcesStorageLimitBytes = ydbGroup->GetNamedCounter("name",
829832
"resources.storage.limit_bytes", false);
833+
ResourcesStorageTableUsedBytes = ydbGroup->GetNamedCounter("name",
834+
"resources.storage.table.used_bytes", false);
835+
ResourcesStorageTopicUsedBytes = ydbGroup->GetNamedCounter("name",
836+
"resources.storage.topic.used_bytes", false);
830837

831838
ResourcesStreamUsedShards = ydbGroup->GetNamedCounter("name",
832839
"resources.stream.used_shards", false);
@@ -879,14 +886,14 @@ class TTabletMon {
879886
auto appGroup = schemeshardGroup->GetSubgroup("category", "app");
880887

881888
DiskSpaceTablesTotalBytes = appGroup->GetCounter("SUM(SchemeShard/DiskSpaceTablesTotalBytes)");
889+
DiskSpaceTopicsTotalBytes = appGroup->GetCounter("SUM(SchemeShard/DiskSpaceTopicsTotalBytes)");
882890
DiskSpaceSoftQuotaBytes = appGroup->GetCounter("SUM(SchemeShard/DiskSpaceSoftQuotaBytes)");
883891

884892
StreamShardsCount = appGroup->GetCounter("SUM(SchemeShard/StreamShardsCount)");
885893
StreamShardsQuota = appGroup->GetCounter("SUM(SchemeShard/StreamShardsQuota)");
886894
StreamReservedThroughput = appGroup->GetCounter("SUM(SchemeShard/StreamReservedThroughput)");
887895
StreamReservedStorage = appGroup->GetCounter("SUM(SchemeShard/StreamReservedStorage)");
888896
StreamReservedStorageLimit = appGroup->GetCounter("SUM(SchemeShard/StreamReservedStorageQuota)");
889-
890897
}
891898
}
892899

@@ -911,8 +918,11 @@ class TTabletMon {
911918
}
912919

913920
if (DiskSpaceTablesTotalBytes) {
914-
ResourcesStorageUsedBytes->Set(DiskSpaceTablesTotalBytes->Val());
915921
ResourcesStorageLimitBytes->Set(DiskSpaceSoftQuotaBytes->Val());
922+
ResourcesStorageTableUsedBytes->Set(DiskSpaceTablesTotalBytes->Val());
923+
ResourcesStorageTopicUsedBytes->Set(DiskSpaceTopicsTotalBytes->Val());
924+
925+
ResourcesStorageUsedBytes->Set(ResourcesStorageTableUsedBytes->Val() + ResourcesStorageTopicUsedBytes->Val());
916926

917927
auto quota = StreamShardsQuota->Val();
918928
ResourcesStreamUsedShards->Set(StreamShardsCount->Val());

ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp

+1
Original file line number | Diff line number | Diff line change
@@ -119,6 +119,7 @@ class TDeallocatePQ: public TSubOperationBase {
119119
domainInfo->DecPQReservedStorage(reserve.Storage);
120120
domainInfo->AggrDiskSpaceUsage({}, pqGroup->Stats);
121121

122+
context.SS->ChangeDiskSpaceTopicsTotalBytes(domainInfo->GetPQAccountStorage());
122123
context.SS->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_THROUGHPUT].Sub(reserve.Throughput);
123124
context.SS->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_STORAGE].Sub(reserve.Storage);
124125

ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp

+1
Original file line number | Diff line number | Diff line change
@@ -196,6 +196,7 @@ class TPropose: public TSubOperationState {
196196
context.OnComplete.PublishToSchemeBoard(OperationId, subDomainId);
197197
}
198198

199+
context.SS->ChangeDiskSpaceTopicsTotalBytes(domainInfo->GetPQAccountStorage());
199200
context.SS->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_THROUGHPUT].Sub(reserve.Throughput);
200201
context.SS->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_STORAGE].Sub(reserve.Storage);
201202

ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp

+1
Original file line number | Diff line number | Diff line change
@@ -65,6 +65,7 @@ bool TTxStoreTopicStats::PersistSingleStats(const TPathId& pathId, const TStatsQ
6565
NIceDb::TNiceDb db(txc.DB);
6666

6767
Self->PersistPersQueueGroupStats(db, pathId, newStats);
68+
Self->ChangeDiskSpaceTopicsTotalBytes(subDomainInfo->GetPQAccountStorage());
6869

6970
if (subDomainInfo->CheckDiskSpaceQuotas(Self)) {
7071
auto subDomainId = Self->ResolvePathIdForDomain(pathId);

ydb/core/tx/schemeshard/schemeshard_impl.cpp

+4
Original file line number | Diff line number | Diff line change
@@ -6690,6 +6690,10 @@ void TSchemeShard::ChangeDiskSpaceTablesTotalBytes(i64 delta) {
66906690
TabletCounters->Simple()[COUNTER_DISK_SPACE_TABLES_TOTAL_BYTES].Add(delta);
66916691
}
66926692

6693+
// Stores `value` into the COUNTER_DISK_SPACE_TOPICS_TOTAL_BYTES simple counter.
// Unlike the neighbouring Change*(i64 delta) methods, which Add a delta, this
// one overwrites the counter with an absolute value via Set.
void TSchemeShard::ChangeDiskSpaceTopicsTotalBytes(ui64 value) {
    auto&& topicsTotalBytes = TabletCounters->Simple()[COUNTER_DISK_SPACE_TOPICS_TOTAL_BYTES];
    topicsTotalBytes.Set(value);
}
6696+
66936697
void TSchemeShard::ChangeDiskSpaceQuotaExceeded(i64 delta) {
66946698
TabletCounters->Simple()[COUNTER_DISK_SPACE_QUOTA_EXCEEDED].Add(delta);
66956699
}

ydb/core/tx/schemeshard/schemeshard_impl.h

+1
Original file line number | Diff line number | Diff line change
@@ -1248,6 +1248,7 @@ class TSchemeShard
12481248
void ChangeDiskSpaceTablesDataBytes(i64 delta) override;
12491249
void ChangeDiskSpaceTablesIndexBytes(i64 delta) override;
12501250
void ChangeDiskSpaceTablesTotalBytes(i64 delta) override;
1251+
void ChangeDiskSpaceTopicsTotalBytes(ui64 value) override;
12511252
void ChangeDiskSpaceQuotaExceeded(i64 delta) override;
12521253
void ChangeDiskSpaceHardQuotaBytes(i64 delta) override;
12531254
void ChangeDiskSpaceSoftQuotaBytes(i64 delta) override;

ydb/core/tx/schemeshard/schemeshard_info_types.h

+1
Original file line number | Diff line number | Diff line change
@@ -1410,6 +1410,7 @@ struct IQuotaCounters {
14101410
virtual void ChangeDiskSpaceTablesDataBytes(i64 delta) = 0;
14111411
virtual void ChangeDiskSpaceTablesIndexBytes(i64 delta) = 0;
14121412
virtual void ChangeDiskSpaceTablesTotalBytes(i64 delta) = 0;
1413+
virtual void ChangeDiskSpaceTopicsTotalBytes(ui64 value) = 0;
14131414
virtual void ChangeDiskSpaceQuotaExceeded(i64 delta) = 0;
14141415
virtual void ChangeDiskSpaceHardQuotaBytes(i64 delta) = 0;
14151416
virtual void ChangeDiskSpaceSoftQuotaBytes(i64 delta) = 0;

0 commit comments

Comments
 (0)