Skip to content

Commit 3819aed

Browse files
authored
schemeboard: pass describe-result as an opaque payload (#2083)
1 parent ecdcbb4 commit 3819aed

25 files changed

+788
-401
lines changed

ydb/core/protos/flat_tx_scheme.proto

+2-1
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,13 @@ message TEvDescribeSchemeResult {
106106
optional string Reason = 2;
107107
optional string Path = 3;
108108
optional NKikimrSchemeOp.TPathDescription PathDescription = 4;
109-
optional fixed64 PathOwner = 5;
109+
optional fixed64 DEPRECATED_PathOwner = 5; // replaced by PathOwnerId
110110
optional fixed64 PathId = 6;
111111

112112
optional string LastExistedPrefixPath = 7;
113113
optional fixed64 LastExistedPrefixPathId = 8;
114114
optional NKikimrSchemeOp.TPathDescription LastExistedPrefixDescription = 9;
115+
115116
optional fixed64 PathOwnerId = 10;
116117
}
117118

ydb/core/protos/scheme_board.proto

+71-11
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import "ydb/core/protos/flat_tx_scheme.proto";
1+
import "ydb/core/scheme/protos/pathid.proto";
22

33
package NKikimrSchemeBoard;
44
option java_package = "ru.yandex.kikimr.proto";
@@ -13,22 +13,76 @@ message TEvHandshake {
1313
optional uint64 Generation = 2;
1414
}
1515

16-
// here and below
17-
// Owner is the tablet id of schemeshard witch holds the records
18-
// LocalPathId is a second part of TPathId
19-
// PathOwnerId is a first part of TPathId
16+
// Here and below.
17+
// Owner is the tablet id of schemeshard which holds the records.
18+
// (PathOwnerId, LocalPathId) constitute TPathId of the object.
2019

20+
// TEvUpdate.DescribeSchemeResultSerialized is a NKikimrScheme.TEvDescribeSchemeResult
21+
// in the form of opaque payload.
22+
// Originally, that field existed as a properly typed TEvDescribeSchemeResult message.
23+
// However, that induce additional overhead to serialize and deserialize this message
24+
// when transferring over wire.
25+
// This performance cost is usually either negligible or imperceptible.
26+
// But in specific situations, particularly when rapidly updating partitioning information
27+
// for tables with huge number of shards, this overhead could lead to significant issues.
28+
// Schemeboard replicas could get overloaded and become unresponsive to further requests.
29+
// This is problematic, especially considering the schemeboard subsystem's critical role
30+
// in servicing all databases within a cluster, making it a Single Point of Failure (SPOF).
31+
//
32+
// The core realization is that the schemeboard components do not require the full content of
33+
// a TEvDescribeSchemeResult message to operate efficiently. Instead, only a limited set of
34+
// fields (path, path-id, version and info about subdomain/database) is required for processing.
35+
// And a whole TEvDescribeSchemeResult could be passed through as an opaque payload.
36+
//
37+
// Type change from TEvDescribeSchemeResult to (repeated) bytes without changing field number
38+
// is a safe move. Actual value of the field remains unchanged at the wire-format level.
39+
// Thus, older implementations will interpret the payload as a TEvDescribeSchemeResult message
40+
// and proceed with deserialization as usual. And newer implementations will recognize the data
41+
// as a binary blob and will deserialize it explicitly only when necessary.
42+
//
43+
// Note that the `repeated` label for the `DescribeSchemeResultSerialized` field is essential
44+
// to remain backward-compatible with the previous implementation. This is because even if
45+
// DescribeSchemeResult previously was labeled `optional` but actual value used at
46+
// the wire-format level was (and is) a pack of TEvDescribeSchemeResult messages.
47+
// Automerge of consecutive messages for the same field is a prominent feature of the protobuf.
48+
// Schemeshard use that feature to supply full TEvDescribeSchemeResult as a sequence of
49+
// partially filled TEvDescribeSchemeResult's.
50+
//
51+
// - Path
52+
// - PathOwnerId, LocalPathId
53+
// - PathDirEntryPathVersion
54+
// - PathSubdomainPathId
55+
// - PathAbandonedTenantsSchemeShards
56+
// are taken from the original TEvDescribeSchemeResult (one way or another).
57+
//
2158
message TEvUpdate {
2259
optional uint64 Owner = 1;
2360
optional uint64 Generation = 2;
2461
optional TLocalPathIdRange DeletedLocalPathIds = 3;
25-
optional string Path = 4;
26-
optional uint64 LocalPathId = 5;
62+
63+
optional string Path = 4; // extracted from DescribeSchemeResult.Path
64+
optional uint64 LocalPathId = 5; // extracted from DescribeSchemeResult.PathId
65+
2766
optional bool IsDeletion = 6 [default = false];
28-
optional NKikimrScheme.TEvDescribeSchemeResult DescribeSchemeResult = 7;
67+
68+
repeated bytes DescribeSchemeResultSerialized = 7;
69+
2970
optional bool NeedAck = 8 [default = false];
30-
optional uint64 PathOwnerId = 9;
71+
72+
optional uint64 PathOwnerId = 9; // extracted from DescribeSchemeResult.PathOwnerId, DescribeSchemeResult.PathDescription.Self.SchemeshardId in order of presence
73+
3174
optional TLocalPathIdRange MigratedLocalPathIds = 10;
75+
76+
// Explicit values extracted from DescribeSchemeResultSerialized
77+
78+
// DescribeSchemeResult.PathDescription.Self.PathVersion
79+
optional uint64 PathDirEntryPathVersion = 11;
80+
81+
// DescribeSchemeResult.PathDescription.DomainDescription.DomainKey
82+
optional NKikimrProto.TPathID PathSubdomainPathId = 13;
83+
84+
// DescribeSchemeResult.PathDescription.AbandonedTenantsSchemeShards
85+
repeated uint64 PathAbandonedTenantsSchemeShards = 14;
3286
}
3387

3488
message TEvUpdateAck {
@@ -65,16 +119,22 @@ message TEvUnsubscribe {
65119
optional uint64 LocalPathId = 3;
66120
}
67121

122+
// See comments for TEvUpdate.
68123
message TEvNotify {
69124
optional string Path = 1;
70125
// and/or
71126
optional uint64 PathOwnerId = 2;
72127
optional uint64 LocalPathId = 3;
73128
// common fields
74129
optional bool IsDeletion = 4 [default = false];
75-
optional NKikimrScheme.TEvDescribeSchemeResult DescribeSchemeResult = 5;
76-
optional uint64 Version = 6;
130+
131+
optional bytes DescribeSchemeResultSerialized = 5;
132+
133+
optional uint64 Version = 6; // same as TEvUpdate.PathDirEntryPathVersion
77134
optional bool Strong = 7 [default = false];
135+
136+
optional NKikimrProto.TPathID PathSubdomainPathId = 8;
137+
repeated uint64 PathAbandonedTenantsSchemeShards = 9;
78138
}
79139

80140
message TEvNotifyAck {

ydb/core/tx/datashard/datashard_ut_change_exchange.cpp

+22-21
Original file line numberDiff line numberDiff line change
@@ -789,7 +789,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
789789
static THolder<TDataStreamsClient> MakeClient(const NYdb::TDriver& driver, const TString& database) {
790790
return MakeHolder<TDataStreamsClient>(driver, NYdb::TCommonClientSettings().Database(database));
791791
}
792-
};
792+
};
793793

794794
class TTestTopicEnv: public TTestEnv<TTestTopicEnv, NYdb::NTopic::TTopicClient> {
795795
public:
@@ -798,7 +798,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
798798
static THolder<NYdb::NTopic::TTopicClient> MakeClient(const NYdb::TDriver& driver, const TString& database) {
799799
return MakeHolder<NYdb::NTopic::TTopicClient>(driver, NYdb::NTopic::TTopicClientSettings().Database(database));
800800
}
801-
};
801+
};
802802

803803
TShardedTableOptions SimpleTable() {
804804
return TShardedTableOptions()
@@ -1344,7 +1344,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
13441344
(3, 30);
13451345
)", R"(
13461346
DELETE FROM `/Root/Table` WHERE key = 1;
1347-
)"}, {
1347+
)"}, {
13481348
R"({"update":{},"key":[1]})",
13491349
R"({"update":{},"key":[2]})",
13501350
R"({"update":{},"key":[3]})",
@@ -1360,7 +1360,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
13601360
(3, 30);
13611361
)", R"(
13621362
DELETE FROM `/Root/Table` WHERE key = 1;
1363-
)"}, {
1363+
)"}, {
13641364
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":1}})"}}},
13651365
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":2}})"}}},
13661366
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":3}})"}}},
@@ -1376,7 +1376,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
13761376
(3, 30);
13771377
)", R"(
13781378
DELETE FROM `/Root/Table` WHERE key = 1;
1379-
)"}, {
1379+
)"}, {
13801380
R"({"update":{"value":10},"key":[1]})",
13811381
R"({"update":{"value":20},"key":[2]})",
13821382
R"({"update":{"value":30},"key":[3]})",
@@ -1397,7 +1397,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
13971397
(3, 300);
13981398
)", R"(
13991399
DELETE FROM `/Root/Table` WHERE key = 1;
1400-
)"}, {
1400+
)"}, {
14011401
R"({"update":{},"newImage":{"value":10},"key":[1]})",
14021402
R"({"update":{},"newImage":{"value":20},"key":[2]})",
14031403
R"({"update":{},"newImage":{"value":30},"key":[3]})",
@@ -1421,7 +1421,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
14211421
(3, 300);
14221422
)", R"(
14231423
DELETE FROM `/Root/Table` WHERE key = 1;
1424-
)"}, {
1424+
)"}, {
14251425
{DebeziumBody("c", nullptr, R"({"key":1,"value":10})"), {{"__key", R"({"payload":{"key":1}})"}}},
14261426
{DebeziumBody("c", nullptr, R"({"key":2,"value":20})"), {{"__key", R"({"payload":{"key":2}})"}}},
14271427
{DebeziumBody("c", nullptr, R"({"key":3,"value":30})"), {{"__key", R"({"payload":{"key":3}})"}}},
@@ -1445,7 +1445,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
14451445
(3, 300);
14461446
)", R"(
14471447
DELETE FROM `/Root/Table` WHERE key = 1;
1448-
)"}, {
1448+
)"}, {
14491449
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":1}})"}}},
14501450
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":2}})"}}},
14511451
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":3}})"}}},
@@ -1456,7 +1456,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
14561456
});
14571457
}
14581458

1459-
Y_UNIT_TEST(NewImageLogDebezium) {
1459+
Y_UNIT_TEST(NewImageLogDebezium) {
14601460
TopicRunner::Read(SimpleTable(), NewImage(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson), {R"(
14611461
UPSERT INTO `/Root/Table` (key, value) VALUES
14621462
(1, 10),
@@ -1469,7 +1469,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
14691469
(3, 300);
14701470
)", R"(
14711471
DELETE FROM `/Root/Table` WHERE key = 1;
1472-
)"}, {
1472+
)"}, {
14731473
{DebeziumBody("u", nullptr, R"({"key":1,"value":10})"), {{"__key", R"({"payload":{"key":1}})"}}},
14741474
{DebeziumBody("u", nullptr, R"({"key":2,"value":20})"), {{"__key", R"({"payload":{"key":2}})"}}},
14751475
{DebeziumBody("u", nullptr, R"({"key":3,"value":30})"), {{"__key", R"({"payload":{"key":3}})"}}},
@@ -1486,7 +1486,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
14861486
(1, 10),
14871487
(2, 20),
14881488
(3, 30);
1489-
)"}, {
1489+
)"}, {
14901490
R"({"update":{},"key":[1],"ts":"***"})",
14911491
R"({"update":{},"key":[2],"ts":"***"})",
14921492
R"({"update":{},"key":[3],"ts":"***"})",
@@ -1512,7 +1512,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
15121512
UPSERT INTO `/Root/Table` (__Hash, id_shard, id_sort, __RowData) VALUES (
15131513
1, "10", "100", JsonDocument('{"M":{"color":{"S":"pink"},"weight":{"N":"4.5"}}}')
15141514
);
1515-
)"}, {
1515+
)"}, {
15161516
WriteJson(NJson::TJsonMap({
15171517
{"awsRegion", ""},
15181518
{"dynamodb", NJson::TJsonMap({
@@ -1541,7 +1541,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
15411541
);
15421542
)", R"(
15431543
DELETE FROM `/Root/Table` WHERE __Hash = 1;
1544-
)"}, {
1544+
)"}, {
15451545
WriteJson(NJson::TJsonMap({
15461546
{"awsRegion", ""},
15471547
{"dynamodb", NJson::TJsonMap({
@@ -1639,7 +1639,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
16391639
(1, 0.0%s/0.0%s),
16401640
(2, 1.0%s/0.0%s),
16411641
(3, -1.0%s/0.0%s);
1642-
)", s, s, s, s, s, s)}, {
1642+
)", s, s, s, s, s, s)}, {
16431643
R"({"update":{"value":"nan"},"key":[1]})",
16441644
R"({"update":{"value":"inf"},"key":[2]})",
16451645
R"({"update":{"value":"-inf"},"key":[3]})",
@@ -1674,7 +1674,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
16741674
TopicRunner::Read(table, KeysOnly(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson), {Sprintf(R"(
16751675
UPSERT INTO `/Root/Table` (key, value) VALUES
16761676
("%s", 1);
1677-
)", key.c_str())}, {
1677+
)", key.c_str())}, {
16781678
{DebeziumBody("u", nullptr, nullptr), {{"__key", Sprintf(R"({"payload":{"key":"%s"}})", key.c_str())}}},
16791679
});
16801680
}
@@ -2043,7 +2043,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
20432043
ExecSQL(env.GetServer(), env.GetEdgeActor(), R"(
20442044
UPSERT INTO `/Root/TableAux` (key, value)
20452045
VALUES (1, 10);
2046-
)");
2046+
)");
20472047

20482048
SetSplitMergePartCountLimit(&runtime, -1);
20492049
const auto tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table");
@@ -2292,7 +2292,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
22922292
auto tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table");
22932293
UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1);
22942294

2295-
WaitTxNotification(env.GetServer(), env.GetEdgeActor(),
2295+
WaitTxNotification(env.GetServer(), env.GetEdgeActor(),
22962296
AsyncSplitTable(env.GetServer(), env.GetEdgeActor(), "/Root/Table", tabletIds.at(0), 4));
22972297

22982298
// execute on old partitions
@@ -2376,7 +2376,8 @@ Y_UNIT_TEST_SUITE(Cdc) {
23762376

23772377
case TSchemeBoardEvents::EvUpdate:
23782378
if (auto* msg = ev->Get<TSchemeBoardEvents::TEvUpdate>()) {
2379-
const auto desc = msg->GetRecord().GetDescribeSchemeResult();
2379+
NKikimrScheme::TEvDescribeSchemeResult desc;
2380+
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(desc, *msg->GetRecord().GetDescribeSchemeResultSerialized().begin()));
23802381
if (desc.GetPath() == "/Root/Table/Stream" && desc.GetPathDescription().GetSelf().GetCreateFinished()) {
23812382
delayed.emplace_back(ev.Release());
23822383
return TTestActorRuntime::EEventAction::DROP;
@@ -2446,7 +2447,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
24462447
ExecSQL(env.GetServer(), env.GetEdgeActor(), R"(
24472448
UPSERT INTO `/Root/Table` (key, value)
24482449
VALUES (1, 10);
2449-
)");
2450+
)");
24502451

24512452
SetSplitMergePartCountLimit(&runtime, -1);
24522453
const auto tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table");
@@ -3266,7 +3267,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
32663267
auto tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table");
32673268
UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1);
32683269

3269-
WaitTxNotification(env.GetServer(), env.GetEdgeActor(),
3270+
WaitTxNotification(env.GetServer(), env.GetEdgeActor(),
32703271
AsyncSplitTable(env.GetServer(), env.GetEdgeActor(), "/Root/Table", tabletIds.at(0), 4));
32713272

32723273
// merge
@@ -3298,7 +3299,7 @@ template <>
32983299
void Out<std::pair<TString, TString>>(IOutputStream& output, const std::pair<TString, TString>& x) {
32993300
output << x.first << ":" << x.second;
33003301
}
3301-
3302+
33023303
void AppendToString(TString& dst, const std::pair<TString, TString>& x) {
33033304
TStringOutput output(dst);
33043305
output << x;

ydb/core/tx/scheme_board/cache_ut.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ class TCacheTest: public TTestWithSchemeshard {
3737
" Kind: \"pool-kind-1\" "
3838
"} "
3939
" Name: \"Root\" ");
40+
41+
// Context->SetLogPriority(NKikimrServices::SCHEME_BOARD_REPLICA, NLog::PRI_DEBUG);
42+
// Context->SetLogPriority(NKikimrServices::SCHEME_BOARD_SUBSCRIBER, NLog::PRI_DEBUG);
43+
// Context->SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NLog::PRI_DEBUG);
44+
// Context->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NLog::PRI_DEBUG);
4045
}
4146

4247
UNIT_TEST_SUITE(TCacheTest);

0 commit comments

Comments
 (0)