From f02876b69fba48f714961938a2d7ee23181fce00 Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Tue, 13 Feb 2024 20:25:29 +0300 Subject: [PATCH] Use uid as idempotency key KIKIMR-21059 merge from main: - 5c443ab2668e0bf954ed3d9d67cfa5ccdc6dc48e --- .../schemeshard_export__create.cpp | 19 ++++--- .../schemeshard_import__create.cpp | 19 ++++--- .../tx/schemeshard/ut_export/ut_export.cpp | 48 +++++++++++++++++ .../tx/schemeshard/ut_restore/ut_restore.cpp | 53 +++++++++++++++++++ 4 files changed, 127 insertions(+), 12 deletions(-) diff --git a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp index 8c7fb7cd995e..cc630e8d2043 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp @@ -51,12 +51,19 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase { } const TString& uid = GetUid(request.GetRequest().GetOperationParams().labels()); - if (uid && Self->ExportsByUid.contains(uid)) { - return Reply( - std::move(response), - Ydb::StatusIds::ALREADY_EXISTS, - TStringBuilder() << "Export with uid '" << uid << "' already exists" - ); + if (uid) { + if (auto it = Self->ExportsByUid.find(uid); it != Self->ExportsByUid.end()) { + if (IsSameDomain(it->second, request.GetDatabaseName())) { + Self->FromXxportInfo(*response->Record.MutableResponse()->MutableEntry(), it->second); + return Reply(std::move(response)); + } else { + return Reply( + std::move(response), + Ydb::StatusIds::ALREADY_EXISTS, + TStringBuilder() << "Export with uid '" << uid << "' already exists" + ); + } + } } const TPath domainPath = TPath::Resolve(request.GetDatabaseName(), Self); diff --git a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp index d20e9f9ab317..adcae1ce16bb 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp @@ -53,12 +53,19 @@ struct TSchemeShard::TImport::TTxCreate: public TSchemeShard::TXxport::TTxBase { } const TString& uid = GetUid(request.GetRequest().GetOperationParams().labels()); - if (uid && Self->ImportsByUid.contains(uid)) { - return Reply( - std::move(response), - Ydb::StatusIds::ALREADY_EXISTS, - TStringBuilder() << "Import with uid '" << uid << "' already exists" - ); + if (uid) { + if (auto it = Self->ImportsByUid.find(uid); it != Self->ImportsByUid.end()) { + if (IsSameDomain(it->second, request.GetDatabaseName())) { + Self->FromXxportInfo(*response->Record.MutableResponse()->MutableEntry(), it->second); + return Reply(std::move(response)); + } else { + return Reply( + std::move(response), + Ydb::StatusIds::ALREADY_EXISTS, + TStringBuilder() << "Import with uid '" << uid << "' already exists" + ); + } + } } const TPath domainPath = TPath::Resolve(request.GetDatabaseName(), Self); diff --git a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp index 4a8ad8b32170..67b7c2c204ce 100644 --- a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp +++ b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp @@ -1394,4 +1394,52 @@ partitioning_settings { TestGetExport(runtime, txId, "/MyRoot", Ydb::StatusIds::CANCELLED); } + + Y_UNIT_TEST(UidAsIdempotencyKey) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + TPortManager portManager; + const ui16 port = portManager.GetPort(); + + TS3Mock s3Mock({}, TS3Mock::TSettings(port)); + UNIT_ASSERT(s3Mock.Start()); + + const auto request = Sprintf(R"( + OperationParams { + labels { + key: "uid" + value: "foo" + } + } + ExportToS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_path: "/MyRoot/Table" + destination_prefix: "" + } + } + )", port); + + // create operation + TestExport(runtime, ++txId, "/MyRoot", request); + const ui64 exportId = txId; + // create operation again with same uid + TestExport(runtime, ++txId, "/MyRoot", request); + // new operation was not created + TestGetExport(runtime, txId, "/MyRoot", Ydb::StatusIds::NOT_FOUND); + // check previous operation + TestGetExport(runtime, exportId, "/MyRoot"); + env.TestWaitNotification(runtime, exportId); + } } diff --git a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp index 19b00dba18d3..4e8effd31bf9 100644 --- a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp +++ b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp @@ -2451,6 +2451,59 @@ Y_UNIT_TEST_SUITE(TImportTests) { Run(runtime, env, ConvertTestData(data), request, Ydb::StatusIds::PRECONDITION_FAILED); Run(runtime, env, ConvertTestData(data), request, Ydb::StatusIds::SUCCESS, "/MyRoot", false, userSID); } + + Y_UNIT_TEST(UidAsIdempotencyKey) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions()); + ui64 txId = 100; + + const auto data = GenerateTestData(R"( + columns { + name: "key" + type { optional_type { item { type_id: UTF8 } } } + } + columns { + name: "value" + type { optional_type { item { type_id: UTF8 } } } + } + primary_key: "key" + )", {{"a", 1}}); + + TPortManager portManager; + const ui16 port = portManager.GetPort(); + + TS3Mock s3Mock(ConvertTestData(data), TS3Mock::TSettings(port)); + UNIT_ASSERT(s3Mock.Start()); + + const auto request = Sprintf(R"( + OperationParams { + labels { + key: "uid" + value: "foo" + } + } + ImportFromS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_prefix: "" + destination_path: "/MyRoot/Table" + } + } + )", port); + + // create operation + TestImport(runtime, ++txId, "/MyRoot", request); + const ui64 importId = txId; + // create operation again with same uid + TestImport(runtime, ++txId, "/MyRoot", request); + // new operation was not created + TestGetImport(runtime, txId, "/MyRoot", Ydb::StatusIds::NOT_FOUND); + // check previous operation + TestGetImport(runtime, importId, "/MyRoot"); + env.TestWaitNotification(runtime, importId); + } + } Y_UNIT_TEST_SUITE(TImportWithRebootsTests) {