Skip to content

Commit 754b22f

Browse files
authored
Merge 65f7d70 into be291e7
2 parents be291e7 + 65f7d70 commit 754b22f

File tree

7 files changed

+146
-47
lines changed

7 files changed

+146
-47
lines changed

ydb/core/tx/scheme_board/cache.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -188,8 +188,8 @@ namespace {
188188
return entry.KeyDescription->SecurityObject;
189189
}
190190

191-
static ui32 GetAccess(const TNavigate::TEntry&) {
192-
return NACLib::EAccessRights::DescribeSchema;
191+
static ui32 GetAccess(const TNavigate::TEntry& entry) {
192+
return entry.Access;
193193
}
194194

195195
static ui32 GetAccess(const TResolve::TEntry& entry) {

ydb/core/tx/scheme_cache/scheme_cache.h

+1
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@ struct TSchemeCacheNavigate {
258258

259259
// in
260260
TVector<TString> Path;
261+
ui32 Access = NACLib::DescribeSchema;
261262
TTableId TableId;
262263
ERequestType RequestType = ERequestType::ByPath;
263264
EOp Operation = OpUnknown;

ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp

+71-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
#include "impl/trace_lazy.h"
12
#include "ut_utils/topic_sdk_test_setup.h"
23

4+
#include <format>
35
#include <ydb/library/persqueue/topic_parser_public/topic_parser.h>
6+
#include <ydb/public/api/protos/persqueue_error_codes_v1.pb.h>
47
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
58
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h>
69
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h>
@@ -10,8 +13,6 @@
1013
#include <library/cpp/threading/future/future.h>
1114
#include <library/cpp/threading/future/async.h>
1215

13-
#include <future>
14-
1516
namespace NYdb::NTopic::NTests {
1617

1718
Y_UNIT_TEST_SUITE(Describe) {
@@ -344,5 +345,73 @@ namespace NYdb::NTopic::NTests {
344345
DescribeConsumer(setup, client, false, false, true, true);
345346
DescribePartition(setup, client, false, false, true, true);
346347
}
348+
349+
TDescribePartitionResult RunPermissionTest(TTopicSdkTestSetup& setup, int userId, bool existingTopic, bool allowUpdateRow, bool allowDescribeSchema) {
350+
TString authToken = TStringBuilder() << "x-user-" << userId << "@builtin";
351+
Cerr << std::format("=== existingTopic={} allowUpdateRow={} allowDescribeSchema={} authToken={}\n",
352+
existingTopic, allowUpdateRow, allowDescribeSchema, std::string(authToken));
353+
354+
auto driverConfig = setup.MakeDriverConfig().SetAuthToken(authToken);
355+
auto client = TTopicClient(TDriver(driverConfig));
356+
auto settings = TDescribePartitionSettings().IncludeLocation(true);
357+
i64 testPartitionId = 0;
358+
359+
NACLib::TDiffACL acl;
360+
if (allowDescribeSchema) {
361+
acl.AddAccess(NACLib::EAccessType::Allow, NACLib::DescribeSchema, authToken);
362+
}
363+
if (allowUpdateRow) {
364+
acl.AddAccess(NACLib::EAccessType::Allow, NACLib::UpdateRow, authToken);
365+
}
366+
setup.GetServer().AnnoyingClient->ModifyACL("/Root", TEST_TOPIC, acl.SerializeAsString());
367+
368+
return client.DescribePartition(existingTopic ? TEST_TOPIC : "bad-topic", testPartitionId, settings).GetValueSync();
369+
}
370+
371+
Y_UNIT_TEST(DescribePartitionPermissions) {
372+
TTopicSdkTestSetup setup(TEST_CASE_NAME);
373+
setup.GetServer().EnableLogs({NKikimrServices::TX_PROXY_SCHEME_CACHE, NKikimrServices::SCHEME_BOARD_SUBSCRIBER}, NActors::NLog::PRI_TRACE);
374+
375+
int userId = 0;
376+
377+
struct Expectation {
378+
bool existingTopic;
379+
bool allowUpdateRow;
380+
bool allowDescribeSchema;
381+
EStatus status;
382+
NYql::TIssueCode issueCode;
383+
};
384+
385+
std::vector<Expectation> expectations{
386+
{0, 0, 0, EStatus::SCHEME_ERROR, Ydb::PersQueue::ErrorCode::ACCESS_DENIED},
387+
{0, 0, 1, EStatus::SCHEME_ERROR, Ydb::PersQueue::ErrorCode::ACCESS_DENIED},
388+
{0, 1, 0, EStatus::SCHEME_ERROR, Ydb::PersQueue::ErrorCode::ACCESS_DENIED},
389+
{0, 1, 1, EStatus::SCHEME_ERROR, Ydb::PersQueue::ErrorCode::ACCESS_DENIED},
390+
{1, 0, 0, EStatus::SCHEME_ERROR, Ydb::PersQueue::ErrorCode::ACCESS_DENIED},
391+
{1, 0, 1, EStatus::SUCCESS, 0},
392+
{1, 1, 0, EStatus::SUCCESS, 0},
393+
{1, 1, 1, EStatus::SUCCESS, 0},
394+
};
395+
396+
for (auto [existing, update, describe, status, issue] : expectations) {
397+
auto result = RunPermissionTest(setup, userId++, existing, update, describe);
398+
auto resultStatus = result.GetStatus();
399+
auto line = TStringBuilder() << "=== status=" << resultStatus;
400+
NYql::TIssueCode resultIssue = 0;
401+
if (!result.GetIssues().Empty()) {
402+
resultIssue = result.GetIssues().begin()->GetCode();
403+
line << " issueCode=" << resultIssue;
404+
}
405+
Cerr << (line << " issues=" << result.GetIssues().ToOneLineString() << Endl);
406+
407+
UNIT_ASSERT_EQUAL(resultStatus, status);
408+
UNIT_ASSERT_EQUAL(resultIssue, issue);
409+
if (resultStatus == EStatus::SUCCESS) {
410+
auto& p = result.GetPartitionDescription().GetPartition();
411+
UNIT_ASSERT(p.GetActive());
412+
UNIT_ASSERT(p.GetPartitionLocation().Defined());
413+
}
414+
}
415+
}
347416
}
348417
}

ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -359,4 +359,4 @@ namespace NYdb::NTopic::NTests {
359359
writeSession->Close();
360360
}
361361
}
362-
}
362+
}

ydb/services/lib/actors/pq_schema_actor.h

+26-31
Original file line numberDiff line numberDiff line change
@@ -128,49 +128,44 @@ namespace NKikimr::NGRpcProxy::V1 {
128128
return false;
129129
};
130130

131-
132131
void SendDescribeProposeRequest(const NActors::TActorContext& ctx, bool showPrivate) {
133132
auto navigateRequest = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
134-
navigateRequest->DatabaseName = CanonizePath(Database);
135-
136-
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
137-
entry.Path = NKikimr::SplitPath(GetTopicPath());
138-
entry.SyncVersion = true;
139-
entry.ShowPrivatePath = showPrivate;
140-
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
141-
navigateRequest->ResultSet.emplace_back(entry);
142-
143133
if (!SetRequestToken(navigateRequest.get())) {
144134
AddIssue(FillIssue("Unauthenticated access is forbidden, please provide credentials",
145135
Ydb::PersQueue::ErrorCode::ACCESS_DENIED));
146136
return RespondWithCode(Ydb::StatusIds::UNAUTHORIZED);
147137
}
138+
139+
navigateRequest->DatabaseName = CanonizePath(Database);
140+
navigateRequest->ResultSet.emplace_back(NSchemeCache::TSchemeCacheNavigate::TEntry{
141+
.Path = NKikimr::SplitPath(GetTopicPath()),
142+
.Access = CheckAccessWithWriteTopicPermission ? NACLib::UpdateRow : NACLib::DescribeSchema,
143+
.Operation = NSchemeCache::TSchemeCacheNavigate::OpList,
144+
.ShowPrivatePath = showPrivate,
145+
.SyncVersion = true,
146+
});
148147
if (!IsDead) {
149148
ctx.Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigateRequest.release()));
150149
}
151150
}
152151

153152
bool ReplyIfNotTopic(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
154-
const NSchemeCache::TSchemeCacheNavigate* result = ev->Get()->Request.Get();
155-
Y_ABORT_UNLESS(result->ResultSet.size() == 1);
156-
const auto& response = result->ResultSet.front();
157-
const TString path = JoinPath(response.Path);
158-
159-
if (ev->Get()->Request.Get()->ResultSet.size() != 1 ||
160-
ev->Get()->Request.Get()->ResultSet.begin()->Kind !=
161-
NSchemeCache::TSchemeCacheNavigate::KindTopic) {
162-
AddIssue(FillIssue(TStringBuilder() << "path '" << path << "' is not a topic",
163-
Ydb::PersQueue::ErrorCode::VALIDATION_ERROR));
164-
RespondWithCode(Ydb::StatusIds::SCHEME_ERROR);
165-
return true;
153+
auto const& entries = ev->Get()->Request.Get()->ResultSet;
154+
Y_ABORT_UNLESS(entries.size() == 1);
155+
auto const& entry = entries.front();
156+
if (entry.Kind == NSchemeCache::TSchemeCacheNavigate::KindTopic) {
157+
return false;
166158
}
167-
return false;
159+
AddIssue(FillIssue(TStringBuilder() << "path '" << JoinPath(entry.Path) << "' is not a topic",
160+
Ydb::PersQueue::ErrorCode::VALIDATION_ERROR));
161+
RespondWithCode(Ydb::StatusIds::SCHEME_ERROR);
162+
return true;
168163
}
169164

170165
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
171-
const NSchemeCache::TSchemeCacheNavigate* result = ev->Get()->Request.Get();
172-
Y_ABORT_UNLESS(result->ResultSet.size() == 1);
173-
const auto& response = result->ResultSet.front();
166+
auto const& entries = ev->Get()->Request.Get()->ResultSet;
167+
Y_ABORT_UNLESS(entries.size() == 1);
168+
const auto& response = entries.front();
174169
const TString path = JoinPath(response.Path);
175170

176171
switch (response.Status) {
@@ -245,7 +240,6 @@ namespace NKikimr::NGRpcProxy::V1 {
245240
}
246241
}
247242

248-
249243
void StateWork(TAutoPtr<IEventHandle>& ev) {
250244
switch (ev->GetTypeRewrite()) {
251245
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
@@ -256,6 +250,7 @@ namespace NKikimr::NGRpcProxy::V1 {
256250

257251
protected:
258252
bool IsDead = false;
253+
bool CheckAccessWithWriteTopicPermission = false;
259254
const TString TopicPath;
260255
const TString Database;
261256
};
@@ -339,13 +334,13 @@ namespace NKikimr::NGRpcProxy::V1 {
339334
}
340335

341336
bool SetRequestToken(NSchemeCache::TSchemeCacheNavigate* request) const override {
342-
if (this->Request_->GetSerializedToken().empty()) {
343-
return !(AppData()->PQConfig.GetRequireCredentialsInNewProtocol());
344-
} else {
345-
request->UserToken = new NACLib::TUserToken(this->Request_->GetSerializedToken());
337+
if (auto const& token = this->Request_->GetSerializedToken()) {
338+
request->UserToken = new NACLib::TUserToken(token);
346339
return true;
347340
}
341+
return !(AppData()->PQConfig.GetRequireCredentialsInNewProtocol());
348342
}
343+
349344
bool ProcessCdc(const NSchemeCache::TSchemeCacheNavigate::TEntry& response) override {
350345
if constexpr (THasCdcStreamCompatibility<TDerived>::Value) {
351346
if (static_cast<TDerived*>(this)->IsCdcStreamCompatible()) {

ydb/services/persqueue_v1/actors/schema_actors.cpp

+37-7
Original file line numberDiff line numberDiff line change
@@ -1319,30 +1319,60 @@ TDescribePartitionActor::TDescribePartitionActor(NKikimr::NGRpcService::IRequest
13191319
{
13201320
}
13211321

1322-
void TDescribePartitionActor::Bootstrap(const NActors::TActorContext& ctx)
1323-
{
1322+
void TDescribePartitionActor::Bootstrap(const NActors::TActorContext& ctx) {
1323+
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "TDescribePartitionActor" << ctx.SelfID.ToString() << ": Bootstrap");
1324+
CheckAccessWithWriteTopicPermission = true;
13241325
TBase::Bootstrap(ctx);
13251326
SendDescribeProposeRequest(ctx);
13261327
Become(&TDescribePartitionActor::StateWork);
13271328
}
13281329

13291330
void TDescribePartitionActor::StateWork(TAutoPtr<IEventHandle>& ev) {
13301331
switch (ev->GetTypeRewrite()) {
1332+
case TEvTxProxySchemeCache::TEvNavigateKeySetResult::EventType:
1333+
if (NeedToRequestWithDescribeSchema(ev)) {
1334+
// We do not have the UpdateRow permission. Check if we're allowed to DescribeSchema.
1335+
CheckAccessWithWriteTopicPermission = false;
1336+
SendDescribeProposeRequest(ActorContext());
1337+
break;
1338+
}
1339+
[[fallthrough]];
13311340
default:
13321341
if (!TDescribeTopicActorImpl::StateWork(ev, ActorContext())) {
13331342
TBase::StateWork(ev);
13341343
};
13351344
}
13361345
}
13371346

1338-
void TDescribePartitionActor::HandleCacheNavigateResponse(
1339-
TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev
1340-
) {
1341-
Y_ABORT_UNLESS(ev->Get()->Request.Get()->ResultSet.size() == 1); // describe for only one topic
1347+
// Return true if we need to send a second request to SchemeCache with DescribeSchema permission,
1348+
// because the first request checking the UpdateRow permission resulted in an AccessDenied error.
1349+
bool TDescribePartitionActor::NeedToRequestWithDescribeSchema(TAutoPtr<IEventHandle>& ev) {
1350+
if (!CheckAccessWithWriteTopicPermission) {
1351+
// We've already sent a request with DescribeSchema, ev is a response to it.
1352+
return false;
1353+
}
1354+
1355+
auto evNav = *reinterpret_cast<typename TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr*>(&ev);
1356+
auto const& entries = evNav->Get()->Request.Get()->ResultSet;
1357+
Y_ABORT_UNLESS(entries.size() == 1);
1358+
1359+
if (entries.front().Status != NSchemeCache::TSchemeCacheNavigate::EStatus::AccessDenied) {
1360+
// We do have access to the requested entity or there was an error.
1361+
// Transfer ownership to the ev pointer, and let the base classes' StateWork methods handle the response.
1362+
ev = *reinterpret_cast<TAutoPtr<IEventHandle>*>(&evNav);
1363+
return false;
1364+
}
1365+
1366+
return true;
1367+
}
1368+
1369+
void TDescribePartitionActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
1370+
auto const& entries = ev->Get()->Request.Get()->ResultSet;
1371+
Y_ABORT_UNLESS(entries.size() == 1); // describe for only one topic
13421372
if (ReplyIfNotTopic(ev)) {
13431373
return;
13441374
}
1345-
PQGroupInfo = ev->Get()->Request->ResultSet[0].PQGroupInfo;
1375+
PQGroupInfo = entries[0].PQGroupInfo;
13461376
auto* partRes = Result.mutable_partition();
13471377
partRes->set_partition_id(Settings.Partitions[0]);
13481378
partRes->set_active(true);

ydb/services/persqueue_v1/actors/schema_actors.h

+8-4
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ struct TDescribeTopicActorSettings {
7171
TVector<ui32> Partitions;
7272
bool RequireStats = false;
7373
bool RequireLocation = false;
74-
74+
7575
TDescribeTopicActorSettings()
7676
: Mode(EMode::DescribeTopic)
7777
{}
@@ -85,7 +85,7 @@ struct TDescribeTopicActorSettings {
8585
, RequireStats(requireStats)
8686
, RequireLocation(requireLocation)
8787
{}
88-
88+
8989
static TDescribeTopicActorSettings DescribeTopic(bool requireStats, bool requireLocation, const TVector<ui32>& partitions = {}) {
9090
TDescribeTopicActorSettings res{EMode::DescribeTopic, requireStats, requireLocation};
9191
res.Partitions = partitions;
@@ -104,7 +104,7 @@ struct TDescribeTopicActorSettings {
104104
res.Partitions = partitions;
105105
return res;
106106
}
107-
107+
108108
static TDescribeTopicActorSettings DescribePartitionSettings(ui32 partition, bool stats, bool location) {
109109
TDescribeTopicActorSettings res{EMode::DescribePartitions, stats, location};
110110
res.Partitions = {partition};
@@ -261,9 +261,13 @@ using TTabletInfo = TDescribeTopicActorImpl::TTabletInfo;
261261
void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) override;
262262
void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx) override;
263263
bool ApplyResponse(TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext& ctx) override;
264-
264+
265265
virtual void Reply(const TActorContext& ctx) override;
266266

267+
private:
268+
269+
bool NeedToRequestWithDescribeSchema(TAutoPtr<IEventHandle>& ev);
270+
267271
private:
268272
TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TPQGroupInfo> PQGroupInfo;
269273
Ydb::Topic::DescribePartitionResult Result;

0 commit comments

Comments
 (0)