Skip to content

Commit 312992d

Browse files
authored
24-3: Allow streams on index tables, replicate index tables (#7150)
1 parent c1bcd71 commit 312992d

File tree

53 files changed

+1313
-593
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+1313
-593
lines changed

ydb/core/base/path.h

+8
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,12 @@ inline TVector<TString> ChildPath(const TVector<TString>& parentPath, const TStr
3737
return path;
3838
}
3939

40+
inline TVector<TString> ChildPath(const TVector<TString>& parentPath, const TVector<TString>& childPath) {
41+
auto path = parentPath;
42+
for (const auto& childName : childPath) {
43+
path.push_back(childName);
44+
}
45+
return path;
46+
}
47+
4048
}

ydb/core/grpc_services/rpc_alter_table.cpp

+69-8
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,12 @@ class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTab
109109
break;
110110

111111
case EOp::Attribute:
112-
PrepareAlterUserAttrubutes();
112+
case EOp::AddChangefeed:
113+
case EOp::DropChangefeed:
114+
GetProxyServices();
113115
break;
114116

115-
case EOp::AddChangefeed:
116117
case EOp::DropIndex:
117-
case EOp::DropChangefeed:
118118
case EOp::RenameIndex:
119119
AlterTable(ctx);
120120
break;
@@ -197,7 +197,7 @@ class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTab
197197
Navigate(msg->Services.SchemeCache, ctx);
198198
}
199199

200-
void PrepareAlterUserAttrubutes() {
200+
void GetProxyServices() {
201201
using namespace NTxProxy;
202202
Send(MakeTxProxyID(), new TEvTxUserProxy::TEvGetProxyServicesRequest);
203203
}
@@ -222,13 +222,38 @@ class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTab
222222
auto ev = CreateNavigateForPath(DatabaseName);
223223
{
224224
auto& entry = static_cast<TEvTxProxySchemeCache::TEvNavigateKeySet*>(ev)->Request->ResultSet.emplace_back();
225-
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable;
226225
entry.Path = paths;
227226
}
228227

229228
Send(schemeCache, ev);
230229
}
231230

231+
void Navigate(const TTableId& pathId) {
232+
DatabaseName = Request_->GetDatabaseName()
233+
.GetOrElse(DatabaseFromDomain(AppData()));
234+
235+
auto ev = CreateNavigateForPath(DatabaseName);
236+
{
237+
auto& entry = static_cast<TEvTxProxySchemeCache::TEvNavigateKeySet*>(ev)->Request->ResultSet.emplace_back();
238+
entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId;
239+
entry.TableId = pathId;
240+
entry.ShowPrivatePath = true;
241+
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
242+
}
243+
244+
Send(MakeSchemeCacheID(), ev);
245+
}
246+
247+
static bool IsChangefeedOperation(EOp type) {
248+
switch (type) {
249+
case EOp::AddChangefeed:
250+
case EOp::DropChangefeed:
251+
return true;
252+
default:
253+
return false;
254+
}
255+
}
256+
232257
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
233258
TXLOG_D("Handle TEvTxProxySchemeCache::TEvNavigateKeySetResult"
234259
<< ", errors# " << ev->Get()->Request.Get()->ErrorCount);
@@ -251,13 +276,48 @@ class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTab
251276
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
252277
}
253278

279+
Y_ABORT_UNLESS(!resp->ResultSet.empty());
280+
const auto& entry = resp->ResultSet.back();
281+
282+
switch (entry.Kind) {
283+
case NSchemeCache::TSchemeCacheNavigate::KindTable:
284+
case NSchemeCache::TSchemeCacheNavigate::KindColumnTable:
285+
case NSchemeCache::TSchemeCacheNavigate::KindExternalTable:
286+
case NSchemeCache::TSchemeCacheNavigate::KindExternalDataSource:
287+
case NSchemeCache::TSchemeCacheNavigate::KindView:
288+
break; // table
289+
case NSchemeCache::TSchemeCacheNavigate::KindIndex:
290+
if (IsChangefeedOperation(OpType)) {
291+
break;
292+
}
293+
[[fallthrough]];
294+
default:
295+
Request_->RaiseIssue(MakeIssue(NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, TStringBuilder()
296+
<< "Unable to nagivate: " << JoinPath(entry.Path) << " status: PathNotTable"));
297+
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
298+
}
299+
254300
switch (OpType) {
255301
case EOp::AddIndex:
256302
return AlterTableAddIndexOp(resp, ctx);
257303
case EOp::Attribute:
258-
Y_ABORT_UNLESS(!resp->ResultSet.empty());
259304
ResolvedPathId = resp->ResultSet.back().TableId.PathId;
260305
return AlterTable(ctx);
306+
case EOp::AddChangefeed:
307+
case EOp::DropChangefeed:
308+
if (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindIndex) {
309+
AlterTable(ctx);
310+
} else if (auto list = entry.ListNodeEntry) {
311+
if (list->Children.size() != 1) {
312+
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
313+
}
314+
315+
const auto& child = list->Children.at(0);
316+
AlterTable(ctx, CanonizePath(ChildPath(NKikimr::SplitPath(GetProtoRequest()->path()), child.Name)));
317+
} else {
318+
Navigate(entry.TableId);
319+
}
320+
break;
261321
default:
262322
TXLOG_E("Got unexpected cache response");
263323
return Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx);
@@ -351,13 +411,14 @@ class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTab
351411
Die(ctx);
352412
}
353413

354-
void AlterTable(const TActorContext &ctx) {
414+
void AlterTable(const TActorContext &ctx, const TMaybe<TString>& overridePath = {}) {
355415
const auto req = GetProtoRequest();
356416
std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = CreateProposeTransaction();
357417
auto modifyScheme = proposeRequest->Record.MutableTransaction()->MutableModifyScheme();
418+
modifyScheme->SetAllowAccessToPrivatePaths(overridePath.Defined());
358419
Ydb::StatusIds::StatusCode code;
359420
TString error;
360-
if (!BuildAlterTableModifyScheme(req, modifyScheme, Profiles, ResolvedPathId, code, error)) {
421+
if (!BuildAlterTableModifyScheme(overridePath.GetOrElse(req->path()), req, modifyScheme, Profiles, ResolvedPathId, code, error)) {
361422
NYql::TIssues issues;
362423
issues.AddIssue(NYql::TIssue(error));
363424
return Reply(code, issues, ctx);

ydb/core/grpc_services/rpc_describe_table.cpp

+70-13
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1-
#include "service_table.h"
2-
#include <ydb/core/grpc_services/base/base.h>
3-
41
#include "rpc_calls.h"
52
#include "rpc_scheme_base.h"
6-
73
#include "service_table.h"
8-
#include "rpc_common/rpc_common.h"
4+
5+
#include <ydb/core/base/path.h>
6+
#include <ydb/core/base/table_index.h>
7+
#include <ydb/core/grpc_services/base/base.h>
8+
#include <ydb/core/grpc_services/rpc_common/rpc_common.h>
99
#include <ydb/core/tx/schemeshard/schemeshard.h>
1010
#include <ydb/core/ydb_convert/table_description.h>
1111
#include <ydb/core/ydb_convert/ydb_convert.h>
@@ -22,25 +22,84 @@ using TEvDescribeTableRequest = TGrpcRequestOperationCall<Ydb::Table::DescribeTa
2222
class TDescribeTableRPC : public TRpcSchemeRequestActor<TDescribeTableRPC, TEvDescribeTableRequest> {
2323
using TBase = TRpcSchemeRequestActor<TDescribeTableRPC, TEvDescribeTableRequest>;
2424

25+
TString OverrideName;
26+
27+
static bool ShowPrivatePath(const TString& path) {
28+
if (AppData()->AllowPrivateTableDescribeForTest) {
29+
return true;
30+
}
31+
32+
if (path.EndsWith("/indexImplTable")) {
33+
return true;
34+
}
35+
36+
return false;
37+
}
38+
2539
public:
2640
TDescribeTableRPC(IRequestOpCtx* msg)
2741
: TBase(msg) {}
2842

2943
void Bootstrap(const TActorContext &ctx) {
3044
TBase::Bootstrap(ctx);
3145

32-
SendProposeRequest(ctx);
46+
const auto& path = GetProtoRequest()->path();
47+
const auto paths = NKikimr::SplitPath(path);
48+
if (paths.empty()) {
49+
Request_->RaiseIssue(NYql::TIssue("Invalid path"));
50+
return Reply(Ydb::StatusIds::BAD_REQUEST, ctx);
51+
}
52+
53+
auto navigate = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
54+
navigate->DatabaseName = CanonizePath(Request_->GetDatabaseName().GetOrElse(""));
55+
auto& entry = navigate->ResultSet.emplace_back();
56+
entry.Path = paths;
57+
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
58+
entry.SyncVersion = true;
59+
entry.ShowPrivatePath = ShowPrivatePath(path);
60+
61+
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate));
3362
Become(&TDescribeTableRPC::StateWork);
3463
}
3564

3665
private:
3766
void StateWork(TAutoPtr<IEventHandle>& ev) {
3867
switch (ev->GetTypeRewrite()) {
68+
HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
3969
HFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, Handle);
4070
default: TBase::StateWork(ev);
4171
}
4272
}
4373

74+
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
75+
auto* navigate = ev->Get()->Request.Get();
76+
77+
Y_ABORT_UNLESS(navigate->ResultSet.size() == 1);
78+
const auto& entry = navigate->ResultSet.front();
79+
80+
if (navigate->ErrorCount > 0) {
81+
switch (entry.Status) {
82+
case NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown:
83+
case NSchemeCache::TSchemeCacheNavigate::EStatus::RootUnknown:
84+
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
85+
default:
86+
return Reply(Ydb::StatusIds::UNAVAILABLE, ctx);
87+
}
88+
}
89+
90+
if (entry.Kind == NSchemeCache::TSchemeCacheNavigate::KindIndex) {
91+
auto list = entry.ListNodeEntry;
92+
if (!list || list->Children.size() != 1) {
93+
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
94+
}
95+
96+
OverrideName = entry.Path.back();
97+
SendProposeRequest(CanonizePath(ChildPath(entry.Path, list->Children.at(0).Name)), ctx);
98+
} else {
99+
SendProposeRequest(GetProtoRequest()->path(), ctx);
100+
}
101+
}
102+
44103
void Handle(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev, const TActorContext& ctx) {
45104
const auto& record = ev->Get()->GetRecord();
46105
const auto status = record.GetStatus();
@@ -53,9 +112,10 @@ class TDescribeTableRPC : public TRpcSchemeRequestActor<TDescribeTableRPC, TEvDe
53112
case NKikimrScheme::StatusSuccess: {
54113
const auto& pathDescription = record.GetPathDescription();
55114
Ydb::Scheme::Entry* selfEntry = describeTableResult.mutable_self();
56-
selfEntry->set_name(pathDescription.GetSelf().GetName());
57-
selfEntry->set_type(static_cast<Ydb::Scheme::Entry::Type>(pathDescription.GetSelf().GetPathType()));
58115
ConvertDirectoryEntry(pathDescription.GetSelf(), selfEntry, true);
116+
if (OverrideName) {
117+
selfEntry->set_name(OverrideName);
118+
}
59119

60120
if (pathDescription.HasColumnTableDescription()) {
61121
const auto& tableDescription = pathDescription.GetColumnTableDescription();
@@ -136,9 +196,8 @@ class TDescribeTableRPC : public TRpcSchemeRequestActor<TDescribeTableRPC, TEvDe
136196
}
137197
}
138198

139-
void SendProposeRequest(const TActorContext &ctx) {
199+
void SendProposeRequest(const TString& path, const TActorContext& ctx) {
140200
const auto req = GetProtoRequest();
141-
const TString path = req->path();
142201

143202
std::unique_ptr<TEvTxUserProxy::TEvNavigate> navigateRequest(new TEvTxUserProxy::TEvNavigate());
144203
SetAuthToken(navigateRequest, *Request_);
@@ -153,9 +212,7 @@ class TDescribeTableRPC : public TRpcSchemeRequestActor<TDescribeTableRPC, TEvDe
153212
record->MutableOptions()->SetReturnPartitionStats(true);
154213
}
155214

156-
if (AppData(ctx)->AllowPrivateTableDescribeForTest || path.EndsWith("/indexImplTable")) {
157-
record->MutableOptions()->SetShowPrivateTable(true);
158-
}
215+
record->MutableOptions()->SetShowPrivateTable(ShowPrivatePath(path));
159216

160217
ctx.Send(MakeTxProxyID(), navigateRequest.release());
161218
}

ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp

+64
Original file line numberDiff line numberDiff line change
@@ -4075,6 +4075,70 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
40754075
}
40764076
}
40774077

4078+
Y_UNIT_TEST(ChangefeedOnIndexTable) {
4079+
TKikimrRunner kikimr(TKikimrSettings()
4080+
.SetPQConfig(DefaultPQConfig())
4081+
.SetEnableChangefeedsOnIndexTables(true));
4082+
auto db = kikimr.GetTableClient();
4083+
auto session = db.CreateSession().GetValueSync().GetSession();
4084+
4085+
{
4086+
auto query = R"(
4087+
--!syntax_v1
4088+
CREATE TABLE `/Root/table` (
4089+
Key Uint64,
4090+
Value String,
4091+
PRIMARY KEY (Key),
4092+
INDEX SyncIndex GLOBAL SYNC ON (`Value`),
4093+
INDEX AsyncIndex GLOBAL ASYNC ON (`Value`)
4094+
);
4095+
)";
4096+
4097+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
4098+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
4099+
}
4100+
4101+
const auto changefeed = TChangefeedDescription("feed", EChangefeedMode::KeysOnly, EChangefeedFormat::Json);
4102+
{
4103+
auto result = session.AlterTable("/Root/table/AsyncIndex", TAlterTableSettings()
4104+
.AppendAddChangefeeds(changefeed)
4105+
).ExtractValueSync();
4106+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
4107+
}
4108+
{
4109+
auto result = session.AlterTable("/Root/table/SyncIndex", TAlterTableSettings()
4110+
.AppendAddChangefeeds(changefeed)
4111+
).ExtractValueSync();
4112+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
4113+
}
4114+
}
4115+
4116+
Y_UNIT_TEST(DescribeIndexTable) {
4117+
TKikimrRunner kikimr;
4118+
auto db = kikimr.GetTableClient();
4119+
auto session = db.CreateSession().GetValueSync().GetSession();
4120+
4121+
{
4122+
auto query = R"(
4123+
--!syntax_v1
4124+
CREATE TABLE `/Root/table` (
4125+
Key Uint64,
4126+
Value String,
4127+
PRIMARY KEY (Key),
4128+
INDEX SyncIndex GLOBAL SYNC ON (`Value`)
4129+
);
4130+
)";
4131+
4132+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
4133+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
4134+
}
4135+
{
4136+
auto desc = session.DescribeTable("/Root/table/SyncIndex").ExtractValueSync();
4137+
UNIT_ASSERT_C(desc.IsSuccess(), desc.GetIssues().ToString());
4138+
UNIT_ASSERT_VALUES_EQUAL(desc.GetEntry().Name, "SyncIndex");
4139+
}
4140+
}
4141+
40784142
Y_UNIT_TEST(CreatedAt) {
40794143
TKikimrRunner kikimr(TKikimrSettings().SetPQConfig(DefaultPQConfig()));
40804144
auto scheme = NYdb::NScheme::TSchemeClient(kikimr.GetDriver(), TCommonClientSettings().Database("/Root"));

ydb/core/protos/feature_flags.proto

+1
Original file line numberDiff line numberDiff line change
@@ -145,4 +145,5 @@ message TFeatureFlags {
145145
optional bool EnableColumnStatistics = 130 [default = false];
146146
optional bool EnableSingleCompositeActionGroup = 131 [default = false];
147147
optional bool EnableResourcePoolsOnServerless = 132 [default = false];
148+
optional bool EnableChangefeedsOnIndexTables = 134 [default = false];
148149
}

ydb/core/protos/flat_scheme_op.proto

-5
Original file line numberDiff line numberDiff line change
@@ -919,10 +919,6 @@ message TCreateCdcStream {
919919
optional TCdcStreamDescription StreamDescription = 2;
920920
optional uint64 RetentionPeriodSeconds = 3 [default = 86400]; // 1d by default
921921
optional uint32 TopicPartitions = 4;
922-
oneof IndexMode {
923-
google.protobuf.Empty AllIndexes = 5; // Create topic per each index
924-
string IndexName = 6;
925-
}
926922
}
927923

928924
message TAlterCdcStream {
@@ -1637,7 +1633,6 @@ message TIndexBuildControl {
16371633

16381634
message TLockConfig {
16391635
optional string Name = 1;
1640-
optional bool AllowIndexImplLock = 2;
16411636
}
16421637

16431638
message TLockGuard {

ydb/core/testlib/basics/feature_flags.h

+1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ class TTestFeatureFlagsHolder {
6161
FEATURE_FLAG_SETTER(EnableCMSRequestPriorities)
6262
FEATURE_FLAG_SETTER(EnableTableDatetime64)
6363
FEATURE_FLAG_SETTER(EnableResourcePools)
64+
FEATURE_FLAG_SETTER(EnableChangefeedsOnIndexTables)
6465

6566
#undef FEATURE_FLAG_SETTER
6667
};

ydb/core/tx/replication/controller/dst_alterer.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ class TDstAlterer: public TActorBootstrapped<TDstAlterer> {
4141

4242
switch (Kind) {
4343
case TReplication::ETargetKind::Table:
44+
case TReplication::ETargetKind::IndexTable:
4445
tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterTable);
4546
PathIdFromPathId(DstPathId, tx.MutableAlterTable()->MutablePathId());
4647
tx.MutableAlterTable()->MutableReplicationConfig()->SetMode(

0 commit comments

Comments
 (0)