Skip to content

Fix describe_volume and alter_volume #14978

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 93 additions & 58 deletions ydb/core/grpc_services/rpc_keyvalue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -432,58 +432,6 @@ class TDropVolumeRequest : public TRpcSchemeRequestActor<TDropVolumeRequest, TEv
}
};

class TAlterVolumeRequest : public TRpcSchemeRequestActor<TAlterVolumeRequest, TEvAlterVolumeKeyValueRequest> {
public:
using TBase = TRpcSchemeRequestActor<TAlterVolumeRequest, TEvAlterVolumeKeyValueRequest>;
using TBase::TBase;

void Bootstrap(const TActorContext& ctx) {
TBase::Bootstrap(ctx);
Become(&TAlterVolumeRequest::StateFunc);
SendProposeRequest(ctx);
}

void SendProposeRequest(const TActorContext &ctx) {
const auto req = this->GetProtoRequest();

std::pair<TString, TString> pathPair;
try {
pathPair = SplitPath(req->path());
} catch (const std::exception& ex) {
Request_->RaiseIssue(NYql::ExceptionToIssue(ex));
return Reply(StatusIds::BAD_REQUEST, ctx);
}
const auto& workingDir = pathPair.first;
const auto& name = pathPair.second;

std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = this->CreateProposeTransaction();
NKikimrTxUserProxy::TEvProposeTransaction& record = proposeRequest->Record;
NKikimrSchemeOp::TModifyScheme* modifyScheme = record.MutableTransaction()->MutableModifyScheme();
modifyScheme->SetWorkingDir(workingDir);
NKikimrSchemeOp::TAlterSolomonVolume* tableDesc = nullptr;

modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterSolomonVolume);
tableDesc = modifyScheme->MutableAlterSolomonVolume();
tableDesc->SetName(name);
tableDesc->SetPartitionCount(req->alter_partition_count());

if (GetProtoRequest()->has_storage_config()) {
tableDesc->SetUpdateChannelsBinding(true);
auto &storageConfig = GetProtoRequest()->storage_config();
auto *internalStorageConfig = tableDesc->MutableStorageConfig();
AssignPoolKinds(storageConfig, internalStorageConfig);
} else {
tableDesc->SetUpdateChannelsBinding(false);
tableDesc->SetChannelProfileId(0);
}

ctx.Send(MakeTxProxyID(), proposeRequest.release());
}

STFUNC(StateFunc) {
return TBase::StateWork(ev);
}
};

template <typename TDerived>
class TBaseKeyValueRequest {
Expand Down Expand Up @@ -621,12 +569,10 @@ class TDescribeVolumeRequest
Ydb::KeyValue::DescribeVolumeResult result;
result.set_path(this->GetProtoRequest()->path());
result.set_partition_count(desc.PartitionsSize());
if (desc.PartitionsSize() > 0) {
auto *storageConfig = result.mutable_storage_config();
for (auto &channel : desc.GetPartitions(0).GetBoundChannels()) {
auto *channelBind = storageConfig->add_channel();
channelBind->set_media(channel.GetStoragePoolName());
}
auto *storageConfig = result.mutable_storage_config();
for (auto &channel : desc.GetBoundChannels()) {
auto *channelBind = storageConfig->add_channel();
channelBind->set_media(channel.GetStoragePoolName());
}
this->ReplyWithResult(Ydb::StatusIds::SUCCESS, result, TActivationContext::AsActorContext());
}
Expand All @@ -640,6 +586,95 @@ class TDescribeVolumeRequest
};


class TAlterVolumeRequest
: public TRpcSchemeRequestActor<TAlterVolumeRequest, TEvAlterVolumeKeyValueRequest>
, public TBaseKeyValueRequest<TAlterVolumeRequest>
{
public:
using TBase = TRpcSchemeRequestActor<TAlterVolumeRequest, TEvAlterVolumeKeyValueRequest>;
using TBase::TBase;
using TBaseKeyValueRequest::TBaseKeyValueRequest;
friend class TBaseKeyValueRequest<TAlterVolumeRequest>;

void Bootstrap(const TActorContext& ctx) {
TBase::Bootstrap(ctx);
Become(&TAlterVolumeRequest::StateWork);
if (GetProtoRequest()->has_storage_config()) {
StorageConfig = GetProtoRequest()->storage_config();
SendProposeRequest(ctx);
} else {
OnBootstrap();
}
}

bool ValidateRequest(Ydb::StatusIds::StatusCode& /*status*/, NYql::TIssues& /*issues*/) {
return true;
}

void SendProposeRequest(const TActorContext &ctx) {
const auto req = this->GetProtoRequest();

std::pair<TString, TString> pathPair;
try {
pathPair = SplitPath(req->path());
} catch (const std::exception& ex) {
Request_->RaiseIssue(NYql::ExceptionToIssue(ex));
return Reply(StatusIds::BAD_REQUEST, ctx);
}
const auto& workingDir = pathPair.first;
const auto& name = pathPair.second;

std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = this->CreateProposeTransaction();
NKikimrTxUserProxy::TEvProposeTransaction& record = proposeRequest->Record;
NKikimrSchemeOp::TModifyScheme* modifyScheme = record.MutableTransaction()->MutableModifyScheme();
modifyScheme->SetWorkingDir(workingDir);
NKikimrSchemeOp::TAlterSolomonVolume* tableDesc = nullptr;

modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterSolomonVolume);
tableDesc = modifyScheme->MutableAlterSolomonVolume();
tableDesc->SetName(name);
tableDesc->SetPartitionCount(req->alter_partition_count());

if (GetProtoRequest()->has_storage_config()) {
tableDesc->SetUpdateChannelsBinding(true);
}
auto *internalStorageConfig = tableDesc->MutableStorageConfig();
AssignPoolKinds(StorageConfig, internalStorageConfig);

ctx.Send(MakeTxProxyID(), proposeRequest.release());
}

STFUNC(StateWork) {
Cerr << "TAlterVolumeRequest::StateWork; received event: " << ev->GetTypeName() << Endl;
switch (ev->GetTypeRewrite()) {
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
default:
return TBase::StateWork(ev);
}
}

void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr &ev) {
TEvTxProxySchemeCache::TEvNavigateKeySetResult* res = ev->Get();
NSchemeCache::TSchemeCacheNavigate *request = res->Request.Get();

if (!OnNavigateKeySetResult(ev, NACLib::DescribeSchema)) {
return;
}

const NKikimrSchemeOp::TSolomonVolumeDescription &desc = request->ResultSet[0].SolomonVolumeInfo->Description;
for (auto &channel : desc.GetBoundChannels()) {
auto *channelBind = StorageConfig.add_channel();
channelBind->set_media(channel.GetStoragePoolName());
}
SendProposeRequest(TActivationContext::AsActorContext());
}

private:
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
Ydb::KeyValue::StorageConfig StorageConfig;
};


class TListLocalPartitionsRequest
: public TRpcOperationRequestActor<TListLocalPartitionsRequest, TEvListLocalPartitionsKeyValueRequest>
, public TBaseKeyValueRequest<TListLocalPartitionsRequest>
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1362,6 +1362,7 @@ message TSolomonVolumeDescription {
optional uint64 PathId = 2;
optional uint64 PartitionCount = 3;
repeated TPartition Partitions = 4;
repeated NKikimrStoragePool.TChannelBind BoundChannels = 5;
}

message TCreateSolomonVolume {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,17 +254,16 @@ class TAlterSolomon: public TSubOperation {
return result;
}

if (!alter.HasChannelProfileId()) {
result->SetError(TEvSchemeShard::EStatus::StatusInvalidParameter, "set channel profile id, please");
return result;
}

TChannelsBindings channelsBinding;
bool isResolved = false;
if (alter.HasStorageConfig()) {
isResolved = context.SS->ResolveSolomonChannels(alter.GetStorageConfig(), path.GetPathIdForDomain(), channelsBinding);
} else {
isResolved = context.SS->ResolveSolomonChannels(channelProfileId, path.GetPathIdForDomain(), channelsBinding);
if (!alter.HasChannelProfileId()) {
result->SetError(TEvSchemeShard::EStatus::StatusInvalidParameter, "set channel profile id, please");
return result;
}
Comment on lines +262 to +265
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Добавишь тест?

isResolved = context.SS->ResolveSolomonChannels(alter.GetChannelProfileId(), path.GetPathIdForDomain(), channelsBinding);
}
if (!isResolved) {
result->SetError(NKikimrScheme::StatusInvalidParameter, "Unable to construct channel binding with the storage pool");
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,15 @@ void TPathDescriber::DescribeSolomonVolume(TPathId pathId, TPathElement::TPtr pa
}
}

if (solomonVolumeInfo->Partitions.size() > 0) {
auto shardId = solomonVolumeInfo->Partitions.begin()->first;
auto shardInfo = Self->ShardInfos.FindPtr(shardId);
Y_ABORT_UNLESS(shardInfo);
for (const auto& channel : shardInfo->BindedChannels) {
entry->AddBoundChannels()->CopyFrom(channel);
}
}

Sort(entry->MutablePartitions()->begin(),
entry->MutablePartitions()->end(),
[] (auto& part1, auto& part2) {
Expand Down
150 changes: 148 additions & 2 deletions ydb/core/tx/schemeshard/ut_base/ut_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9216,7 +9216,16 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
"PartitionCount: 40 ");
env.TestWaitNotification(runtime, txId);
TestDescribeResult(DescribePath(runtime, "/MyRoot/Solomon"),
{NLs::Finished, NLs::PathsInsideDomain(1), NLs::ShardsInsideDomain(40)});
{NLs::Finished, NLs::PathsInsideDomain(1), NLs::ShardsInsideDomain(40),
[](const NKikimrScheme::TEvDescribeSchemeResult& result){
const auto& desc = result.GetPathDescription().GetSolomonDescription();
const auto& boundChannels = desc.GetBoundChannels();
UNIT_ASSERT_VALUES_EQUAL(boundChannels.size(), 4);
UNIT_ASSERT_VALUES_EQUAL(boundChannels[0].GetStoragePoolName(), "pool-1");
UNIT_ASSERT_VALUES_EQUAL(boundChannels[1].GetStoragePoolName(), "pool-1");
UNIT_ASSERT_VALUES_EQUAL(boundChannels[2].GetStoragePoolName(), "pool-1");
UNIT_ASSERT_VALUES_EQUAL(boundChannels[3].GetStoragePoolName(), "pool-1");
}});

// Already exists
TestCreateSolomon(runtime, ++txId, "/MyRoot", "Name: \"Solomon\" "
Expand Down Expand Up @@ -9317,7 +9326,16 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
env.TestWaitNotification(runtime, txId);

TestDescribeResult(DescribePath(runtime, "/MyRoot/Solomon"),
{NLs::Finished, NLs::PathsInsideDomain(1), NLs::ShardsInsideDomain(4)});
{NLs::Finished, NLs::PathsInsideDomain(1), NLs::ShardsInsideDomain(4),
[](const NKikimrScheme::TEvDescribeSchemeResult& result){
const auto& desc = result.GetPathDescription().GetSolomonDescription();
const auto& boundChannels = desc.GetBoundChannels();
UNIT_ASSERT_VALUES_EQUAL(boundChannels.size(), 4);
UNIT_ASSERT_VALUES_EQUAL(boundChannels[0].GetStoragePoolName(), "pool-1");
UNIT_ASSERT_VALUES_EQUAL(boundChannels[1].GetStoragePoolName(), "pool-1");
UNIT_ASSERT_VALUES_EQUAL(boundChannels[2].GetStoragePoolName(), "pool-1");
UNIT_ASSERT_VALUES_EQUAL(boundChannels[3].GetStoragePoolName(), "pool-1");
}});

TestDropSolomon(runtime, ++txId, "/MyRoot", "Solomon");
env.TestWaitNotification(runtime, txId);
Expand Down Expand Up @@ -9430,6 +9448,134 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
UpdateChannelsBindingSolomon(true);
}

void UpdateChannelsBindingSolomonStorageConfig() {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions().AllowUpdateChannelsBindingOfSolomonPartitions(true));
ui64 txId = 100;

auto check = [&](const TString& path, ui64 shards, const THashMap<TString, ui32>& expectedChannels) {
NKikimrSchemeOp::TDescribeOptions opts;
opts.SetReturnChannelsBinding(true);

auto makeChannels = [](const auto& boundsChannels) {
THashMap<TString, ui32> channels;
for (const auto& channel : boundsChannels) {
channels[channel.GetStoragePoolName()]++;
}
return channels;
};

TestDescribeResult(DescribePath(runtime, path, opts), {
NLs::Finished,
NLs::ShardsInsideDomain(shards),
[&expectedChannels, &makeChannels, &shards] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
const auto& desc = record.GetPathDescription().GetSolomonDescription();

UNIT_ASSERT_VALUES_EQUAL(shards, desc.PartitionsSize());

for (size_t i = 0; i < desc.PartitionsSize(); ++i) {
const auto& partition = desc.GetPartitions(i);

THashMap<TString, ui32> channels = makeChannels(partition.GetBoundChannels());
UNIT_ASSERT_VALUES_EQUAL(expectedChannels.size(), channels.size());

for (const auto& [name, count] : expectedChannels) {
UNIT_ASSERT_C(channels.contains(name), "Cannot find channel: " << name);
UNIT_ASSERT_VALUES_EQUAL(channels.at(name), count);
}
}

THashMap<TString, ui32> volumeChannels = makeChannels(desc.GetBoundChannels());
UNIT_ASSERT_VALUES_EQUAL(expectedChannels.size(), volumeChannels.size());

for (const auto& [name, count] : expectedChannels) {
UNIT_ASSERT_C(volumeChannels.contains(name), "Cannot find channel: " << name);
UNIT_ASSERT_VALUES_EQUAL(volumeChannels.at(name), count);
}
}
});
};

TestCreateSolomon(runtime, ++txId, "/MyRoot", R"(
Name: "Solomon"
PartitionCount: 1
StorageConfig {
Channel {
PreferredPoolKind: "pool-kind-1"
}
Channel {
PreferredPoolKind: "pool-kind-1"
}
Channel {
PreferredPoolKind: "pool-kind-1"
}
}
)");

env.TestWaitNotification(runtime, txId);
check("/MyRoot/Solomon", 1, {{{"pool-1", 3}}});
// case 1: empty alter
TestAlterSolomon(runtime, ++txId, "/MyRoot", R"(
Name: "Solomon"
StorageConfig {
Channel {
PreferredPoolKind: "pool-kind-2"
}
Channel {
PreferredPoolKind: "pool-kind-2"
}
Channel {
PreferredPoolKind: "pool-kind-2"
}
}
)", {NKikimrScheme::StatusInvalidParameter});

// case 2: add partition
TestAlterSolomon(runtime, ++txId, "/MyRoot", R"(
Name: "Solomon"
PartitionCount: 2
StorageConfig {
Channel {
PreferredPoolKind: "pool-kind-1"
}
Channel {
PreferredPoolKind: "pool-kind-1"
}
Channel {
PreferredPoolKind: "pool-kind-1"
}
}
)");

env.TestWaitNotification(runtime, txId);
check("/MyRoot/Solomon", 2, {{"pool-1", 3}});

// case 3: add partition & update channels binding
TestAlterSolomon(runtime, ++txId, "/MyRoot", R"(
Name: "Solomon"
PartitionCount: 3
UpdateChannelsBinding: true
StorageConfig {
Channel {
PreferredPoolKind: "pool-kind-2"
}
Channel {
PreferredPoolKind: "pool-kind-2"
}
Channel {
PreferredPoolKind: "pool-kind-2"
}
}
)");

env.TestWaitNotification(runtime, txId);
check("/MyRoot/Solomon", 3, {{"pool-2", 3}});
}

Y_UNIT_TEST(UpdateChannelsBindingSolomonStorageConfig) {
UpdateChannelsBindingSolomonStorageConfig();
}

Y_UNIT_TEST(RejectAlterSolomon) { //+
TTestBasicRuntime runtime;
TTestEnv env(runtime);
Expand Down
Loading
Loading