Skip to content

Commit 6806da4

Browse files
authored
Merge a674079 into 9029d02
2 parents 9029d02 + a674079 commit 6806da4

File tree

6 files changed

+258
-66
lines changed

6 files changed

+258
-66
lines changed

ydb/core/grpc_services/rpc_keyvalue.cpp

Lines changed: 93 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -432,58 +432,6 @@ class TDropVolumeRequest : public TRpcSchemeRequestActor<TDropVolumeRequest, TEv
432432
}
433433
};
434434

435-
class TAlterVolumeRequest : public TRpcSchemeRequestActor<TAlterVolumeRequest, TEvAlterVolumeKeyValueRequest> {
436-
public:
437-
using TBase = TRpcSchemeRequestActor<TAlterVolumeRequest, TEvAlterVolumeKeyValueRequest>;
438-
using TBase::TBase;
439-
440-
void Bootstrap(const TActorContext& ctx) {
441-
TBase::Bootstrap(ctx);
442-
Become(&TAlterVolumeRequest::StateFunc);
443-
SendProposeRequest(ctx);
444-
}
445-
446-
void SendProposeRequest(const TActorContext &ctx) {
447-
const auto req = this->GetProtoRequest();
448-
449-
std::pair<TString, TString> pathPair;
450-
try {
451-
pathPair = SplitPath(req->path());
452-
} catch (const std::exception& ex) {
453-
Request_->RaiseIssue(NYql::ExceptionToIssue(ex));
454-
return Reply(StatusIds::BAD_REQUEST, ctx);
455-
}
456-
const auto& workingDir = pathPair.first;
457-
const auto& name = pathPair.second;
458-
459-
std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = this->CreateProposeTransaction();
460-
NKikimrTxUserProxy::TEvProposeTransaction& record = proposeRequest->Record;
461-
NKikimrSchemeOp::TModifyScheme* modifyScheme = record.MutableTransaction()->MutableModifyScheme();
462-
modifyScheme->SetWorkingDir(workingDir);
463-
NKikimrSchemeOp::TAlterSolomonVolume* tableDesc = nullptr;
464-
465-
modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterSolomonVolume);
466-
tableDesc = modifyScheme->MutableAlterSolomonVolume();
467-
tableDesc->SetName(name);
468-
tableDesc->SetPartitionCount(req->alter_partition_count());
469-
470-
if (GetProtoRequest()->has_storage_config()) {
471-
tableDesc->SetUpdateChannelsBinding(true);
472-
auto &storageConfig = GetProtoRequest()->storage_config();
473-
auto *internalStorageConfig = tableDesc->MutableStorageConfig();
474-
AssignPoolKinds(storageConfig, internalStorageConfig);
475-
} else {
476-
tableDesc->SetUpdateChannelsBinding(false);
477-
tableDesc->SetChannelProfileId(0);
478-
}
479-
480-
ctx.Send(MakeTxProxyID(), proposeRequest.release());
481-
}
482-
483-
STFUNC(StateFunc) {
484-
return TBase::StateWork(ev);
485-
}
486-
};
487435

488436
template <typename TDerived>
489437
class TBaseKeyValueRequest {
@@ -621,12 +569,10 @@ class TDescribeVolumeRequest
621569
Ydb::KeyValue::DescribeVolumeResult result;
622570
result.set_path(this->GetProtoRequest()->path());
623571
result.set_partition_count(desc.PartitionsSize());
624-
if (desc.PartitionsSize() > 0) {
625-
auto *storageConfig = result.mutable_storage_config();
626-
for (auto &channel : desc.GetPartitions(0).GetBoundChannels()) {
627-
auto *channelBind = storageConfig->add_channel();
628-
channelBind->set_media(channel.GetStoragePoolName());
629-
}
572+
auto *storageConfig = result.mutable_storage_config();
573+
for (auto &channel : desc.GetBoundChannels()) {
574+
auto *channelBind = storageConfig->add_channel();
575+
channelBind->set_media(channel.GetStoragePoolName());
630576
}
631577
this->ReplyWithResult(Ydb::StatusIds::SUCCESS, result, TActivationContext::AsActorContext());
632578
}
@@ -640,6 +586,95 @@ class TDescribeVolumeRequest
640586
};
641587

642588

589+
class TAlterVolumeRequest
590+
: public TRpcSchemeRequestActor<TAlterVolumeRequest, TEvAlterVolumeKeyValueRequest>
591+
, public TBaseKeyValueRequest<TAlterVolumeRequest>
592+
{
593+
public:
594+
using TBase = TRpcSchemeRequestActor<TAlterVolumeRequest, TEvAlterVolumeKeyValueRequest>;
595+
using TBase::TBase;
596+
using TBaseKeyValueRequest::TBaseKeyValueRequest;
597+
friend class TBaseKeyValueRequest<TAlterVolumeRequest>;
598+
599+
void Bootstrap(const TActorContext& ctx) {
600+
TBase::Bootstrap(ctx);
601+
Become(&TAlterVolumeRequest::StateWork);
602+
if (GetProtoRequest()->has_storage_config()) {
603+
StorageConfig = GetProtoRequest()->storage_config();
604+
SendProposeRequest(ctx);
605+
} else {
606+
OnBootstrap();
607+
}
608+
}
609+
610+
bool ValidateRequest(Ydb::StatusIds::StatusCode& /*status*/, NYql::TIssues& /*issues*/) {
611+
return true;
612+
}
613+
614+
void SendProposeRequest(const TActorContext &ctx) {
615+
const auto req = this->GetProtoRequest();
616+
617+
std::pair<TString, TString> pathPair;
618+
try {
619+
pathPair = SplitPath(req->path());
620+
} catch (const std::exception& ex) {
621+
Request_->RaiseIssue(NYql::ExceptionToIssue(ex));
622+
return Reply(StatusIds::BAD_REQUEST, ctx);
623+
}
624+
const auto& workingDir = pathPair.first;
625+
const auto& name = pathPair.second;
626+
627+
std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = this->CreateProposeTransaction();
628+
NKikimrTxUserProxy::TEvProposeTransaction& record = proposeRequest->Record;
629+
NKikimrSchemeOp::TModifyScheme* modifyScheme = record.MutableTransaction()->MutableModifyScheme();
630+
modifyScheme->SetWorkingDir(workingDir);
631+
NKikimrSchemeOp::TAlterSolomonVolume* tableDesc = nullptr;
632+
633+
modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterSolomonVolume);
634+
tableDesc = modifyScheme->MutableAlterSolomonVolume();
635+
tableDesc->SetName(name);
636+
tableDesc->SetPartitionCount(req->alter_partition_count());
637+
638+
if (GetProtoRequest()->has_storage_config()) {
639+
tableDesc->SetUpdateChannelsBinding(true);
640+
}
641+
auto *internalStorageConfig = tableDesc->MutableStorageConfig();
642+
AssignPoolKinds(StorageConfig, internalStorageConfig);
643+
644+
ctx.Send(MakeTxProxyID(), proposeRequest.release());
645+
}
646+
647+
STFUNC(StateWork) {
648+
Cerr << "TAlterVolumeRequest::StateWork; received event: " << ev->GetTypeName() << Endl;
649+
switch (ev->GetTypeRewrite()) {
650+
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
651+
default:
652+
return TBase::StateWork(ev);
653+
}
654+
}
655+
656+
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr &ev) {
657+
TEvTxProxySchemeCache::TEvNavigateKeySetResult* res = ev->Get();
658+
NSchemeCache::TSchemeCacheNavigate *request = res->Request.Get();
659+
660+
if (!OnNavigateKeySetResult(ev, NACLib::DescribeSchema)) {
661+
return;
662+
}
663+
664+
const NKikimrSchemeOp::TSolomonVolumeDescription &desc = request->ResultSet[0].SolomonVolumeInfo->Description;
665+
for (auto &channel : desc.GetBoundChannels()) {
666+
auto *channelBind = StorageConfig.add_channel();
667+
channelBind->set_media(channel.GetStoragePoolName());
668+
}
669+
SendProposeRequest(TActivationContext::AsActorContext());
670+
}
671+
672+
private:
673+
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
674+
Ydb::KeyValue::StorageConfig StorageConfig;
675+
};
676+
677+
643678
class TListLocalPartitionsRequest
644679
: public TRpcOperationRequestActor<TListLocalPartitionsRequest, TEvListLocalPartitionsKeyValueRequest>
645680
, public TBaseKeyValueRequest<TListLocalPartitionsRequest>

ydb/core/protos/flat_scheme_op.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1362,6 +1362,7 @@ message TSolomonVolumeDescription {
13621362
optional uint64 PathId = 2;
13631363
optional uint64 PartitionCount = 3;
13641364
repeated TPartition Partitions = 4;
1365+
repeated NKikimrStoragePool.TChannelBind BoundChannels = 5;
13651366
}
13661367

13671368
message TCreateSolomonVolume {

ydb/core/tx/schemeshard/schemeshard__operation_alter_solomon.cpp

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -254,17 +254,16 @@ class TAlterSolomon: public TSubOperation {
254254
return result;
255255
}
256256

257-
if (!alter.HasChannelProfileId()) {
258-
result->SetError(TEvSchemeShard::EStatus::StatusInvalidParameter, "set channel profile id, please");
259-
return result;
260-
}
261-
262257
TChannelsBindings channelsBinding;
263258
bool isResolved = false;
264259
if (alter.HasStorageConfig()) {
265260
isResolved = context.SS->ResolveSolomonChannels(alter.GetStorageConfig(), path.GetPathIdForDomain(), channelsBinding);
266261
} else {
267-
isResolved = context.SS->ResolveSolomonChannels(channelProfileId, path.GetPathIdForDomain(), channelsBinding);
262+
if (!alter.HasChannelProfileId()) {
263+
result->SetError(TEvSchemeShard::EStatus::StatusInvalidParameter, "set channel profile id, please");
264+
return result;
265+
}
266+
isResolved = context.SS->ResolveSolomonChannels(alter.GetChannelProfileId(), path.GetPathIdForDomain(), channelsBinding);
268267
}
269268
if (!isResolved) {
270269
result->SetError(NKikimrScheme::StatusInvalidParameter, "Unable to construct channel binding with the storage pool");

ydb/core/tx/schemeshard/schemeshard_path_describer.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -810,6 +810,15 @@ void TPathDescriber::DescribeSolomonVolume(TPathId pathId, TPathElement::TPtr pa
810810
}
811811
}
812812

813+
if (solomonVolumeInfo->Partitions.size() > 0) {
814+
auto shardId = solomonVolumeInfo->Partitions.begin()->first;
815+
auto shardInfo = Self->ShardInfos.FindPtr(shardId);
816+
Y_ABORT_UNLESS(shardInfo);
817+
for (const auto& channel : shardInfo->BindedChannels) {
818+
entry->AddBoundChannels()->CopyFrom(channel);
819+
}
820+
}
821+
813822
Sort(entry->MutablePartitions()->begin(),
814823
entry->MutablePartitions()->end(),
815824
[] (auto& part1, auto& part2) {

ydb/core/tx/schemeshard/ut_base/ut_base.cpp

Lines changed: 148 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9216,7 +9216,16 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
92169216
"PartitionCount: 40 ");
92179217
env.TestWaitNotification(runtime, txId);
92189218
TestDescribeResult(DescribePath(runtime, "/MyRoot/Solomon"),
9219-
{NLs::Finished, NLs::PathsInsideDomain(1), NLs::ShardsInsideDomain(40)});
9219+
{NLs::Finished, NLs::PathsInsideDomain(1), NLs::ShardsInsideDomain(40),
9220+
[](const NKikimrScheme::TEvDescribeSchemeResult& result){
9221+
const auto& desc = result.GetPathDescription().GetSolomonDescription();
9222+
const auto& boundChannels = desc.GetBoundChannels();
9223+
UNIT_ASSERT_VALUES_EQUAL(boundChannels.size(), 4);
9224+
UNIT_ASSERT_VALUES_EQUAL(boundChannels[0].GetStoragePoolName(), "pool-1");
9225+
UNIT_ASSERT_VALUES_EQUAL(boundChannels[1].GetStoragePoolName(), "pool-1");
9226+
UNIT_ASSERT_VALUES_EQUAL(boundChannels[2].GetStoragePoolName(), "pool-1");
9227+
UNIT_ASSERT_VALUES_EQUAL(boundChannels[3].GetStoragePoolName(), "pool-1");
9228+
}});
92209229

92219230
// Already exists
92229231
TestCreateSolomon(runtime, ++txId, "/MyRoot", "Name: \"Solomon\" "
@@ -9317,7 +9326,16 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
93179326
env.TestWaitNotification(runtime, txId);
93189327

93199328
TestDescribeResult(DescribePath(runtime, "/MyRoot/Solomon"),
9320-
{NLs::Finished, NLs::PathsInsideDomain(1), NLs::ShardsInsideDomain(4)});
9329+
{NLs::Finished, NLs::PathsInsideDomain(1), NLs::ShardsInsideDomain(4),
9330+
[](const NKikimrScheme::TEvDescribeSchemeResult& result){
9331+
const auto& desc = result.GetPathDescription().GetSolomonDescription();
9332+
const auto& boundChannels = desc.GetBoundChannels();
9333+
UNIT_ASSERT_VALUES_EQUAL(boundChannels.size(), 4);
9334+
UNIT_ASSERT_VALUES_EQUAL(boundChannels[0].GetStoragePoolName(), "pool-1");
9335+
UNIT_ASSERT_VALUES_EQUAL(boundChannels[1].GetStoragePoolName(), "pool-1");
9336+
UNIT_ASSERT_VALUES_EQUAL(boundChannels[2].GetStoragePoolName(), "pool-1");
9337+
UNIT_ASSERT_VALUES_EQUAL(boundChannels[3].GetStoragePoolName(), "pool-1");
9338+
}});
93219339

93229340
TestDropSolomon(runtime, ++txId, "/MyRoot", "Solomon");
93239341
env.TestWaitNotification(runtime, txId);
@@ -9430,6 +9448,134 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
94309448
UpdateChannelsBindingSolomon(true);
94319449
}
94329450

9451+
void UpdateChannelsBindingSolomonStorageConfig() {
9452+
TTestBasicRuntime runtime;
9453+
TTestEnv env(runtime, TTestEnvOptions().AllowUpdateChannelsBindingOfSolomonPartitions(true));
9454+
ui64 txId = 100;
9455+
9456+
auto check = [&](const TString& path, ui64 shards, const THashMap<TString, ui32>& expectedChannels) {
9457+
NKikimrSchemeOp::TDescribeOptions opts;
9458+
opts.SetReturnChannelsBinding(true);
9459+
9460+
auto makeChannels = [](const auto& boundsChannels) {
9461+
THashMap<TString, ui32> channels;
9462+
for (const auto& channel : boundsChannels) {
9463+
channels[channel.GetStoragePoolName()]++;
9464+
}
9465+
return channels;
9466+
};
9467+
9468+
TestDescribeResult(DescribePath(runtime, path, opts), {
9469+
NLs::Finished,
9470+
NLs::ShardsInsideDomain(shards),
9471+
[&expectedChannels, &makeChannels, &shards] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
9472+
const auto& desc = record.GetPathDescription().GetSolomonDescription();
9473+
9474+
UNIT_ASSERT_VALUES_EQUAL(shards, desc.PartitionsSize());
9475+
9476+
for (size_t i = 0; i < desc.PartitionsSize(); ++i) {
9477+
const auto& partition = desc.GetPartitions(i);
9478+
9479+
THashMap<TString, ui32> channels = makeChannels(partition.GetBoundChannels());
9480+
UNIT_ASSERT_VALUES_EQUAL(expectedChannels.size(), channels.size());
9481+
9482+
for (const auto& [name, count] : expectedChannels) {
9483+
UNIT_ASSERT_C(channels.contains(name), "Cannot find channel: " << name);
9484+
UNIT_ASSERT_VALUES_EQUAL(channels.at(name), count);
9485+
}
9486+
}
9487+
9488+
THashMap<TString, ui32> volumeChannels = makeChannels(desc.GetBoundChannels());
9489+
UNIT_ASSERT_VALUES_EQUAL(expectedChannels.size(), volumeChannels.size());
9490+
9491+
for (const auto& [name, count] : expectedChannels) {
9492+
UNIT_ASSERT_C(volumeChannels.contains(name), "Cannot find channel: " << name);
9493+
UNIT_ASSERT_VALUES_EQUAL(volumeChannels.at(name), count);
9494+
}
9495+
}
9496+
});
9497+
};
9498+
9499+
TestCreateSolomon(runtime, ++txId, "/MyRoot", R"(
9500+
Name: "Solomon"
9501+
PartitionCount: 1
9502+
StorageConfig {
9503+
Channel {
9504+
PreferredPoolKind: "pool-kind-1"
9505+
}
9506+
Channel {
9507+
PreferredPoolKind: "pool-kind-1"
9508+
}
9509+
Channel {
9510+
PreferredPoolKind: "pool-kind-1"
9511+
}
9512+
}
9513+
)");
9514+
9515+
env.TestWaitNotification(runtime, txId);
9516+
check("/MyRoot/Solomon", 1, {{{"pool-1", 3}}});
9517+
// case 1: empty alter
9518+
TestAlterSolomon(runtime, ++txId, "/MyRoot", R"(
9519+
Name: "Solomon"
9520+
StorageConfig {
9521+
Channel {
9522+
PreferredPoolKind: "pool-kind-2"
9523+
}
9524+
Channel {
9525+
PreferredPoolKind: "pool-kind-2"
9526+
}
9527+
Channel {
9528+
PreferredPoolKind: "pool-kind-2"
9529+
}
9530+
}
9531+
)", {NKikimrScheme::StatusInvalidParameter});
9532+
9533+
// case 2: add partition
9534+
TestAlterSolomon(runtime, ++txId, "/MyRoot", R"(
9535+
Name: "Solomon"
9536+
PartitionCount: 2
9537+
StorageConfig {
9538+
Channel {
9539+
PreferredPoolKind: "pool-kind-1"
9540+
}
9541+
Channel {
9542+
PreferredPoolKind: "pool-kind-1"
9543+
}
9544+
Channel {
9545+
PreferredPoolKind: "pool-kind-1"
9546+
}
9547+
}
9548+
)");
9549+
9550+
env.TestWaitNotification(runtime, txId);
9551+
check("/MyRoot/Solomon", 2, {{"pool-1", 3}});
9552+
9553+
// case 3: add partition & update channels binding
9554+
TestAlterSolomon(runtime, ++txId, "/MyRoot", R"(
9555+
Name: "Solomon"
9556+
PartitionCount: 3
9557+
UpdateChannelsBinding: true
9558+
StorageConfig {
9559+
Channel {
9560+
PreferredPoolKind: "pool-kind-2"
9561+
}
9562+
Channel {
9563+
PreferredPoolKind: "pool-kind-2"
9564+
}
9565+
Channel {
9566+
PreferredPoolKind: "pool-kind-2"
9567+
}
9568+
}
9569+
)");
9570+
9571+
env.TestWaitNotification(runtime, txId);
9572+
check("/MyRoot/Solomon", 3, {{"pool-2", 3}});
9573+
}
9574+
9575+
Y_UNIT_TEST(UpdateChannelsBindingSolomonStorageConfig) {
9576+
UpdateChannelsBindingSolomonStorageConfig();
9577+
}
9578+
94339579
Y_UNIT_TEST(RejectAlterSolomon) { //+
94349580
TTestBasicRuntime runtime;
94359581
TTestEnv env(runtime);

0 commit comments

Comments
 (0)