@@ -4,15 +4,17 @@ namespace NKikimr {
4
4
namespace NPQ {
5
5
6
6
TPartitionScaleRequest::TPartitionScaleRequest (
7
- TString topicName,
8
- TString databasePath,
7
+ const TString& topicName,
8
+ const TString& topicPath,
9
+ const TString& databasePath,
9
10
ui64 pathId,
10
11
ui64 pathVersion,
11
- std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit> splits,
12
- const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge> merges,
13
- NActors::TActorId parentActorId
12
+ const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit>& splits,
13
+ const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge>& merges,
14
+ const NActors::TActorId& parentActorId
14
15
)
15
16
: Topic(topicName)
17
+ , TopicPath(topicPath)
16
18
, DatabasePath(databasePath)
17
19
, PathId(pathId)
18
20
, PathVersion(pathVersion)
@@ -30,24 +32,27 @@ void TPartitionScaleRequest::Bootstrap(const NActors::TActorContext &ctx) {
30
32
void TPartitionScaleRequest::SendProposeRequest (const NActors::TActorContext &ctx) {
31
33
auto proposal = std::make_unique<TEvTxUserProxy::TEvProposeTransaction>();
32
34
proposal->Record .SetDatabaseName (CanonizePath (DatabasePath));
33
- FillProposeRequest (*proposal, DatabasePath, Topic, ctx);
35
+ FillProposeRequest (*proposal, ctx);
34
36
ctx.Send (MakeTxProxyID (), proposal.release ());
35
37
}
36
38
37
- void TPartitionScaleRequest::FillProposeRequest (TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName, const NActors::TActorContext &ctx) {
39
+ void TPartitionScaleRequest::FillProposeRequest (TEvTxUserProxy::TEvProposeTransaction& proposal, const NActors::TActorContext &ctx) {
40
+ auto workingDir = TopicPath.substr (0 , TopicPath.size () - Topic.size ());
41
+
38
42
auto & modifyScheme = *proposal.Record .MutableTransaction ()->MutableModifyScheme ();
39
43
modifyScheme.SetOperationType (NKikimrSchemeOp::ESchemeOpAlterPersQueueGroup);
40
44
modifyScheme.SetWorkingDir (workingDir);
45
+ modifyScheme.SetInternal (true );
41
46
42
47
auto applyIf = modifyScheme.AddApplyIf ();
43
48
applyIf->SetPathId (PathId);
44
49
applyIf->SetPathVersion (PathVersion == 0 ? 1 : PathVersion);
45
50
applyIf->SetCheckEntityVersion (true );
46
51
47
52
NKikimrSchemeOp::TPersQueueGroupDescription groupDescription;
48
- groupDescription.SetName (topicName );
53
+ groupDescription.SetName (Topic );
49
54
TStringBuilder logMessage;
50
- logMessage << " TPartitionScaleRequest::FillProposeRequest trying to scale partitions. Spilts: " ;
55
+ logMessage << " TPartitionScaleRequest::FillProposeRequest trying to scale partitions of ' " << workingDir << " / " << Topic << " ' . Spilts: " ;
51
56
for (const auto & split: Splits) {
52
57
auto * newSplit = groupDescription.AddSplit ();
53
58
logMessage << " partition: " << split.GetPartition () << " boundary: '" << split.GetSplitBoundary () << " ' " ;
0 commit comments