Skip to content

Commit d36c6ff

Browse files
authored
Autoscaling logs (#4976)
1 parent c57e575 commit d36c6ff

File tree

7 files changed

+41
-15
lines changed

7 files changed

+41
-15
lines changed

ydb/core/persqueue/partition_scale_manager.cpp

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ TPartitionScaleManager::TPartitionScaleManager(
1717

1818
void TPartitionScaleManager::HandleScaleStatusChange(const TPartitionInfo& partition, NKikimrPQ::EScaleStatus scaleStatus, const TActorContext& ctx) {
1919
if (scaleStatus == NKikimrPQ::EScaleStatus::NEED_SPLIT) {
20+
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::HandleScaleStatusChange "
21+
<< "need to split partition " << partition.Id);
2022
PartitionsToSplit.emplace(partition.Id, partition);
2123
TrySendScaleRequest(ctx);
2224
} else {
@@ -30,12 +32,14 @@ void TPartitionScaleManager::TrySendScaleRequest(const TActorContext& ctx) {
3032
return;
3133
}
3234

33-
auto splitMergePair = BuildScaleRequest();
35+
auto splitMergePair = BuildScaleRequest(ctx);
3436
if (splitMergePair.first.empty() && splitMergePair.second.empty()) {
3537
return;
3638
}
3739

3840
RequestInflight = true;
41+
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::HandleScaleStatusChange "
42+
<< "send split request");
3943
CurrentScaleRequest = ctx.Register(new TPartitionScaleRequest(
4044
TopicName,
4145
DatabasePath,
@@ -51,7 +55,7 @@ void TPartitionScaleManager::TrySendScaleRequest(const TActorContext& ctx) {
5155
using TPartitionSplit = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit;
5256
using TPartitionMerge = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge;
5357

54-
std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> TPartitionScaleManager::BuildScaleRequest() {
58+
std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> TPartitionScaleManager::BuildScaleRequest(const TActorContext& ctx) {
5559
std::vector<TPartitionSplit> splitsToApply;
5660
std::vector<TPartitionMerge> mergesToApply;
5761

@@ -62,11 +66,15 @@ std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> TPartition
6266
const auto& partition = itSplit->second;
6367

6468
if (BalancerConfig.PartitionGraph.GetPartition(partitionId)->Children.empty()) {
65-
auto mid = GetRangeMid(partition.KeyRange.FromBound ? *partition.KeyRange.FromBound : "", partition.KeyRange.ToBound ?*partition.KeyRange.ToBound : "");
69+
auto from = partition.KeyRange.FromBound ? *partition.KeyRange.FromBound : "";
70+
auto to = partition.KeyRange.ToBound ?*partition.KeyRange.ToBound : "";
71+
auto mid = GetRangeMid(from, to);
6672
if (mid.empty()) {
6773
itSplit = PartitionsToSplit.erase(itSplit);
74+
LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::BuildScaleRequest wrong partition key range. Can't get mid. Topic# " << TopicName << ", partition# " << partitionId);
6875
continue;
6976
}
77+
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::BuildScaleRequest partition split ranges. From# '" << from << "'. To# '" << to << "'. Mid# '" << mid <<"'. Topic# " << TopicName << ". Partition# " << partitionId);
7078

7179
TPartitionSplit split;
7280
split.set_partition(partition.Id);
@@ -87,6 +95,7 @@ void TPartitionScaleManager::HandleScaleRequestResult(TPartitionScaleRequest::TE
8795
RequestInflight = false;
8896
LastResponseTime = ctx.Now();
8997
auto result = ev->Get();
98+
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::HandleScaleRequestResult scale request result: " << result->Status << ". Topic# " << TopicName);
9099
if (result->Status == TEvTxUserProxy::TResultStatus::ExecComplete) {
91100
TrySendScaleRequest(ctx);
92101
} else {

ydb/core/persqueue/partition_scale_manager.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ class TPartitionScaleManager {
6666
using TPartitionSplit = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit;
6767
using TPartitionMerge = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge;
6868

69-
std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> BuildScaleRequest();
69+
std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> BuildScaleRequest(const TActorContext& ctx);
7070

7171
public:
7272
static const ui64 TRY_SCALE_REQUEST_WAKE_UP_TAG = 10;

ydb/core/persqueue/partition_scale_request.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,11 @@ void TPartitionScaleRequest::Bootstrap(const NActors::TActorContext &ctx) {
3030
void TPartitionScaleRequest::SendProposeRequest(const NActors::TActorContext &ctx) {
3131
auto proposal = std::make_unique<TEvTxUserProxy::TEvProposeTransaction>();
3232
proposal->Record.SetDatabaseName(CanonizePath(DatabasePath));
33-
FillProposeRequest(*proposal, DatabasePath, Topic);
33+
FillProposeRequest(*proposal, DatabasePath, Topic, ctx);
3434
ctx.Send(MakeTxProxyID(), proposal.release());
3535
}
3636

37-
void TPartitionScaleRequest::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName) {
37+
void TPartitionScaleRequest::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName, const NActors::TActorContext &ctx) {
3838
auto& modifyScheme = *proposal.Record.MutableTransaction()->MutableModifyScheme();
3939
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterPersQueueGroup);
4040
modifyScheme.SetWorkingDir(workingDir);
@@ -46,11 +46,14 @@ void TPartitionScaleRequest::FillProposeRequest(TEvTxUserProxy::TEvProposeTransa
4646

4747
NKikimrSchemeOp::TPersQueueGroupDescription groupDescription;
4848
groupDescription.SetName(topicName);
49-
49+
TStringBuilder logMessage;
50+
logMessage << "TPartitionScaleRequest::FillProposeRequest trying to scale partitions. Spilts: ";
5051
for(const auto& split: Splits) {
5152
auto* newSplit = groupDescription.AddSplit();
53+
logMessage << "partition: " << split.GetPartition() << " boundary: '" << split.GetSplitBoundary() << "' ";
5254
*newSplit = split;
5355
}
56+
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, logMessage);
5457

5558
for(const auto& merge: Merges) {
5659
auto* newMerge = groupDescription.AddMerge();
@@ -98,7 +101,8 @@ void TPartitionScaleRequest::Handle(TEvTxUserProxy::TEvProposeTransactionStatus:
98101
for (auto& issue : ev->Get()->Record.GetIssues()) {
99102
issues << issue.ShortDebugString() + ", ";
100103
}
101-
Cerr << "\n SAVDGB " << issues << "\n";
104+
LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleRequest "
105+
<< "SchemaShard error when trying to execute a split request: " << issues);
102106
Send(ParentActorId, scaleRequestResult.release());
103107
Die(ctx);
104108
} else {

ydb/core/persqueue/partition_scale_request.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
namespace NKikimr {
1414
namespace NPQ {
15-
15+
1616
class TPartitionScaleRequest: public NActors::TActorBootstrapped<TPartitionScaleRequest> {
1717
using TBase = NActors::TActorBootstrapped<TPartitionScaleRequest>;
1818

@@ -48,7 +48,7 @@ class TPartitionScaleRequest: public NActors::TActorBootstrapped<TPartitionScale
4848
}
4949
std::pair<TString, TString> SplitPath(const TString& path);
5050
void SendProposeRequest(const NActors::TActorContext &ctx);
51-
void FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName);
51+
void FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName, const NActors::TActorContext &ctx);
5252

5353
private:
5454
const TString Topic;

ydb/core/persqueue/partition_write.cpp

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -541,17 +541,31 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
541541
ProcessTimestampsForNewData(prevEndOffset, ctx);
542542
}
543543

544-
NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& /*ctx*/) {
544+
NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& ctx) {
545545
auto const writeSpeedUsagePercent = SplitMergeAvgWriteBytes->GetValue() * 100.0 / Config.GetPartitionStrategy().GetScaleThresholdSeconds() / TotalPartitionWriteSpeed;
546-
546+
LOG_DEBUG_S(
547+
ctx, NKikimrServices::PERSQUEUE,
548+
"TPartition::CheckScaleStatus writeSpeedUsagePercent# " << writeSpeedUsagePercent << " Topic: \"" << TopicName() << "\"." <<
549+
" Partition: " << Partition
550+
);
547551
auto splitEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT
548552
|| Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE;
549-
553+
550554
auto mergeEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE;
551555

552556
if (splitEnabled && writeSpeedUsagePercent >= Config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent()) {
557+
LOG_DEBUG_S(
558+
ctx, NKikimrServices::PERSQUEUE,
559+
"TPartition::CheckScaleStatus NEED_SPLIT" << " Topic: \"" << TopicName() << "\"." <<
560+
" Partition: " << Partition
561+
);
553562
return NKikimrPQ::EScaleStatus::NEED_SPLIT;
554563
} else if (mergeEnabled && writeSpeedUsagePercent <= Config.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent()) {
564+
LOG_DEBUG_S(
565+
ctx, NKikimrServices::PERSQUEUE,
566+
"TPartition::CheckScaleStatus NEED_MERGE" << " Topic: \"" << TopicName() << "\"." <<
567+
" Partition: " << Partition
568+
);
555569
return NKikimrPQ::EScaleStatus::NEED_MERGE;
556570
}
557571
return NKikimrPQ::EScaleStatus::NORMAL;

ydb/core/persqueue/read_balancer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
511511
Y_ABORT_UNLESS(p.GetPartition() >= prevNextPartitionId && p.GetPartition() < NextPartitionId || NextPartitionId == 0);
512512

513513
partitionsInfo[p.GetPartition()] = {p.GetTabletId(), {}};
514-
if (SplitMergeEnabled(TabletConfig)) {
514+
if (SplitMergeEnabled(TabletConfig) && p.HasKeyRange()) {
515515
partitionsInfo[p.GetPartition()].KeyRange.DeserializeFromProto(p.GetKeyRange());
516516
}
517517

ydb/core/persqueue/ut/autoscaling_ut.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -624,7 +624,6 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
624624
a = "a";
625625
b = {};
626626
res = NKikimr::NPQ::TPartitionScaleManager::GetRangeMid(a,b);
627-
Cerr << "\n SAVDBG " << res << "\n";
628627
UNIT_ASSERT(a < res);
629628
UNIT_ASSERT(b != res);
630629

0 commit comments

Comments
 (0)