|
3 | 3 | #include "util.h"
|
4 | 4 |
|
5 | 5 | #include <ydb-cpp-sdk/client/cms/cms.h>
|
| 6 | +#include <ydb-cpp-sdk/client/draft/ydb_replication.h> |
6 | 7 | #include <ydb-cpp-sdk/client/draft/ydb_view.h>
|
7 | 8 | #include <ydb-cpp-sdk/client/driver/driver.h>
|
8 | 9 | #include <ydb-cpp-sdk/client/proto/accessor.h>
|
9 | 10 | #include <ydb-cpp-sdk/client/result/result.h>
|
10 | 11 | #include <ydb-cpp-sdk/client/table/table.h>
|
11 | 12 | #include <ydb-cpp-sdk/client/topic/client.h>
|
12 | 13 | #include <ydb-cpp-sdk/client/value/value.h>
|
| 14 | +#include <ydb/public/api/protos/draft/ydb_replication.pb.h> |
13 | 15 | #include <ydb/public/api/protos/draft/ydb_view.pb.h>
|
14 | 16 | #include <ydb/public/api/protos/ydb_cms.pb.h>
|
15 | 17 | #include <ydb/public/api/protos/ydb_rate_limiter.pb.h>
|
|
52 | 54 |
|
53 | 55 | #include <google/protobuf/text_format.h>
|
54 | 56 |
|
| 57 | +#include <format> |
55 | 58 |
|
56 | 59 | namespace NYdb::NBackup {
|
57 | 60 |
|
@@ -683,6 +686,110 @@ void BackupCoordinationNode(TDriver driver, const TString& dbPath, const TFsPath
|
683 | 686 | BackupPermissions(driver, dbPath, fsBackupFolder);
|
684 | 687 | }
|
685 | 688 |
|
| 689 | +namespace { |
| 690 | + |
| 691 | +NReplication::TReplicationDescription DescribeReplication(TDriver driver, const TString& path) { |
| 692 | + NReplication::TReplicationClient client(driver); |
| 693 | + auto status = NConsoleClient::RetryFunction([&]() { |
| 694 | + return client.DescribeReplication(path).ExtractValueSync(); |
| 695 | + }); |
| 696 | + VerifyStatus(status, "describe async replication"); |
| 697 | + return status.GetReplicationDescription(); |
| 698 | +} |
| 699 | + |
| 700 | +TString BuildConnectionString(const NReplication::TConnectionParams& params) { |
| 701 | + return TStringBuilder() |
| 702 | + << (params.GetEnableSsl() ? "grpcs://" : "grpc://") |
| 703 | + << params.GetDiscoveryEndpoint() |
| 704 | + << "/?database=" << params.GetDatabase(); |
| 705 | +} |
| 706 | + |
| 707 | +inline TString BuildTarget(const char* src, const char* dst) { |
| 708 | + return TStringBuilder() << " `" << src << "` AS `" << dst << "`"; |
| 709 | +} |
| 710 | + |
| 711 | +inline TString Quote(const char* value) { |
| 712 | + return TStringBuilder() << "'" << value << "'"; |
| 713 | +} |
| 714 | + |
| 715 | +template <typename StringType> |
| 716 | +inline TString Quote(const StringType& value) { |
| 717 | + return Quote(value.c_str()); |
| 718 | +} |
| 719 | + |
| 720 | +inline TString BuildOption(const char* key, const TString& value) { |
| 721 | + return TStringBuilder() << " " << key << " = " << value << ""; |
| 722 | +} |
| 723 | + |
| 724 | +inline TString Interval(const TDuration& value) { |
| 725 | + return TStringBuilder() << "Interval('PT" << value.Seconds() << "S')"; |
| 726 | +} |
| 727 | + |
| 728 | +TString BuildCreateReplicationQuery( |
| 729 | + const TString& name, |
| 730 | + const TString& dbPath, |
| 731 | + const NReplication::TReplicationDescription& desc, |
| 732 | + const TString& backupRoot, |
| 733 | + NYql::TIssues& issues) |
| 734 | +{ |
| 735 | + // TODO(ilnaz) |
| 736 | + Y_UNUSED(dbPath); |
| 737 | + Y_UNUSED(backupRoot); |
| 738 | + Y_UNUSED(issues); |
| 739 | + |
| 740 | + TVector<TString> targets(::Reserve(desc.GetItems().size())); |
| 741 | + for (const auto& item : desc.GetItems()) { |
| 742 | + if (!item.DstPath.ends_with("/indexImplTable")) { // TODO(ilnaz): get rid of this hack |
| 743 | + targets.push_back(BuildTarget(item.SrcPath.c_str(), item.DstPath.c_str())); |
| 744 | + } |
| 745 | + } |
| 746 | + |
| 747 | + const auto& params = desc.GetConnectionParams(); |
| 748 | + |
| 749 | + TVector<TString> opts(::Reserve(5 /* max options */)); |
| 750 | + opts.push_back(BuildOption("CONNECTION_STRING", Quote(BuildConnectionString(params)))); |
| 751 | + switch (params.GetCredentials()) { |
| 752 | + case NReplication::TConnectionParams::ECredentials::Static: |
| 753 | + opts.push_back(BuildOption("USER", Quote(params.GetStaticCredentials().User))); |
| 754 | + opts.push_back(BuildOption("PASSWORD_SECRET_NAME", Quote(params.GetStaticCredentials().PasswordSecretName))); |
| 755 | + break; |
| 756 | + case NReplication::TConnectionParams::ECredentials::OAuth: |
| 757 | + opts.push_back(BuildOption("TOKEN_SECRET_NAME", Quote(params.GetOAuthCredentials().TokenSecretName))); |
| 758 | + break; |
| 759 | + } |
| 760 | + |
| 761 | + opts.push_back(BuildOption("CONSISTENCY_LEVEL", Quote(ToString(desc.GetConsistencyLevel())))); |
| 762 | + if (desc.GetConsistencyLevel() == NReplication::TReplicationDescription::EConsistencyLevel::Global) { |
| 763 | + opts.push_back(BuildOption("COMMIT_INTERVAL", Interval(desc.GetGlobalConsistency().GetCommitInterval()))); |
| 764 | + } |
| 765 | + |
| 766 | + return std::format("CREATE ASYNC REPLICATION `{}`\nFOR\n{}\nWITH (\n{}\n);", |
| 767 | + name.c_str(), JoinSeq(",\n", targets).c_str(), JoinSeq(",\n", opts).c_str()); |
| 768 | +} |
| 769 | + |
| 770 | +} |
| 771 | + |
| 772 | +void BackupReplication( |
| 773 | + TDriver driver, |
| 774 | + const TString& dbBackupRoot, |
| 775 | + const TString& dbPathRelativeToBackupRoot, |
| 776 | + const TFsPath& fsBackupFolder, |
| 777 | + NYql::TIssues& issues) |
| 778 | +{ |
| 779 | + Y_ENSURE(!dbPathRelativeToBackupRoot.empty()); |
| 780 | + const auto dbPath = JoinDatabasePath(dbBackupRoot, dbPathRelativeToBackupRoot); |
| 781 | + |
| 782 | + LOG_I("Backup async replication " << dbPath.Quote() << " to " << fsBackupFolder.GetPath().Quote()); |
| 783 | + |
| 784 | + const auto name = TFsPath(dbPathRelativeToBackupRoot).GetName(); |
| 785 | + const auto desc = DescribeReplication(driver, dbPath); |
| 786 | + const auto creationQuery = BuildCreateReplicationQuery(name, dbPath, desc, dbBackupRoot, issues); |
| 787 | + Y_ENSURE(creationQuery, issues.ToString()); |
| 788 | + |
| 789 | + WriteCreationQueryToFile(creationQuery, fsBackupFolder, NDump::NFiles::CreateAsyncReplication()); |
| 790 | + BackupPermissions(driver, dbPath, fsBackupFolder); |
| 791 | +} |
| 792 | + |
686 | 793 | void CreateClusterDirectory(const TDriver& driver, const TString& path, bool rootBackupDir = false) {
|
687 | 794 | if (rootBackupDir) {
|
688 | 795 | LOG_I("Create temporary directory " << path.Quote() << " in database");
|
@@ -776,6 +883,9 @@ void BackupFolderImpl(TDriver driver, const TString& dbPrefix, const TString& ba
|
776 | 883 | if (dbIt.IsCoordinationNode()) {
|
777 | 884 | BackupCoordinationNode(driver, dbIt.GetFullPath(), childFolderPath);
|
778 | 885 | }
|
| 886 | + if (dbIt.IsReplication()) { |
| 887 | + BackupReplication(driver, dbIt.GetTraverseRoot(), dbIt.GetRelPath(), childFolderPath, issues); |
| 888 | + } |
779 | 889 | dbIt.Next();
|
780 | 890 | }
|
781 | 891 | }
|
|
0 commit comments