diff --git a/ydb/core/kqp/ut/common/columnshard.cpp b/ydb/core/kqp/ut/common/columnshard.cpp index 212d49076d02..99a10e26ecad 100644 --- a/ydb/core/kqp/ut/common/columnshard.cpp +++ b/ydb/core/kqp/ut/common/columnshard.cpp @@ -9,7 +9,6 @@ namespace NKqp { TTestHelper::TTestHelper(const TKikimrSettings& settings) : Kikimr(settings) , TableClient(Kikimr.GetTableClient()) - , LongTxClient(Kikimr.GetDriver()) , Session(TableClient.CreateSession().GetValueSync().GetSession()) {} @@ -31,26 +30,6 @@ namespace NKqp { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } - void TTestHelper::InsertData(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const std::function onBeforeCommit /*= {}*/, const EStatus opStatus /*= EStatus::SUCCESS*/) { - NLongTx::TLongTxBeginResult resBeginTx = LongTxClient.BeginWriteTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resBeginTx.Status().GetStatus(), EStatus::SUCCESS, resBeginTx.Status().GetIssues().ToString()); - - auto txId = resBeginTx.GetResult().tx_id(); - auto batch = updates.BuildArrow(); - TString data = NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(batch); - - NLongTx::TLongTxWriteResult resWrite = - LongTxClient.Write(txId, table.GetName(), txId, data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resWrite.Status().GetStatus(), opStatus, resWrite.Status().GetIssues().ToString()); - - if (onBeforeCommit) { - onBeforeCommit(); - } - - NLongTx::TLongTxCommitResult resCommitTx = LongTxClient.CommitTx(txId).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString()); - } - void TTestHelper::BulkUpsert(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const Ydb::StatusIds_StatusCode& opStatus /*= Ydb::StatusIds::SUCCESS*/) { Y_UNUSED(opStatus); NKikimr::Tests::NCS::THelper helper(Kikimr.GetTestServer()); diff --git a/ydb/core/kqp/ut/common/columnshard.h b/ydb/core/kqp/ut/common/columnshard.h index 92b45eb60759..adc2ea16989a 100644 --- a/ydb/core/kqp/ut/common/columnshard.h +++ b/ydb/core/kqp/ut/common/columnshard.h @@ -3,7 +3,6 @@ #include "kqp_ut_common.h" #include #include -#include #include #include #include @@ -64,7 +63,6 @@ namespace NKqp { private: TKikimrRunner Kikimr; NYdb::NTable::TTableClient TableClient; - NYdb::NLongTx::TClient LongTxClient; NYdb::NTable::TSession Session; public: @@ -73,7 +71,6 @@ namespace NKqp { TTestActorRuntime& GetRuntime(); NYdb::NTable::TSession& GetSession(); void CreateTable(const TColumnTableBase& table); - void InsertData(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const std::function onBeforeCommit = {}, const NYdb::EStatus opStatus = NYdb::EStatus::SUCCESS); void BulkUpsert(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const Ydb::StatusIds_StatusCode& opStatus = Ydb::StatusIds::SUCCESS); void BulkUpsert(const TColumnTable& table, std::shared_ptr batch, const Ydb::StatusIds_StatusCode& opStatus = Ydb::StatusIds::SUCCESS); void ReadData(const TString& query, const TString& expected, const NYdb::EStatus opStatus = NYdb::EStatus::SUCCESS); diff --git a/ydb/core/kqp/ut/olap/kqp_olap_stats_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_stats_ut.cpp index 2d34fb23496a..f77b5a019f20 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_stats_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_stats_ut.cpp @@ -48,7 +48,7 @@ Y_UNIT_TEST_SUITE(KqpOlapStats) { tableInserter.AddRow().Add(i).Add("test_res_" + std::to_string(i)).AddNull(); } - testHelper.InsertData(testTable, tableInserter); + testHelper.BulkUpsert(testTable, tableInserter); } Sleep(TDuration::Seconds(1)); @@ -88,7 +88,7 @@ Y_UNIT_TEST_SUITE(KqpOlapStats) { for (size_t i = 0; i < inserted_rows; i++) { tableInserter.AddRow().Add(i).Add("test_res_" + std::to_string(i)).AddNull(); } - testHelper.InsertData(testTable, tableInserter); + testHelper.BulkUpsert(testTable, tableInserter); } Sleep(TDuration::Seconds(1)); @@ -135,7 +135,7 @@ Y_UNIT_TEST_SUITE(KqpOlapStats) { .Add("test_res_" + std::to_string(i + t * tables_in_store)) .AddNull(); } - testHelper.InsertData(testTable, tableInserter); + testHelper.BulkUpsert(testTable, tableInserter); } Sleep(TDuration::Seconds(20)); @@ -155,4 +155,4 @@ Y_UNIT_TEST_SUITE(KqpOlapStats) { } } // namespace NKqp -} // namespace NKikimr \ No newline at end of file +} // namespace NKikimr diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index a91b40588cf0..4013a0429d16 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -1,6 +1,5 @@ #include #include -#include #include #include @@ -534,67 +533,25 @@ Y_UNIT_TEST_SUITE(KqpOlap) { void WriteTestData(TKikimrRunner& kikimr, TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, bool withSomeNulls = false) { UNIT_ASSERT(testTable != "/Root/benchTable"); // TODO: check schema instead - TLocalHelper lHelper(kikimr); if (withSomeNulls) lHelper.WithSomeNulls(); - NYdb::NLongTx::TClient client(kikimr.GetDriver()); - - NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resBeginTx.Status().GetStatus(), EStatus::SUCCESS, resBeginTx.Status().GetIssues().ToString()); - - auto txId = resBeginTx.GetResult().tx_id(); auto batch = lHelper.TestArrowBatch(pathIdBegin, tsBegin, rowCount); - - TString data = NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(batch); - - NLongTx::TLongTxWriteResult resWrite = - client.Write(txId, testTable, txId, data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resWrite.Status().GetStatus(), EStatus::SUCCESS, resWrite.Status().GetIssues().ToString()); - - NLongTx::TLongTxCommitResult resCommitTx = client.CommitTx(txId).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString()); + lHelper.SendDataViaActorSystem(testTable, batch); } void WriteTestDataForClickBench(TKikimrRunner& kikimr, TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) { UNIT_ASSERT(testTable == "/Root/benchTable"); // TODO: check schema instead - TClickHelper lHelper(kikimr.GetTestServer()); - NYdb::NLongTx::TClient client(kikimr.GetDriver()); - - NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resBeginTx.Status().GetStatus(), EStatus::SUCCESS, resBeginTx.Status().GetIssues().ToString()); - - auto txId = resBeginTx.GetResult().tx_id(); auto batch = lHelper.TestArrowBatch(pathIdBegin, tsBegin, rowCount); - TString data = NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(batch); - - NLongTx::TLongTxWriteResult resWrite = - client.Write(txId, testTable, txId, data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resWrite.Status().GetStatus(), EStatus::SUCCESS, resWrite.Status().GetIssues().ToString()); - - NLongTx::TLongTxCommitResult resCommitTx = client.CommitTx(txId).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString()); + lHelper.SendDataViaActorSystem(testTable, batch); } void WriteTestDataForTableWithNulls(TKikimrRunner& kikimr, TString testTable) { UNIT_ASSERT(testTable == "/Root/tableWithNulls"); // TODO: check schema instead TTableWithNullsHelper lHelper(kikimr.GetTestServer()); - NYdb::NLongTx::TClient client(kikimr.GetDriver()); - - NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resBeginTx.Status().GetStatus(), EStatus::SUCCESS, resBeginTx.Status().GetIssues().ToString()); - - auto txId = resBeginTx.GetResult().tx_id(); auto batch = lHelper.TestArrowBatch(); - TString data = NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(batch); - - NLongTx::TLongTxWriteResult resWrite = - client.Write(txId, testTable, txId, data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resWrite.Status().GetStatus(), EStatus::SUCCESS, resWrite.Status().GetIssues().ToString()); - - NLongTx::TLongTxCommitResult resCommitTx = client.CommitTx(txId).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString()); + lHelper.SendDataViaActorSystem(testTable, batch); } void CreateTableOfAllTypes(TKikimrRunner& kikimr) { @@ -5307,7 +5264,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { tableInserter.AddRow().Add(2).Add("test_res_2").Add("val1").AddNull(); tableInserter.AddRow().Add(3).Add("test_res_3").Add("val3").AddNull(); tableInserter.AddRow().Add(2).Add("test_res_2").Add("val2").AddNull(); - testHelper.InsertData(testTable, tableInserter); + testHelper.BulkUpsert(testTable, tableInserter); } while (csController->GetIndexations().Val() == 0) { Cout << "Wait indexation..." << Endl; diff --git a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp index 249a8a205d49..3e0cab5eb053 100644 --- a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp @@ -5,7 +5,6 @@ #include #include #include -#include #include #include #include diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index 536595ccf922..bef64ef18078 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -5,7 +5,6 @@ #include #include #include -#include #include #include #include @@ -5378,7 +5377,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); tableInserter.AddRow().Add(1).Add("test_res_1").AddNull(); tableInserter.AddRow().Add(2).Add("test_res_2").Add(123); - testHelper.InsertData(testTable, tableInserter); + testHelper.BulkUpsert(testTable, tableInserter); } testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;[\"test_res_1\"]]]"); @@ -5407,7 +5406,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { { TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); tableInserter.AddRow().Add(3).Add("test_res_3").Add(123).Add(200); - testHelper.InsertData(testTable, tableInserter); + testHelper.BulkUpsert(testTable, tableInserter); } testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=3", "[[3;[123];[200u];[\"test_res_3\"]]]"); @@ -5444,7 +5443,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { } testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;#;[\"test_res_1\"]]]"); } - +/* Y_UNIT_TEST(AddColumnOnSchemeChange) { TKikimrSettings runnerSettings; runnerSettings.WithSampleTables = false; @@ -5471,7 +5470,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { } testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;#;[\"test_res_1\"]]]"); } - +*/ Y_UNIT_TEST(AddColumnWithStore) { TKikimrSettings runnerSettings; runnerSettings.WithSampleTables = false; @@ -5494,7 +5493,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); tableInserter.AddRow().Add(1).Add("test_res_1").AddNull(); tableInserter.AddRow().Add(2).Add("test_res_2").Add(123); - testHelper.InsertData(testTable, tableInserter); + testHelper.BulkUpsert(testTable, tableInserter); } testHelper.ReadData("SELECT * FROM `/Root/TableStoreTest/ColumnTableTest` WHERE id=1", "[[1;#;[\"test_res_1\"]]]"); @@ -5524,7 +5523,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { { TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); tableInserter.AddRow().Add(3).Add("test_res_3").Add(123).Add(200); - testHelper.InsertData(testTable, tableInserter); + testHelper.BulkUpsert(testTable, tableInserter); } testHelper.ReadData("SELECT * FROM `/Root/TableStoreTest/ColumnTableTest` WHERE id=3", "[[3;[123];[200u];[\"test_res_3\"]]]"); @@ -5580,7 +5579,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schemaWithNull)); tableInserter.AddRow().Add(1).Add("test_res_1").AddNull(); tableInserter.AddRow().Add(2).Add("test_res_2").Add(123); - testHelper.InsertData(testTable, tableInserter, {}, EStatus::GENERIC_ERROR); + testHelper.BulkUpsert(testTable, tableInserter, Ydb::StatusIds::GENERIC_ERROR); } { TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schemaWithNull)); @@ -5611,7 +5610,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); tableInserter.AddRow().Add(1).Add("test_res_1").AddNull(); tableInserter.AddRow().Add(2).Add("test_res_2").Add(123); - testHelper.InsertData(testTable, tableInserter); + testHelper.BulkUpsert(testTable, tableInserter); } testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;[\"test_res_1\"]]]"); { @@ -5628,7 +5627,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { } testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` ", "[[1];[2]]"); } - +/* Y_UNIT_TEST(DropColumnOnSchemeChange) { TKikimrSettings runnerSettings; runnerSettings.WithSampleTables = false; @@ -5654,34 +5653,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { } testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#]]"); } - - Y_UNIT_TEST(DropColumnOldScheme) { - TKikimrSettings runnerSettings; - runnerSettings.WithSampleTables = false; - TTestHelper testHelper(runnerSettings); - - TVector schema = { - TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Int32).SetNullable(false), - TTestHelper::TColumnSchema().SetName("resource_id").SetType(NScheme::NTypeIds::Utf8), - TTestHelper::TColumnSchema().SetName("level").SetType(NScheme::NTypeIds::Int32) - }; - - TTestHelper::TColumnTable testTable; - - testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"id"}).SetSharding({"id"}).SetSchema(schema); - testHelper.CreateTable(testTable); - { - auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "`DROP COLUMN resource_id;"; - auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString()); - } - { - TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); - tableInserter.AddRow().Add(1).Add("test_res_1").AddNull(); - testHelper.InsertData(testTable, tableInserter, {}, EStatus::SUCCESS); - } - // testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#]]"); - } +*/ Y_UNIT_TEST(DropColumnOldSchemeBulkUpsert) { TKikimrSettings runnerSettings; @@ -5729,7 +5701,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); tableInserter.AddRow().Add(1).Add("test_res_1").AddNull(); tableInserter.AddRow().Add(2).Add("test_res_2").Add(123); - testHelper.InsertData(testTable, tableInserter); + testHelper.BulkUpsert(testTable, tableInserter); } testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;[\"test_res_1\"]]]"); { diff --git a/ydb/core/testlib/cs_helper.cpp b/ydb/core/testlib/cs_helper.cpp index 557d0cfff98f..4853290f8637 100644 --- a/ydb/core/testlib/cs_helper.cpp +++ b/ydb/core/testlib/cs_helper.cpp @@ -429,15 +429,14 @@ std::shared_ptr TTableWithNullsHelper::TestArrowBatch(ui64, Y_ABORT_UNLESS(bJsonDoc.AppendNull().ok()); } - auto maybeJsonDoc = NBinaryJson::SerializeToBinaryJson(R"({"col1": "val1", "obj": {"obj_col2_int": 16}})"); - Y_ABORT_UNLESS(maybeJsonDoc.Defined()); + const auto maybeJsonDoc = std::string(R"({"col1": "val1", "obj": {"obj_col2_int": 16}})"); for (size_t i = rowCount / 2 + 1; i <= rowCount; ++i) { Y_ABORT_UNLESS(bId.Append(i).ok()); Y_ABORT_UNLESS(bResourceId.Append(std::to_string(i)).ok()); Y_ABORT_UNLESS(bLevel.AppendNull().ok()); Y_ABORT_UNLESS(bBinaryStr.Append(std::to_string(i)).ok()); Y_ABORT_UNLESS(bJsonVal.AppendNull().ok()); - Y_ABORT_UNLESS(bJsonDoc.Append(maybeJsonDoc->Data(), maybeJsonDoc->Size()).ok()); + Y_ABORT_UNLESS(bJsonDoc.Append(maybeJsonDoc.data(), maybeJsonDoc.length()).ok()); } std::shared_ptr aId;