Skip to content

Commit 5a1e03d

Browse files
Dump/resotore changefeeds tests (#15328)
1 parent 6ef51f6 commit 5a1e03d

File tree

2 files changed

+143
-27
lines changed

2 files changed

+143
-27
lines changed

ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -43,20 +43,22 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
4343
return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/permissions.pb";
4444
}
4545

46-
static bool IsView(TStringBuf schemeKey) {
47-
return schemeKey.EndsWith(NYdb::NDump::NFiles::CreateView().FileName);
46+
static TString ChangefeedDescriptionKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx, const TString& changefeedName) {
47+
Y_ABORT_UNLESS(itemIdx < (ui32)settings.items_size());
48+
return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/" << changefeedName << "/changefeed_description.pb";
4849
}
4950

50-
static bool NoObjectFound(Aws::S3::S3Errors errorType) {
51-
return errorType == S3Errors::RESOURCE_NOT_FOUND || errorType == S3Errors::NO_SUCH_KEY;
51+
static TString TopicDescriptionKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx, const TString& changefeedName) {
52+
Y_ABORT_UNLESS(itemIdx < (ui32)settings.items_size());
53+
return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/" << changefeedName << "/topic_description.pb";
5254
}
5355

54-
static TString ChangefeedDescriptionKey(const TString& changefeedPrefix) {
55-
return TStringBuilder() << changefeedPrefix << "/changefeed_description.pb";
56+
static bool IsView(TStringBuf schemeKey) {
57+
return schemeKey.EndsWith(NYdb::NDump::NFiles::CreateView().FileName);
5658
}
5759

58-
static TString TopicDescriptionKey(const TString& changefeedPrefix) {
59-
return TStringBuilder() << changefeedPrefix << "/topic_description.pb";
60+
static bool NoObjectFound(Aws::S3::S3Errors errorType) {
61+
return errorType == S3Errors::RESOURCE_NOT_FOUND || errorType == S3Errors::NO_SUCH_KEY;
6062
}
6163

6264
void HeadObject(const TString& key) {
@@ -148,8 +150,8 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
148150
}
149151

150152
const auto contentLength = result.GetResult().GetContentLength();
151-
Y_ABORT_UNLESS(IndexDownloadedChangefeed < ChangefeedsKeys.size());
152-
GetObject(ChangefeedDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]), std::make_pair(0, contentLength - 1));
153+
Y_ABORT_UNLESS(IndexDownloadedChangefeed < ChangefeedsNames.size());
154+
GetObject(ChangefeedDescriptionKeyFromSettings(ImportInfo->Settings, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]), std::make_pair(0, contentLength - 1));
153155
}
154156

155157
void HandleTopic(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
@@ -164,8 +166,8 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
164166
}
165167

166168
const auto contentLength = result.GetResult().GetContentLength();
167-
Y_ABORT_UNLESS(IndexDownloadedChangefeed < ChangefeedsKeys.size());
168-
GetObject(TopicDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]), std::make_pair(0, contentLength - 1));
169+
Y_ABORT_UNLESS(IndexDownloadedChangefeed < ChangefeedsNames.size());
170+
GetObject(TopicDescriptionKeyFromSettings(ImportInfo->Settings, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]), std::make_pair(0, contentLength - 1));
169171
}
170172

171173
void GetObject(const TString& key, const std::pair<ui64, ui64>& range) {
@@ -340,11 +342,11 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
340342

341343
auto nextStep = [this]() {
342344
Become(&TThis::StateDownloadTopics);
343-
HeadObject(TopicDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]));
345+
HeadObject(TopicDescriptionKeyFromSettings(ImportInfo->Settings, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]));
344346
};
345347

346348
if (NeedValidateChecksums) {
347-
StartValidatingChecksum(ChangefeedDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]), msg.Body, nextStep);
349+
StartValidatingChecksum(ChangefeedDescriptionKeyFromSettings(ImportInfo->Settings, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]), msg.Body, nextStep);
348350
} else {
349351
nextStep();
350352
}
@@ -376,16 +378,16 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
376378
*item.Changefeeds.MutableChangefeeds(IndexDownloadedChangefeed)->MutableTopic() = std::move(topic);
377379

378380
auto nextStep = [this]() {
379-
if (++IndexDownloadedChangefeed >= ChangefeedsKeys.size()) {
381+
if (++IndexDownloadedChangefeed >= ChangefeedsNames.size()) {
380382
Reply();
381383
} else {
382384
Become(&TThis::StateDownloadChangefeeds);
383-
HeadObject(ChangefeedDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]));
385+
HeadObject(ChangefeedDescriptionKeyFromSettings(ImportInfo->Settings, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]));
384386
}
385387
};
386388

387389
if (NeedValidateChecksums) {
388-
StartValidatingChecksum(TopicDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]), msg.Body, nextStep);
390+
StartValidatingChecksum(TopicDescriptionKeyFromSettings(ImportInfo->Settings, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]), msg.Body, nextStep);
389391
} else {
390392
nextStep();
391393
}
@@ -414,22 +416,22 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
414416
}
415417

416418
const auto& objects = result.GetResult().GetContents();
417-
ChangefeedsKeys.clear();
418-
ChangefeedsKeys.reserve(objects.size());
419+
ChangefeedsNames.clear();
420+
ChangefeedsNames.reserve(objects.size());
419421

420422
for (const auto& obj : objects) {
421423
const TFsPath& path = obj.GetKey();
422424
if (path.GetName() == "changefeed_description.pb") {
423-
ChangefeedsKeys.push_back(path.Dirname());
425+
ChangefeedsNames.push_back(path.Parent().GetName());
424426
}
425427
}
426428

427-
if (!ChangefeedsKeys.empty()) {
429+
if (!ChangefeedsNames.empty()) {
428430
auto& item = ImportInfo->Items.at(ItemIdx);
429-
Resize(item.Changefeeds.MutableChangefeeds(), ChangefeedsKeys.size());
431+
Resize(item.Changefeeds.MutableChangefeeds(), ChangefeedsNames.size());
430432

431-
Y_ABORT_UNLESS(IndexDownloadedChangefeed < ChangefeedsKeys.size());
432-
HeadObject(ChangefeedDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]));
433+
Y_ABORT_UNLESS(IndexDownloadedChangefeed < ChangefeedsNames.size());
434+
HeadObject(ChangefeedDescriptionKeyFromSettings(ImportInfo->Settings, ItemIdx, ChangefeedsNames[IndexDownloadedChangefeed]));
433435
} else {
434436
Reply();
435437
}
@@ -633,7 +635,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
633635
const TString MetadataKey;
634636
TString SchemeKey;
635637
const TString PermissionsKey;
636-
TVector<TString> ChangefeedsKeys;
638+
TVector<TString> ChangefeedsNames;
637639
ui64 IndexDownloadedChangefeed = 0;
638640

639641
const ui32 Retries;

ydb/services/ydb/backup_ut/ydb_backup_ut.cpp

Lines changed: 116 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,12 @@ NTopic::TTopicDescription DescribeTopic(NTopic::TTopicClient& topicClient, const
319319
return describeResult.GetTopicDescription();
320320
}
321321

322+
std::vector<TChangefeedDescription> DescribeChangefeeds(TSession& session, const TString& tablePath) {
323+
auto describeResult = session.DescribeTable(tablePath).ExtractValueSync();
324+
UNIT_ASSERT_C(describeResult.IsSuccess(), describeResult.GetIssues().ToString());
325+
return describeResult.GetTableDescription().GetChangefeedDescriptions();
326+
}
327+
322328
// note: the storage pool kind must be preconfigured in the server
323329
void CreateDatabase(TTenants& tenants, TStringBuf path, TStringBuf storagePoolKind) {
324330
Ydb::Cms::CreateDatabaseRequest request;
@@ -788,6 +794,75 @@ void TestViewDependentOnAnotherViewIsRestored(
788794
CompareResults(GetTableContent(session, dependentView), originalContent);
789795
}
790796

797+
std::pair<std::vector<TString>, std::vector<TString>>
798+
GetChangefeedAndTopicDescriptions(const char* table, TSession& session, NTopic::TTopicClient& topicClient) {
799+
auto describeChangefeeds = DescribeChangefeeds(session, table);
800+
const auto vectorSize = describeChangefeeds.size();
801+
802+
std::vector<TString> changefeedsStr(vectorSize);
803+
std::transform(describeChangefeeds.begin(), describeChangefeeds.end(), changefeedsStr.begin(), [=](TChangefeedDescription changefeedDesc){
804+
return changefeedDesc.ToString();
805+
});
806+
807+
std::vector<TString> topicsStr(vectorSize);
808+
std::transform(describeChangefeeds.begin(), describeChangefeeds.end(), topicsStr.begin(), [table, &topicClient](TChangefeedDescription changefeedDesc){
809+
TString protoStr;
810+
auto proto = TProtoAccessor::GetProto(
811+
DescribeTopic(topicClient, TStringBuilder() << table << "/" << changefeedDesc.GetName())
812+
);
813+
proto.clear_self();
814+
proto.clear_topic_stats();
815+
816+
google::protobuf::TextFormat::PrintToString(
817+
proto, &protoStr
818+
);
819+
return protoStr;
820+
});
821+
822+
return {changefeedsStr, topicsStr};
823+
}
824+
825+
void TestChangefeedAndTopicDescriptionsIsPreserved(
826+
const char* table, TSession& session, NTopic::TTopicClient& topicClient,
827+
TBackupFunction&& backup, TRestoreFunction&& restore, const TVector<TString>& changefeeds
828+
) {
829+
ExecuteDataDefinitionQuery(session, Sprintf(R"(
830+
CREATE TABLE `%s` (
831+
Key Uint32,
832+
Value Utf8,
833+
PRIMARY KEY (Key)
834+
);
835+
)",
836+
table
837+
));
838+
839+
for (const auto& changefeed : changefeeds) {
840+
ExecuteDataDefinitionQuery(session, Sprintf(R"(
841+
ALTER TABLE `%s` ADD CHANGEFEED `%s` WITH (
842+
FORMAT = 'JSON',
843+
MODE = 'UPDATES'
844+
);
845+
)",
846+
table,
847+
changefeed.c_str()
848+
));
849+
}
850+
851+
Cerr << "GetChangefeedAndTopicDescriptions: " << Endl;
852+
auto changefeedsAndTopicsBefore = GetChangefeedAndTopicDescriptions(table, session, topicClient);
853+
backup();
854+
855+
ExecuteDataDefinitionQuery(session, Sprintf(R"(
856+
DROP TABLE `%s`;
857+
)", table
858+
));
859+
860+
restore();
861+
auto changefeedsAndTopicsAfter = GetChangefeedAndTopicDescriptions(table, session, topicClient);
862+
863+
UNIT_ASSERT_EQUAL(changefeedsAndTopicsBefore, changefeedsAndTopicsAfter);
864+
}
865+
791866
void TestTopicSettingsArePreserved(
792867
const char* topic, NQuery::TSession& session, NTopic::TTopicClient& topicClient,
793868
TBackupFunction&& backup, TRestoreFunction&& restore
@@ -1525,6 +1600,27 @@ Y_UNIT_TEST_SUITE(BackupRestore) {
15251600
);
15261601
}
15271602

1603+
void TestChangefeedBackupRestore() {
1604+
TKikimrWithGrpcAndRootSchema server;
1605+
auto driver = TDriver(TDriverConfig().SetEndpoint(Sprintf("localhost:%u", server.GetPort())));
1606+
TTableClient tableClient(driver);
1607+
auto session = tableClient.GetSession().ExtractValueSync().GetSession();
1608+
NTopic::TTopicClient topicClient(driver);
1609+
TTempDir tempDir;
1610+
const auto& pathToBackup = tempDir.Path();
1611+
1612+
const auto table = "/Root/table";
1613+
1614+
TestChangefeedAndTopicDescriptionsIsPreserved(
1615+
table,
1616+
session,
1617+
topicClient,
1618+
CreateBackupLambda(driver, pathToBackup),
1619+
CreateRestoreLambda(driver, pathToBackup),
1620+
{"a", "b", "c"}
1621+
);
1622+
}
1623+
15281624
void TestReplicationBackupRestore() {
15291625
TKikimrWithGrpcAndRootSchema server;
15301626

@@ -1688,7 +1784,7 @@ Y_UNIT_TEST_SUITE(BackupRestore) {
16881784
case EPathTypeView:
16891785
return TestViewBackupRestore();
16901786
case EPathTypeCdcStream:
1691-
break; // https://github.com/ydb-platform/ydb/issues/7054
1787+
return TestChangefeedBackupRestore();
16921788
case EPathTypeReplication:
16931789
return TestReplicationBackupRestore();
16941790
case EPathTypeTransfer:
@@ -2160,6 +2256,23 @@ Y_UNIT_TEST_SUITE(BackupRestoreS3) {
21602256
);
21612257
}
21622258

2259+
void TestChangefeedBackupRestore() {
2260+
TS3TestEnv testEnv;
2261+
NTopic::TTopicClient topicClient(testEnv.GetDriver());
2262+
2263+
constexpr const char* table = "/Root/table";
2264+
testEnv.GetServer().GetRuntime()->GetAppData().FeatureFlags.SetEnableChangefeedsImport(true);
2265+
2266+
TestChangefeedAndTopicDescriptionsIsPreserved(
2267+
table,
2268+
testEnv.GetTableSession(),
2269+
topicClient,
2270+
CreateBackupLambda(testEnv.GetDriver(), testEnv.GetS3Port()),
2271+
CreateRestoreLambda(testEnv.GetDriver(), testEnv.GetS3Port(), { "table" }),
2272+
{"a", "b", "c"}
2273+
);
2274+
}
2275+
21632276
Y_UNIT_TEST_ALL_PROTO_ENUM_VALUES(TestAllSchemeObjectTypes, NKikimrSchemeOp::EPathType) {
21642277
using namespace NKikimrSchemeOp;
21652278

@@ -2184,7 +2297,8 @@ Y_UNIT_TEST_SUITE(BackupRestoreS3) {
21842297
TestViewBackupRestore();
21852298
break;
21862299
case EPathTypeCdcStream:
2187-
break; // https://github.com/ydb-platform/ydb/issues/7054
2300+
TestChangefeedBackupRestore();
2301+
break;
21882302
case EPathTypeReplication:
21892303
case EPathTypeTransfer:
21902304
break; // https://github.com/ydb-platform/ydb/issues/10436

0 commit comments

Comments
 (0)