diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index eeea442e5498..6a10ccdbf721 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -2320,6 +2320,11 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer(input)) { + if (!SessionCtx->Config().FeatureFlags.GetEnableColumnStatistics()) { + ctx.AddError(TIssue("ANALYZE command is not supported because `EnableColumnStatistics` feature flag is off")); + return SyncError(); + } + auto cluster = TString(maybeAnalyze.Cast().DataSink().Cluster()); TAnalyzeSettings analyzeSettings = ParseAnalyzeSettings(maybeAnalyze.Cast()); diff --git a/ydb/core/kqp/ut/query/kqp_analyze_ut.cpp b/ydb/core/kqp/ut/query/kqp_analyze_ut.cpp index e4708c2676e1..9342f75de40e 100644 --- a/ydb/core/kqp/ut/query/kqp_analyze_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_analyze_ut.cpp @@ -19,7 +19,8 @@ Y_UNIT_TEST_SUITE(KqpAnalyze) { using namespace NStat; Y_UNIT_TEST_TWIN(AnalyzeTable, ColumnStore) { - TTestEnv env(1, 1, 1, true); + TTestEnv env(1, 1, true); + CreateDatabase(env, "Database"); TTableClient client(env.GetDriver()); diff --git a/ydb/core/statistics/aggregator/aggregator_impl.cpp b/ydb/core/statistics/aggregator/aggregator_impl.cpp index ef16ee226551..9becddab5a75 100644 --- a/ydb/core/statistics/aggregator/aggregator_impl.cpp +++ b/ydb/core/statistics/aggregator/aggregator_impl.cpp @@ -511,7 +511,11 @@ void TStatisticsAggregator::InitializeStatisticsTable() { if (!EnableColumnStatistics) { return; } - Register(CreateStatisticsTableCreator(std::make_unique())); + if (!Database) { + return; + } + Register(CreateStatisticsTableCreator( + std::make_unique(), Database)); } void TStatisticsAggregator::Navigate() { @@ -598,7 +602,7 @@ void TStatisticsAggregator::SaveStatisticsToTable() { data.push_back(strSketch); } - Register(CreateSaveStatisticsQuery(SelfId(), + Register(CreateSaveStatisticsQuery(SelfId(), Database, TraversalPathId, EStatType::COUNT_MIN_SKETCH, std::move(columnTags), std::move(data))); } @@ -610,7 +614,7 @@ void TStatisticsAggregator::DeleteStatisticsFromTable() { PendingDeleteStatistics = false; - Register(CreateDeleteStatisticsQuery(SelfId(), TraversalPathId)); + Register(CreateDeleteStatisticsQuery(SelfId(), Database, TraversalPathId)); } void TStatisticsAggregator::ScheduleNextAnalyze(NIceDb::TNiceDb& db) { diff --git a/ydb/core/statistics/aggregator/tx_configure.cpp b/ydb/core/statistics/aggregator/tx_configure.cpp index 391324b0b01a..4f0790b02e1b 100644 --- a/ydb/core/statistics/aggregator/tx_configure.cpp +++ b/ydb/core/statistics/aggregator/tx_configure.cpp @@ -22,8 +22,13 @@ struct TStatisticsAggregator::TTxConfigure : public TTxBase { NIceDb::TNiceDb db(txc.DB); + bool needInitialize = !Self->Database; Self->Database = Record.GetDatabase(); Self->PersistSysParam(db, Schema::SysParam_Database, Self->Database); + + if (needInitialize) { + Self->InitializeStatisticsTable(); + } return true; } diff --git a/ydb/core/statistics/aggregator/tx_init.cpp b/ydb/core/statistics/aggregator/tx_init.cpp index ae8a8a355e00..fc53cf6cf761 100644 --- a/ydb/core/statistics/aggregator/tx_init.cpp +++ b/ydb/core/statistics/aggregator/tx_init.cpp @@ -287,7 +287,9 @@ struct TStatisticsAggregator::TTxInit : public TTxBase { SA_LOG_W("[" << Self->TabletID() << "] TTxInit::Complete. EnableColumnStatistics=false"); } - Self->InitializeStatisticsTable(); + if (Self->Database) { + Self->InitializeStatisticsTable(); + } if (Self->TraversalPathId && Self->TraversalStartKey) { SA_LOG_D("[" << Self->TabletID() << "] TTxInit::Complete. Start navigate. PathId " << Self->TraversalPathId); diff --git a/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp b/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp index d1c9f32ee43d..56d302a0effd 100644 --- a/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp +++ b/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp @@ -9,13 +9,12 @@ namespace NKikimr { namespace NStat { - - Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { Y_UNIT_TEST(AnalyzeTable) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - auto tableInfo = CreateDatabaseColumnTables(env, 1, 1)[0]; + const auto databaseInfo = CreateDatabaseColumnTables(env, 1, 1); + const auto& tableInfo = databaseInfo.Tables[0]; AnalyzeTable(runtime, tableInfo.ShardIds[0], tableInfo.PathId); } @@ -23,7 +22,8 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { Y_UNIT_TEST(Analyze) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - auto tableInfo = CreateDatabaseColumnTables(env, 1, 1)[0]; + const auto databaseInfo = CreateDatabaseColumnTables(env, 1, 1); + const auto& tableInfo = databaseInfo.Tables[0]; Analyze(runtime, tableInfo.SaTabletId, {tableInfo.PathId}); } @@ -31,7 +31,8 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { Y_UNIT_TEST(AnalyzeServerless) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - auto tableInfo = CreateServerlessDatabaseColumnTables(env, 1, 1)[0]; + auto databaseInfo = CreateServerlessDatabaseColumnTables(env, 1, 1); + const auto& tableInfo = databaseInfo.Tables[0]; Analyze(runtime, tableInfo.SaTabletId, {tableInfo.PathId}); } @@ -39,7 +40,8 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { Y_UNIT_TEST(AnalyzeAnalyzeOneColumnTableSpecificColumns) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - auto tableInfo = CreateDatabaseColumnTables(env, 1, 1)[0]; + const auto databaseInfo = CreateDatabaseColumnTables(env, 1, 1); + const auto& tableInfo = databaseInfo.Tables[0]; Analyze(runtime, tableInfo.SaTabletId, {{tableInfo.PathId, {1, 2}}}); } @@ -47,7 +49,8 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { Y_UNIT_TEST(AnalyzeTwoColumnTables) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - auto tableInfos = CreateDatabaseColumnTables(env, 2, 1); + auto databaseInfo = CreateDatabaseColumnTables(env, 2, 1); + const auto& tableInfos = databaseInfo.Tables; Analyze(runtime, tableInfos[0].SaTabletId, {tableInfos[0].PathId, tableInfos[1].PathId}); } @@ -58,11 +61,10 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { auto sender = runtime.AllocateEdgeActor(); TBlockEvents block(runtime); - - auto tableInfo = CreateDatabaseColumnTables(env, 1, 1)[0]; + const auto databaseInfo = CreateDatabaseColumnTables(env, 1, 1); + const auto& tableInfo = databaseInfo.Tables[0]; const TString operationId = "operationId"; - AnalyzeStatus(runtime, sender, tableInfo.SaTabletId, operationId, NKikimrStat::TEvAnalyzeStatusResponse::STATUS_NO_OPERATION); auto analyzeRequest = MakeAnalyzeRequest({{tableInfo.PathId, {1, 2}}}, operationId); @@ -93,7 +95,8 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { Y_UNIT_TEST(AnalyzeSameOperationId) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - auto tableInfo = CreateDatabaseColumnTables(env, 1, 1)[0]; + const auto databaseInfo = CreateDatabaseColumnTables(env, 1, 1); + const auto& tableInfo = databaseInfo.Tables[0]; auto sender = runtime.AllocateEdgeActor(); const TString operationId = "operationId"; @@ -123,7 +126,8 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { Y_UNIT_TEST(AnalyzeMultiOperationId) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - auto tableInfo = CreateDatabaseColumnTables(env, 1, 1)[0]; + const auto databaseInfo = CreateDatabaseColumnTables(env, 1, 1); + const auto& tableInfo = databaseInfo.Tables[0]; auto sender = runtime.AllocateEdgeActor(); auto GetOperationId = [] (size_t i) { return TStringBuilder() << "operationId" << i; }; @@ -154,7 +158,8 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { Y_UNIT_TEST(AnalyzeRebootSaBeforeAnalyzeTableResponse) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - auto tableInfo = CreateDatabaseColumnTables(env, 1, 1)[0]; + const auto databaseInfo = CreateDatabaseColumnTables(env, 1, 1); + const auto& tableInfo = databaseInfo.Tables[0]; auto sender = runtime.AllocateEdgeActor(); bool eventSeen = false; @@ -179,7 +184,8 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { Y_UNIT_TEST(AnalyzeRebootSaBeforeResolve) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - auto tableInfo = CreateDatabaseColumnTables(env, 1, 1)[0]; + const auto databaseInfo = CreateDatabaseColumnTables(env, 1, 1); + const auto& tableInfo = databaseInfo.Tables[0]; auto sender = runtime.AllocateEdgeActor(); TBlockEvents block(runtime); @@ -207,7 +213,8 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { Y_UNIT_TEST(AnalyzeRebootSaBeforeReqDistribution) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - auto tableInfo = CreateDatabaseColumnTables(env, 1, 1)[0]; + const auto databaseInfo = CreateDatabaseColumnTables(env, 1, 1); + const auto& tableInfo = databaseInfo.Tables[0]; auto sender = runtime.AllocateEdgeActor(); bool eventSeen = false; @@ -232,7 +239,8 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { Y_UNIT_TEST(AnalyzeRebootSaBeforeAggregate) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - auto tableInfo = CreateDatabaseColumnTables(env, 1, 1)[0]; + const auto databaseInfo = CreateDatabaseColumnTables(env, 1, 1); + const auto& tableInfo = databaseInfo.Tables[0]; auto sender = runtime.AllocateEdgeActor(); bool eventSeen = false; @@ -257,7 +265,8 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { Y_UNIT_TEST(AnalyzeRebootSaBeforeSave) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - auto tableInfo = CreateDatabaseColumnTables(env, 1, 1)[0]; + const auto databaseInfo = CreateDatabaseColumnTables(env, 1, 1); + const auto& tableInfo = databaseInfo.Tables[0]; auto sender = runtime.AllocateEdgeActor(); bool eventSeen = false; @@ -282,7 +291,8 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { Y_UNIT_TEST(AnalyzeRebootSaInAggregate) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - auto tableInfo = CreateDatabaseColumnTables(env, 1, 10)[0]; + const auto databaseInfo = CreateDatabaseColumnTables(env, 1, 10); + const auto& tableInfo = databaseInfo.Tables[0]; auto sender = runtime.AllocateEdgeActor(); int observerCount = 0; @@ -308,7 +318,8 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { Y_UNIT_TEST(AnalyzeRebootColumnShard) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - auto tableInfo = CreateDatabaseColumnTables(env, 1, 1)[0]; + const auto databaseInfo = CreateDatabaseColumnTables(env, 1, 1); + const auto& tableInfo = databaseInfo.Tables[0]; auto sender = runtime.AllocateEdgeActor(); TBlockEvents block(runtime); @@ -326,7 +337,8 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { Y_UNIT_TEST(AnalyzeDeadline) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - auto tableInfo = CreateDatabaseColumnTables(env, 1, 1)[0]; + const auto databaseInfo = CreateDatabaseColumnTables(env, 1, 1); + const auto& tableInfo = databaseInfo.Tables[0]; auto sender = runtime.AllocateEdgeActor(); TBlockEvents block(runtime); diff --git a/ydb/core/statistics/aggregator/ut/ut_analyze_datashard.cpp b/ydb/core/statistics/aggregator/ut/ut_analyze_datashard.cpp index 4c1bf7a7d4e8..3d4c8e9fcde0 100644 --- a/ydb/core/statistics/aggregator/ut/ut_analyze_datashard.cpp +++ b/ydb/core/statistics/aggregator/ut/ut_analyze_datashard.cpp @@ -7,35 +7,22 @@ #include #include -#include - namespace NKikimr { namespace NStat { -namespace { - - -} // namespace - Y_UNIT_TEST_SUITE(AnalyzeDatashard) { Y_UNIT_TEST(AnalyzeOneTable) { TTestEnv env(1, 1); - auto init = [&] () { - CreateDatabase(env, "Database"); - CreateUniformTable(env, "Database", "Table"); - }; - std::thread initThread(init); auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); + + CreateDatabase(env, "Database"); + CreateUniformTable(env, "Database", "Table"); ui64 saTabletId; auto pathId = ResolvePathId(runtime, "/Root/Database/Table", nullptr, &saTabletId); - runtime.SimulateSleep(TDuration::Seconds(30)); - Analyze(runtime, saTabletId, {{pathId}}); ValidateCountMinDatashardAbsense(runtime, pathId); @@ -43,20 +30,12 @@ Y_UNIT_TEST_SUITE(AnalyzeDatashard) { Y_UNIT_TEST(AnalyzeTwoTables) { TTestEnv env(1, 1); - auto init = [&] () { - CreateDatabase(env, "Database"); - CreateUniformTable(env, "Database", "Table1"); - CreateUniformTable(env, "Database", "Table2"); - }; - // TODO remove thread - std::thread initThread(init); auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); - // TODO remove sleep - runtime.SimulateSleep(TDuration::Seconds(30)); + CreateDatabase(env, "Database"); + CreateUniformTable(env, "Database", "Table1"); + CreateUniformTable(env, "Database", "Table2"); ui64 saTabletId1; auto pathId1 = ResolvePathId(runtime, "/Root/Database/Table1", nullptr, &saTabletId1); @@ -68,34 +47,21 @@ Y_UNIT_TEST_SUITE(AnalyzeDatashard) { ValidateCountMinDatashardAbsense(runtime, pathId2); } - Y_UNIT_TEST(DropTableNavigateError) { TTestEnv env(1, 1); - auto init = [&] () { - CreateDatabase(env, "Database"); - CreateUniformTable(env, "Database", "Table"); - }; - std::thread initThread(init); auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); + + CreateDatabase(env, "Database"); + CreateUniformTable(env, "Database", "Table"); ui64 saTabletId = 0; auto pathId = ResolvePathId(runtime, "/Root/Database/Table", nullptr, &saTabletId); - auto init2 = [&] () { - DropTable(env, "Database", "Table"); - }; - std::thread init2Thread(init2); - - runtime.SimulateSleep(TDuration::Seconds(5)); - init2Thread.join(); + DropTable(env, "Database", "Table"); Analyze(runtime, saTabletId, {pathId}); - runtime.SimulateSleep(TDuration::Seconds(10)); - ValidateCountMinDatashardAbsense(runtime, pathId); } } diff --git a/ydb/core/statistics/aggregator/ut/ut_traverse_columnshard.cpp b/ydb/core/statistics/aggregator/ut/ut_traverse_columnshard.cpp index efd8c8015f85..9d21363191c4 100644 --- a/ydb/core/statistics/aggregator/ut/ut_traverse_columnshard.cpp +++ b/ydb/core/statistics/aggregator/ut/ut_traverse_columnshard.cpp @@ -34,7 +34,21 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { Y_UNIT_TEST(TraverseColumnTable) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - auto tableInfo = CreateDatabaseColumnTables(env, 1, 10)[0]; + const auto databaseInfo = CreateDatabaseColumnTables(env, 1, 10); + const auto& tableInfo = databaseInfo.Tables[0]; + + WaitForSavedStatistics(runtime, tableInfo.PathId); + + auto countMin = ExtractCountMin(runtime, tableInfo.PathId); + + UNIT_ASSERT(CheckCountMinSketch(countMin, ColumnTableRowsNumber)); + } + + Y_UNIT_TEST(TraverseServerlessColumnTable) { + TTestEnv env(1, 1); + auto& runtime = *env.GetServer().GetRuntime(); + auto databaseInfo = CreateServerlessDatabaseColumnTables(env, 1, 10); + const auto& tableInfo = databaseInfo.Tables[0]; WaitForSavedStatistics(runtime, tableInfo.PathId); @@ -46,7 +60,8 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { Y_UNIT_TEST(TraverseColumnTableRebootColumnshard) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - auto tableInfo = CreateDatabaseColumnTables(env, 1, 10)[0]; + const auto databaseInfo = CreateDatabaseColumnTables(env, 1, 10); + const auto& tableInfo = databaseInfo.Tables[0]; auto sender = runtime.AllocateEdgeActor(); WaitForSavedStatistics(runtime, tableInfo.PathId); @@ -61,7 +76,8 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { Y_UNIT_TEST(TraverseColumnTableRebootSaTabletBeforeResolve) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - auto tableInfo = CreateDatabaseColumnTables(env, 1, 10)[0]; + const auto databaseInfo = CreateDatabaseColumnTables(env, 1, 10); + const auto& tableInfo = databaseInfo.Tables[0]; auto sender = runtime.AllocateEdgeActor(); TBlockEvents block(runtime); @@ -86,7 +102,8 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { Y_UNIT_TEST(TraverseColumnTableRebootSaTabletBeforeReqDistribution) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - auto tableInfo = CreateDatabaseColumnTables(env, 1, 10)[0]; + const auto databaseInfo = CreateDatabaseColumnTables(env, 1, 10); + const auto& tableInfo = databaseInfo.Tables[0]; auto sender = runtime.AllocateEdgeActor(); bool eventSeen = false; @@ -106,7 +123,8 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { Y_UNIT_TEST(TraverseColumnTableRebootSaTabletBeforeAggregate) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - auto tableInfo = CreateDatabaseColumnTables(env, 1, 10)[0]; + const auto databaseInfo = CreateDatabaseColumnTables(env, 1, 10); + const auto& tableInfo = databaseInfo.Tables[0]; auto sender = runtime.AllocateEdgeActor(); bool eventSeen = false; @@ -126,7 +144,8 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { Y_UNIT_TEST(TraverseColumnTableRebootSaTabletBeforeSave) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - auto tableInfo = CreateDatabaseColumnTables(env, 1, 10)[0]; + const auto databaseInfo = CreateDatabaseColumnTables(env, 1, 10); + const auto& tableInfo = databaseInfo.Tables[0]; auto sender = runtime.AllocateEdgeActor(); bool eventSeen = false; @@ -146,7 +165,8 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { Y_UNIT_TEST(TraverseColumnTableRebootSaTabletInAggregate) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - auto tableInfo = CreateDatabaseColumnTables(env, 1, 10)[0]; + const auto databaseInfo = CreateDatabaseColumnTables(env, 1, 10); + const auto& tableInfo = databaseInfo.Tables[0]; auto sender = runtime.AllocateEdgeActor(); int observerCount = 0; @@ -167,7 +187,8 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { Y_UNIT_TEST(TraverseColumnTableHiveDistributionZeroNodes) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - auto tableInfo = CreateDatabaseColumnTables(env, 1, 10)[0]; + const auto databaseInfo = CreateDatabaseColumnTables(env, 1, 10); + const auto& tableInfo = databaseInfo.Tables[0]; bool observerFirstExec = true; auto observer = runtime.AddObserver( @@ -212,7 +233,8 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { Y_UNIT_TEST(TraverseColumnTableHiveDistributionAbsentNodes) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - auto tableInfo = CreateDatabaseColumnTables(env, 1, 10)[0]; + const auto databaseInfo = CreateDatabaseColumnTables(env, 1, 10); + const auto& tableInfo = databaseInfo.Tables[0]; bool observerFirstExec = true; auto observer = runtime.AddObserver( @@ -249,7 +271,8 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { Y_UNIT_TEST(TraverseColumnTableAggrStatUnavailableNode) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - auto tableInfo = CreateDatabaseColumnTables(env, 1, 10)[0]; + const auto databaseInfo = CreateDatabaseColumnTables(env, 1, 10); + const auto& tableInfo = databaseInfo.Tables[0]; bool observerFirstExec = true; auto observer = runtime.AddObserver( @@ -286,7 +309,8 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { Y_UNIT_TEST(TraverseColumnTableAggrStatNonLocalTablet) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); - auto tableInfo = CreateDatabaseColumnTables(env, 1, 10)[0]; + const auto databaseInfo = CreateDatabaseColumnTables(env, 1, 10); + const auto& tableInfo = databaseInfo.Tables[0]; bool observerFirstExec = true; auto observer = runtime.AddObserver( diff --git a/ydb/core/statistics/aggregator/ut/ut_traverse_datashard.cpp b/ydb/core/statistics/aggregator/ut/ut_traverse_datashard.cpp index 37bba99f4716..039334cf15e9 100644 --- a/ydb/core/statistics/aggregator/ut/ut_traverse_datashard.cpp +++ b/ydb/core/statistics/aggregator/ut/ut_traverse_datashard.cpp @@ -7,31 +7,17 @@ #include #include -#include - namespace NKikimr { namespace NStat { -namespace { - - -} // namespace - Y_UNIT_TEST_SUITE(TraverseDatashard) { Y_UNIT_TEST(TraverseOneTable) { TTestEnv env(1, 1); - auto init = [&] () { - CreateDatabase(env, "Database"); - CreateUniformTable(env, "Database", "Table"); - }; - std::thread initThread(init); - auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); - runtime.SimulateSleep(TDuration::Seconds(60)); + CreateDatabase(env, "Database"); + CreateUniformTable(env, "Database", "Table"); auto pathId = ResolvePathId(runtime, "/Root/Database/Table"); ValidateCountMinDatashardAbsense(runtime, pathId); @@ -39,18 +25,11 @@ Y_UNIT_TEST_SUITE(TraverseDatashard) { Y_UNIT_TEST(TraverseTwoTables) { TTestEnv env(1, 1); - auto init = [&] () { - CreateDatabase(env, "Database"); - CreateUniformTable(env, "Database", "Table1"); - CreateUniformTable(env, "Database", "Table2"); - }; - std::thread initThread(init); - auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); - runtime.SimulateSleep(TDuration::Seconds(60)); + CreateDatabase(env, "Database"); + CreateUniformTable(env, "Database", "Table1"); + CreateUniformTable(env, "Database", "Table2"); auto pathId1 = ResolvePathId(runtime, "/Root/Database/Table1"); auto pathId2 = ResolvePathId(runtime, "/Root/Database/Table2"); @@ -60,29 +39,11 @@ Y_UNIT_TEST_SUITE(TraverseDatashard) { Y_UNIT_TEST(TraverseOneTableServerless) { TTestEnv env(1, 1); - - auto init = [&] () { - CreateDatabase(env, "Shared"); - }; - std::thread initThread(init); - auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); - - TPathId domainKey; - ResolvePathId(runtime, "/Root/Shared", &domainKey); - - auto init2 = [&] () { - CreateServerlessDatabase(env, "Serverless", domainKey); - CreateUniformTable(env, "Serverless", "Table"); - }; - std::thread init2Thread(init2); - - runtime.SimulateSleep(TDuration::Seconds(5)); - init2Thread.join(); - runtime.SimulateSleep(TDuration::Seconds(60)); + CreateDatabase(env, "Shared", 1, true); + CreateServerlessDatabase(env, "Serverless", "/Root/Shared"); + CreateUniformTable(env, "Serverless", "Table"); auto pathId = ResolvePathId(runtime, "/Root/Serverless/Table"); ValidateCountMinDatashardAbsense(runtime, pathId); @@ -90,30 +51,12 @@ Y_UNIT_TEST_SUITE(TraverseDatashard) { Y_UNIT_TEST(TraverseTwoTablesServerless) { TTestEnv env(1, 1); - - auto init = [&] () { - CreateDatabase(env, "Shared"); - }; - std::thread initThread(init); - auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); - - TPathId domainKey; - ResolvePathId(runtime, "/Root/Shared", &domainKey); - - auto init2 = [&] () { - CreateServerlessDatabase(env, "Serverless", domainKey); - CreateUniformTable(env, "Serverless", "Table1"); - CreateUniformTable(env, "Serverless", "Table2"); - }; - std::thread init2Thread(init2); - - runtime.SimulateSleep(TDuration::Seconds(5)); - init2Thread.join(); - runtime.SimulateSleep(TDuration::Seconds(60)); + CreateDatabase(env, "Shared", 1, true); + CreateServerlessDatabase(env, "Serverless", "/Root/Shared"); + CreateUniformTable(env, "Serverless", "Table1"); + CreateUniformTable(env, "Serverless", "Table2"); auto pathId1 = ResolvePathId(runtime, "/Root/Serverless/Table1"); auto pathId2 = ResolvePathId(runtime, "/Root/Serverless/Table2"); @@ -123,31 +66,13 @@ Y_UNIT_TEST_SUITE(TraverseDatashard) { Y_UNIT_TEST(TraverseTwoTablesTwoServerlessDbs) { TTestEnv env(1, 1); - - auto init = [&] () { - CreateDatabase(env, "Shared"); - }; - std::thread initThread(init); - auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); - - TPathId domainKey; - ResolvePathId(runtime, "/Root/Shared", &domainKey); - - auto init2 = [&] () { - CreateServerlessDatabase(env, "Serverless1", domainKey); - CreateServerlessDatabase(env, "Serverless2", domainKey); - CreateUniformTable(env, "Serverless1", "Table1"); - CreateUniformTable(env, "Serverless2", "Table2"); - }; - std::thread init2Thread(init2); - - runtime.SimulateSleep(TDuration::Seconds(5)); - init2Thread.join(); - runtime.SimulateSleep(TDuration::Seconds(60)); + CreateDatabase(env, "Shared", 1, true); + CreateServerlessDatabase(env, "Serverless1", "/Root/Shared"); + CreateServerlessDatabase(env, "Serverless2", "/Root/Shared"); + CreateUniformTable(env, "Serverless1", "Table1"); + CreateUniformTable(env, "Serverless2", "Table2"); auto pathId1 = ResolvePathId(runtime, "/Root/Serverless1/Table1"); auto pathId2 = ResolvePathId(runtime, "/Root/Serverless2/Table2"); diff --git a/ydb/core/statistics/database/database.cpp b/ydb/core/statistics/database/database.cpp index 0a80147e24f1..41b67c5b0369 100644 --- a/ydb/core/statistics/database/database.cpp +++ b/ydb/core/statistics/database/database.cpp @@ -10,8 +10,9 @@ namespace NKikimr::NStat { class TStatisticsTableCreator : public TActorBootstrapped { public: - explicit TStatisticsTableCreator(std::unique_ptr resultEvent) + explicit TStatisticsTableCreator(std::unique_ptr resultEvent, const TString& database) : ResultEvent(std::move(resultEvent)) + , Database(database) {} void Registered(NActors::TActorSystem* sys, const NActors::TActorId& owner) override { @@ -38,6 +39,7 @@ class TStatisticsTableCreator : public TActorBootstrapped ResultEvent; + const TString Database; NActors::TActorId Owner; }; -NActors::IActor* CreateStatisticsTableCreator(std::unique_ptr event) { - return new TStatisticsTableCreator(std::move(event)); +NActors::IActor* CreateStatisticsTableCreator(std::unique_ptr event, const TString& database) { + return new TStatisticsTableCreator(std::move(event), database); } @@ -83,9 +86,9 @@ class TSaveStatisticsQuery : public NKikimr::TQueryBase { const std::vector Data; public: - TSaveStatisticsQuery(const TPathId& pathId, ui64 statType, + TSaveStatisticsQuery(const TString& database, const TPathId& pathId, ui64 statType, const std::vector& columnTags, const std::vector& data) - : NKikimr::TQueryBase(NKikimrServices::STATISTICS, {}, {}, true) + : NKikimr::TQueryBase(NKikimrServices::STATISTICS, {}, database, true) , PathId(pathId) , StatType(statType) , ColumnTags(columnTags) @@ -160,6 +163,7 @@ class TSaveStatisticsQuery : public NKikimr::TQueryBase { class TSaveStatisticsRetryingQuery : public TActorBootstrapped { private: const NActors::TActorId ReplyActorId; + const TString Database; const TPathId PathId; const ui64 StatType; const std::vector ColumnTags; @@ -168,11 +172,12 @@ class TSaveStatisticsRetryingQuery : public TActorBootstrapped&, const std::vector&>; + const TString&, const TPathId&, ui64, const std::vector&, const std::vector&>; - TSaveStatisticsRetryingQuery(const NActors::TActorId& replyActorId, + TSaveStatisticsRetryingQuery(const NActors::TActorId& replyActorId, const TString& database, const TPathId& pathId, ui64 statType, std::vector&& columnTags, std::vector&& data) : ReplyActorId(replyActorId) + , Database(database) , PathId(pathId) , StatType(statType) , ColumnTags(std::move(columnTags)) @@ -186,7 +191,7 @@ class TSaveStatisticsRetryingQuery : public TActorBootstrapped::max(), TDuration::Seconds(1)), - PathId, StatType, ColumnTags, Data + Database, PathId, StatType, ColumnTags, Data )); Become(&TSaveStatisticsRetryingQuery::StateFunc); } @@ -201,10 +206,10 @@ class TSaveStatisticsRetryingQuery : public TActorBootstrapped&& columnTags, std::vector&& data) { - return new TSaveStatisticsRetryingQuery(replyActorId, pathId, statType, std::move(columnTags), std::move(data)); + return new TSaveStatisticsRetryingQuery(replyActorId, database, pathId, statType, std::move(columnTags), std::move(data)); } @@ -218,8 +223,8 @@ class TLoadStatisticsQuery : public NKikimr::TQueryBase { std::optional Data; public: - TLoadStatisticsQuery(const TPathId& pathId, ui64 statType, ui32 columnTag, ui64 cookie) - : NKikimr::TQueryBase(NKikimrServices::STATISTICS, {}, {}, true) + TLoadStatisticsQuery(const TString& database, const TPathId& pathId, ui64 statType, ui32 columnTag, ui64 cookie) + : NKikimr::TQueryBase(NKikimrServices::STATISTICS, {}, database, true) , PathId(pathId) , StatType(statType) , ColumnTag(columnTag) @@ -293,6 +298,7 @@ class TLoadStatisticsQuery : public NKikimr::TQueryBase { class TLoadStatisticsRetryingQuery : public TActorBootstrapped { private: const NActors::TActorId ReplyActorId; + const TString Database; const TPathId PathId; const ui64 StatType; const ui32 ColumnTag; @@ -301,11 +307,12 @@ class TLoadStatisticsRetryingQuery : public TActorBootstrapped; + const TString&, const TPathId&, ui64, ui32, ui64>; - TLoadStatisticsRetryingQuery(const NActors::TActorId& replyActorId, + TLoadStatisticsRetryingQuery(const NActors::TActorId& replyActorId, const TString& database, const TPathId& pathId, ui64 statType, ui32 columnTag, ui64 cookie) : ReplyActorId(replyActorId) + , Database(database) , PathId(pathId) , StatType(statType) , ColumnTag(columnTag) @@ -319,7 +326,7 @@ class TLoadStatisticsRetryingQuery : public TActorBootstrapped::max(), TDuration::Seconds(1)), - PathId, StatType, ColumnTag, Cookie + Database, PathId, StatType, ColumnTag, Cookie )); Become(&TLoadStatisticsRetryingQuery::StateFunc); } @@ -335,9 +342,9 @@ class TLoadStatisticsRetryingQuery : public TActorBootstrapped { private: const NActors::TActorId ReplyActorId; + const TString Database; const TPathId PathId; public: using TDeleteRetryingQuery = TQueryRetryActor< TDeleteStatisticsQuery, TEvStatistics::TEvDeleteStatisticsQueryResponse, - const TPathId&>; + const TString&, const TPathId&>; - TDeleteStatisticsRetryingQuery(const NActors::TActorId& replyActorId, const TPathId& pathId) + TDeleteStatisticsRetryingQuery(const NActors::TActorId& replyActorId, const TString& database, + const TPathId& pathId) : ReplyActorId(replyActorId) + , Database(database) , PathId(pathId) {} @@ -411,7 +421,7 @@ class TDeleteStatisticsRetryingQuery : public TActorBootstrapped::max(), TDuration::Seconds(1)), - PathId + Database, PathId )); Become(&TDeleteStatisticsRetryingQuery::StateFunc); } @@ -426,9 +436,10 @@ class TDeleteStatisticsRetryingQuery : public TActorBootstrapped event); +NActors::IActor* CreateStatisticsTableCreator(std::unique_ptr event, const TString& database); -NActors::IActor* CreateSaveStatisticsQuery(const NActors::TActorId& replyActorId, +NActors::IActor* CreateSaveStatisticsQuery(const NActors::TActorId& replyActorId, const TString& database, const TPathId& pathId, ui64 statType, std::vector&& columnTags, std::vector&& data); -NActors::IActor* CreateLoadStatisticsQuery(const NActors::TActorId& replyActorId, +NActors::IActor* CreateLoadStatisticsQuery(const NActors::TActorId& replyActorId, const TString& database, const TPathId& pathId, ui64 statType, ui32 columnTag, ui64 cookie); -NActors::IActor* CreateDeleteStatisticsQuery(const NActors::TActorId& replyActorId, const TPathId& pathId); +NActors::IActor* CreateDeleteStatisticsQuery(const NActors::TActorId& replyActorId, const TString& database, + const TPathId& pathId); }; diff --git a/ydb/core/statistics/database/ut/ut_database.cpp b/ydb/core/statistics/database/ut/ut_database.cpp index ff4c47a862ce..188f9e1c1e97 100644 --- a/ydb/core/statistics/database/ut/ut_database.cpp +++ b/ydb/core/statistics/database/ut/ut_database.cpp @@ -12,41 +12,37 @@ namespace NKikimr::NStat { Y_UNIT_TEST_SUITE(StatisticsSaveLoad) { Y_UNIT_TEST(Simple) { TTestEnv env(1, 1); - auto init = [&] () { - CreateDatabase(env, "Database"); - }; - std::thread initThread(init); - auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); + + CreateDatabase(env, "Database"); auto sender = runtime.AllocateEdgeActor(0); - runtime.Register(CreateStatisticsTableCreator(std::make_unique()), + runtime.Register(CreateStatisticsTableCreator( + std::make_unique(), "/Root/Database"), 0, 0, TMailboxType::Simple, 0, sender); - runtime.GrabEdgeEvent(sender); + runtime.GrabEdgeEventRethrow(sender); TPathId pathId(1, 1); ui64 statType = 1; std::vector columnTags = {1, 2}; std::vector data = {"dataA", "dataB"}; - runtime.Register(CreateSaveStatisticsQuery(sender, + runtime.Register(CreateSaveStatisticsQuery(sender, "/Root/Database", pathId, statType, std::move(columnTags), std::move(data)), 0, 0, TMailboxType::Simple, 0, sender); - auto saveResponse = runtime.GrabEdgeEvent(sender); + auto saveResponse = runtime.GrabEdgeEventRethrow(sender); UNIT_ASSERT(saveResponse->Get()->Success); - runtime.Register(CreateLoadStatisticsQuery(sender, pathId, statType, 1, 1), + runtime.Register(CreateLoadStatisticsQuery(sender, "/Root/Database", pathId, statType, 1, 1), 0, 0, TMailboxType::Simple, 0, sender); - auto loadResponseA = runtime.GrabEdgeEvent(sender); + auto loadResponseA = runtime.GrabEdgeEventRethrow(sender); UNIT_ASSERT(loadResponseA->Get()->Success); UNIT_ASSERT(loadResponseA->Get()->Data); UNIT_ASSERT_VALUES_EQUAL(*loadResponseA->Get()->Data, "dataA"); - runtime.Register(CreateLoadStatisticsQuery(sender, pathId, statType, 2, 1), + runtime.Register(CreateLoadStatisticsQuery(sender, "/Root/Database", pathId, statType, 2, 1), 0, 0, TMailboxType::Simple, 0, sender); - auto loadResponseB = runtime.GrabEdgeEvent(sender); + auto loadResponseB = runtime.GrabEdgeEventRethrow(sender); UNIT_ASSERT(loadResponseB->Get()->Success); UNIT_ASSERT(loadResponseB->Get()->Data); UNIT_ASSERT_VALUES_EQUAL(*loadResponseB->Get()->Data, "dataB"); @@ -54,17 +50,13 @@ Y_UNIT_TEST_SUITE(StatisticsSaveLoad) { Y_UNIT_TEST(Delete) { TTestEnv env(1, 1); - auto init = [&] () { - CreateDatabase(env, "Database"); - }; - std::thread initThread(init); - auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); + + CreateDatabase(env, "Database"); auto sender = runtime.AllocateEdgeActor(0); - runtime.Register(CreateStatisticsTableCreator(std::make_unique()), + runtime.Register(CreateStatisticsTableCreator( + std::make_unique(), "/Root/Database"), 0, 0, TMailboxType::Simple, 0, sender); runtime.GrabEdgeEvent(sender); @@ -73,18 +65,18 @@ Y_UNIT_TEST_SUITE(StatisticsSaveLoad) { std::vector columnTags = {1, 2}; std::vector data = {"dataA", "dataB"}; - runtime.Register(CreateSaveStatisticsQuery(sender, + runtime.Register(CreateSaveStatisticsQuery(sender, "/Root/Database", pathId, statType, std::move(columnTags), std::move(data)), 0, 0, TMailboxType::Simple, 0, sender); auto saveResponse = runtime.GrabEdgeEvent(sender); UNIT_ASSERT(saveResponse->Get()->Success); - runtime.Register(CreateDeleteStatisticsQuery(sender, pathId), + runtime.Register(CreateDeleteStatisticsQuery(sender, "/Root/Database", pathId), 0, 0, TMailboxType::Simple, 0, sender); auto deleteResponse = runtime.GrabEdgeEvent(sender); UNIT_ASSERT(deleteResponse->Get()->Success); - runtime.Register(CreateLoadStatisticsQuery(sender, pathId, statType, 1, 1), + runtime.Register(CreateLoadStatisticsQuery(sender, "/Root/Database", pathId, statType, 1, 1), 0, 0, TMailboxType::Simple, 0, sender); auto loadResponseA = runtime.GrabEdgeEvent(sender); UNIT_ASSERT(!loadResponseA->Get()->Success); @@ -92,15 +84,10 @@ Y_UNIT_TEST_SUITE(StatisticsSaveLoad) { Y_UNIT_TEST(ForbidAccess) { TTestEnv env(1, 1); - auto init = [&] () { - CreateDatabase(env, "Database"); - CreateUniformTable(env, "Database", "Table"); - }; - std::thread initThread(init); - auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(10)); - initThread.join(); + + CreateDatabase(env, "Database", 1, true); + CreateUniformTable(env, "Database", "Table"); NYdb::EStatus status; auto test = [&] () { @@ -118,7 +105,7 @@ Y_UNIT_TEST_SUITE(StatisticsSaveLoad) { }; std::thread testThread(test); - runtime.SimulateSleep(TDuration::Seconds(10)); + runtime.SimulateSleep(TDuration::Seconds(1)); testThread.join(); UNIT_ASSERT_VALUES_EQUAL(status, NYdb::EStatus::SCHEME_ERROR); diff --git a/ydb/core/statistics/service/http_request.cpp b/ydb/core/statistics/service/http_request.cpp index 1c00bf0e39b7..298bcd991162 100644 --- a/ydb/core/statistics/service/http_request.cpp +++ b/ydb/core/statistics/service/http_request.cpp @@ -1,34 +1,32 @@ #include "http_request.h" - #include +#include +#include #include #include #include -#include + namespace NKikimr { namespace NStat { -TString MakeOperationId() { - TULIDGenerator ulidGen; - return ulidGen.Next(TActivationContext::Now()).ToBinary(); -} +static constexpr ui64 FirstRoundCookie = 1; +static constexpr ui64 SecondRoundCookie = 2; -THttpRequest::THttpRequest(EType type, const TString& path, TActorId replyToActorId) - : Type(type) - , Path(path) +THttpRequest::THttpRequest(ERequestType requestType, const std::unordered_map& params, const TActorId& replyToActorId) + : RequestType(requestType) + , Params(params) , ReplyToActorId(replyToActorId) - , OperationId(MakeOperationId() ) -{} +{} void THttpRequest::Bootstrap() { - using TNavigate = NSchemeCache::TSchemeCacheNavigate; auto navigate = std::make_unique(); auto& entry = navigate->ResultSet.emplace_back(); - entry.Path = SplitPath(Path); + entry.Path = SplitPath(Params[EParamType::PATH]); entry.Operation = TNavigate::EOp::OpTable; entry.RequestType = TNavigate::TEntry::ERequestType::ByPath; + entry.ShowPrivatePath = true; navigate->Cookie = FirstRoundCookie; Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release())); @@ -37,20 +35,18 @@ void THttpRequest::Bootstrap() { } void THttpRequest::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { - using TNavigate = NSchemeCache::TSchemeCacheNavigate; std::unique_ptr navigate(ev->Get()->Request.Release()); Y_ABORT_UNLESS(navigate->ResultSet.size() == 1); - auto& entry = navigate->ResultSet.front(); + + const auto& entry = navigate->ResultSet.front(); if (navigate->Cookie == SecondRoundCookie) { if (entry.Status != TNavigate::EStatus::Ok) { HttpReply("Internal error"); return; } - if (entry.DomainInfo->Params.HasStatisticsAggregator()) { - StatisticsAggregatorId = entry.DomainInfo->Params.GetStatisticsAggregator(); - } - ResolveSuccess(); + + DoRequest(entry); return; } @@ -71,19 +67,12 @@ void THttpRequest::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& } } - PathId = entry.TableId.PathId; - - auto& domainInfo = entry.DomainInfo; - ui64 aggregatorId = 0; - if (domainInfo->Params.HasStatisticsAggregator()) { - aggregatorId = domainInfo->Params.GetStatisticsAggregator(); + if (RequestType == ERequestType::COUNT_MIN_SKETCH_PROBE) { + DoRequest(entry); + return; } - bool isServerless = domainInfo->IsServerless(); - TPathId domainKey = domainInfo->DomainKey; - TPathId resourcesDomainKey = domainInfo->ResourcesDomainKey; auto navigateDomainKey = [this] (TPathId domainKey) { - using TNavigate = NSchemeCache::TSchemeCacheNavigate; auto navigate = std::make_unique(); auto& entry = navigate->ResultSet.emplace_back(); entry.TableId = TTableId(domainKey.OwnerId, domainKey.LocalPathId); @@ -95,27 +84,23 @@ void THttpRequest::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release())); }; - if (!isServerless) { - if (aggregatorId) { - StatisticsAggregatorId = aggregatorId; - ResolveSuccess(); - } else { - navigateDomainKey(domainKey); - } - } else { - navigateDomainKey(resourcesDomainKey); + const auto& domainInfo = entry.DomainInfo; + + if (domainInfo->IsServerless()) { + navigateDomainKey(domainInfo->ResourcesDomainKey); + return; + } + + if (!domainInfo->Params.HasStatisticsAggregator()) { + navigateDomainKey(domainInfo->DomainKey); + return; } + + DoRequest(entry); } void THttpRequest::Handle(TEvStatistics::TEvAnalyzeStatusResponse::TPtr& ev) { - auto& record = ev->Get()->Record; - - if (record.GetOperationId() != OperationId) { - ALOG_ERROR(NKikimrServices::STATISTICS, - "THttpRequest, TEvAnalyzeStatusResponse has operationId=" << record.GetOperationId() - << " , but expected " << OperationId); - HttpReply("Wrong OperationId"); - } + const auto& record = ev->Get()->Record; switch (record.GetStatus()) { case NKikimrStat::TEvAnalyzeStatusResponse::STATUS_UNSPECIFIED: @@ -133,34 +118,114 @@ void THttpRequest::Handle(TEvStatistics::TEvAnalyzeStatusResponse::TPtr& ev) { } } +void THttpRequest::Handle(TEvStatistics::TEvLoadStatisticsQueryResponse::TPtr& ev) { + const auto msg = ev->Get(); + if (!msg->Success || !msg->Data) { + const auto status = std::to_string(static_cast(msg->Status)); + HttpReply("Error occurred while loading statistics. Status: " + status); + return; + } + + const auto typeId = static_cast(msg->Cookie); + const NScheme::TTypeInfo typeInfo(typeId); + const TStringBuf value(Params[EParamType::CELL_VALUE]); + TMemoryPool pool(64); + + TCell cell; + TString error; + if (!NFormats::MakeCell(cell, value, typeInfo, pool, error)) { + HttpReply("Cell value parsing error: " + error); + return; + } + + auto countMinSketch = std::unique_ptr(TCountMinSketch::FromString(msg->Data->Data(), msg->Data->Size())); + const auto probe = countMinSketch->Probe(cell.Data(), cell.Size()); + HttpReply(Params[EParamType::PATH] + "[" + Params[EParamType::COLUMN_NAME] + "]=" + std::to_string(probe)); +} + void THttpRequest::Handle(TEvPipeCache::TEvDeliveryProblem::TPtr&) { HttpReply("Delivery problem"); } -void THttpRequest::ResolveSuccess() { - if (StatisticsAggregatorId == 0) { +void THttpRequest::DoRequest(const TNavigate::TEntry& entry) { + switch (RequestType) { + case ERequestType::ANALYZE: + DoAnalyze(entry); + return; + case ERequestType::STATUS: + DoStatus(entry); + return; + case ERequestType::COUNT_MIN_SKETCH_PROBE: + DoCountMinSketchProbe(entry); + return; + } +} + +void THttpRequest::DoAnalyze(const TNavigate::TEntry& entry) { + if (!entry.DomainInfo->Params.HasStatisticsAggregator()) { HttpReply("No statistics aggregator"); return; } - if (Type == ANALYZE) { - auto analyze = std::make_unique(); - auto& record = analyze->Record; - record.SetOperationId(OperationId); - PathIdFromPathId(PathId, record.AddTables()->MutablePathId()); + const auto statisticsAggregatorId = entry.DomainInfo->Params.GetStatisticsAggregator(); + const auto operationId = TULIDGenerator().Next(TActivationContext::Now()); + + auto analyze = std::make_unique(); + auto& record = analyze->Record; + record.SetOperationId(operationId.ToBinary()); + + const auto& pathId = entry.TableId.PathId; + PathIdFromPathId(pathId, record.AddTables()->MutablePathId()); + + Send(MakePipePerNodeCacheID(false), new TEvPipeCache::TEvForward(analyze.release(), statisticsAggregatorId, true)); + HttpReply("Analyze sent. OperationId: " + operationId.ToString()); +} - Send(MakePipePerNodeCacheID(false), - new TEvPipeCache::TEvForward(analyze.release(), StatisticsAggregatorId, true)); +void THttpRequest::DoStatus(const TNavigate::TEntry& entry) { + if (!entry.DomainInfo->Params.HasStatisticsAggregator()) { + HttpReply("No statistics aggregator"); + return; + } - HttpReply("Analyze sent"); - } else { - auto getStatus = std::make_unique(); - auto& record = getStatus->Record; - record.SetOperationId(OperationId); + const auto statisticsAggregatorId = entry.DomainInfo->Params.GetStatisticsAggregator(); - Send(MakePipePerNodeCacheID(false), - new TEvPipeCache::TEvForward(getStatus.release(), StatisticsAggregatorId, true)); + const auto& operationIdParam = Params[EParamType::OPERATION_ID]; + TULID operationId; + + if (operationIdParam.empty() || !operationId.ParseString(operationIdParam)) { + HttpReply(TString("Wrong OperationId: ") + (operationIdParam.empty() ? "Empty" : operationIdParam)); } + + auto status = std::make_unique(); + auto& record = status->Record; + record.SetOperationId(operationId.ToBinary()); + + Send(MakePipePerNodeCacheID(false), new TEvPipeCache::TEvForward(status.release(), statisticsAggregatorId, true)); +} + +void THttpRequest::DoCountMinSketchProbe(const TNavigate::TEntry& entry) { + const auto& columnName = Params[EParamType::COLUMN_NAME]; + if (columnName.empty()) { + HttpReply("Column is not set"); + return; + } + + if (Params[EParamType::CELL_VALUE].empty()) { + HttpReply("Value is not set"); + return; + } + + for (const auto& [_, tableInfo]: entry.Columns) { + if (tableInfo.Name == columnName) { + const auto columnTag = tableInfo.Id; + const auto typeId = tableInfo.PType.GetTypeId(); + const auto& pathId = entry.TableId.PathId; + Register(CreateLoadStatisticsQuery(SelfId(), Params[EParamType::DATABASE], pathId, EStatType::COUNT_MIN_SKETCH, columnTag, typeId)); + return; + } + } + + HttpReply("Column not found"); } void THttpRequest::HttpReply(const TString& msg) { @@ -174,6 +239,5 @@ void THttpRequest::PassAway() { } - } // NStat -} // NKikimr \ No newline at end of file +} // NKikimr diff --git a/ydb/core/statistics/service/http_request.h b/ydb/core/statistics/service/http_request.h index 24f6bddfb440..e3e514281225 100644 --- a/ydb/core/statistics/service/http_request.h +++ b/ydb/core/statistics/service/http_request.h @@ -7,6 +7,8 @@ #include #include +#include + namespace NKikimr { namespace NStat { @@ -20,19 +22,31 @@ class THttpRequest : public NActors::TActorBootstrapped { void Bootstrap(); - enum EType { + enum class ERequestType { ANALYZE, - STATUS + STATUS, + COUNT_MIN_SKETCH_PROBE + }; + + enum class EParamType { + DATABASE, + PATH, + OPERATION_ID, + COLUMN_NAME, + CELL_VALUE }; - THttpRequest(EType type, const TString& path, TActorId replyToActorId); + THttpRequest(ERequestType requestType, const std::unordered_map& params, const TActorId& replyToActorId); private: + using TNavigate = NSchemeCache::TSchemeCacheNavigate; + STFUNC(StateWork) { switch(ev->GetTypeRewrite()) { hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); hFunc(TEvStatistics::TEvAnalyzeStatusResponse, Handle); hFunc(TEvPipeCache::TEvDeliveryProblem, Handle); + hFunc(TEvStatistics::TEvLoadStatisticsQueryResponse, Handle); IgnoreFunc(TEvStatistics::TEvAnalyzeResponse); default: LOG_CRIT_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, @@ -42,24 +56,22 @@ class THttpRequest : public NActors::TActorBootstrapped { void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev); void Handle(TEvStatistics::TEvAnalyzeStatusResponse::TPtr& ev); - void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr&); + void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev); + void Handle(TEvStatistics::TEvLoadStatisticsQueryResponse::TPtr& ev); + + void DoRequest(const TNavigate::TEntry& entry); + void DoAnalyze(const TNavigate::TEntry& entry); + void DoStatus(const TNavigate::TEntry& entry); + void DoCountMinSketchProbe(const TNavigate::TEntry& entry); - void ResolveSuccess(); void HttpReply(const TString& msg); void PassAway(); private: - const EType Type; - const TString Path; + const ERequestType RequestType; + std::unordered_map Params; const TActorId ReplyToActorId; - - TPathId PathId; - TString OperationId; - ui64 StatisticsAggregatorId = 0; - - static const ui64 FirstRoundCookie = 1; - static const ui64 SecondRoundCookie = 2; }; } // NStat diff --git a/ydb/core/statistics/service/service_impl.cpp b/ydb/core/statistics/service/service_impl.cpp index e8116833033b..7743b2d51f1b 100644 --- a/ydb/core/statistics/service/service_impl.cpp +++ b/ydb/core/statistics/service/service_impl.cpp @@ -1,6 +1,7 @@ #include "service.h" #include "http_request.h" +#include #include #include @@ -22,6 +23,7 @@ #include + namespace NKikimr { namespace NStat { @@ -102,8 +104,7 @@ struct TAggregationStatistics { ? &Nodes[i] : nullptr; } } - LOG_ERROR_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Child node with the specified id was not found"); + SA_LOG_E("Child node with the specified id was not found"); return nullptr; } }; @@ -207,17 +208,16 @@ class TStatService : public TActorBootstrapped { hFunc(TEvStatistics::TEvAggregateStatisticsResponse, Handle); hFunc(NMon::TEvHttpInfo, Handle); + hFunc(NMon::TEvHttpInfoRes, Handle); cFunc(TEvents::TEvPoison::EventType, PassAway); default: - LOG_CRIT_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "NStat::TStatService: unexpected event# " << ev->GetTypeRewrite() << " " << ev->ToString()); + SA_LOG_CRIT("NStat::TStatService: unexpected event# " << ev->GetTypeRewrite() << " " << ev->ToString()); } } private: void HandleConfig(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr&) { - LOG_INFO_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Subscribed for config changes on node " << SelfId().NodeId()); + SA_LOG_I("Subscribed for config changes on node " << SelfId().NodeId()); } void HandleConfig(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev) { @@ -237,8 +237,7 @@ class TStatService : public TActorBootstrapped { bool IsNotCurrentRound(ui64 round) { if (round != AggregationStatistics.Round) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Event round " << round << " is different from the current " << AggregationStatistics.Round); + SA_LOG_D("Event round " << round << " is different from the current " << AggregationStatistics.Round); return true; } return false; @@ -303,8 +302,7 @@ class TStatService : public TActorBootstrapped { const auto& record = ev->Get()->Record; const auto tabletId = record.GetShardTabletId(); - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Received TEvStatisticsResponse TabletId: " << tabletId); + SA_LOG_D("Received TEvStatisticsResponse TabletId: " << tabletId); const auto round = ev->Cookie; if (IsNotCurrentRound(round)) { @@ -332,8 +330,7 @@ class TStatService : public TActorBootstrapped { const auto round = record.GetRound(); if (IsNotCurrentRound(round)) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Skip TEvAggregateKeepAliveAck"); + SA_LOG_D("Skip TEvAggregateKeepAliveAck"); return; } @@ -343,8 +340,7 @@ class TStatService : public TActorBootstrapped { void Handle(TEvPrivate::TEvKeepAliveAckTimeout::TPtr& ev) { const auto round = ev->Get()->Round; if (IsNotCurrentRound(round)) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Skip TEvKeepAliveAckTimeout"); + SA_LOG_D("Skip TEvKeepAliveAckTimeout"); return; } @@ -359,8 +355,7 @@ class TStatService : public TActorBootstrapped { // the parent node is unavailable // invalidate the subtree with the root in the current node - LOG_INFO_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Parent node " << AggregationStatistics.ParentNode.NodeId() << " is unavailable"); + SA_LOG_I("Parent node " << AggregationStatistics.ParentNode.NodeId() << " is unavailable"); ResetAggregationStatistics(); @@ -369,8 +364,7 @@ class TStatService : public TActorBootstrapped { void Handle(TEvPrivate::TEvDispatchKeepAlive::TPtr& ev) { const auto round = ev->Get()->Round; if (IsNotCurrentRound(round)) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Skip TEvDispatchKeepAlive"); + SA_LOG_D("Skip TEvDispatchKeepAlive"); return; } @@ -384,8 +378,7 @@ class TStatService : public TActorBootstrapped { const auto round = ev->Get()->Round; if (IsNotCurrentRound(round)) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Skip TEvKeepAliveTimeout"); + SA_LOG_D("Skip TEvKeepAliveTimeout"); return; } @@ -393,8 +386,7 @@ class TStatService : public TActorBootstrapped { auto node = AggregationStatistics.GetProcessingChildNode(nodeId); if (node == nullptr) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Skip TEvKeepAliveTimeout"); + SA_LOG_D("Skip TEvKeepAliveTimeout"); return; } @@ -409,8 +401,7 @@ class TStatService : public TActorBootstrapped { node->Status = TAggregationStatistics::TNode::EStatus::Unavailable; ++AggregationStatistics.PprocessedNodes; - LOG_INFO_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Node " << nodeId << " is unavailable"); + SA_LOG_I("Node " << nodeId << " is unavailable"); if (AggregationStatistics.IsCompleted()) { OnAggregateStatisticsFinished(); @@ -422,8 +413,7 @@ class TStatService : public TActorBootstrapped { const auto round = record.GetRound(); if (IsNotCurrentRound(round)) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Skip TEvAggregateKeepAlive"); + SA_LOG_D("Skip TEvAggregateKeepAlive"); return; } @@ -431,8 +421,7 @@ class TStatService : public TActorBootstrapped { auto node = AggregationStatistics.GetProcessingChildNode(nodeId); if (node == nullptr) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Skip TEvAggregateKeepAlive"); + SA_LOG_D( "Skip TEvAggregateKeepAlive"); return; } @@ -444,15 +433,13 @@ class TStatService : public TActorBootstrapped { } void Handle(TEvStatistics::TEvAggregateStatisticsResponse::TPtr& ev) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Received TEvAggregateStatisticsResponse SenderNodeId: " << ev->Sender.NodeId()); + SA_LOG_D("Received TEvAggregateStatisticsResponse SenderNodeId: " << ev->Sender.NodeId()); const auto& record = ev->Get()->Record; const auto round = record.GetRound(); if (IsNotCurrentRound(round)) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Skip TEvAggregateStatisticsResponse"); + SA_LOG_D("Skip TEvAggregateStatisticsResponse"); return; } @@ -460,8 +447,7 @@ class TStatService : public TActorBootstrapped { auto node = AggregationStatistics.GetProcessingChildNode(nodeId); if (node == nullptr) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Skip TEvAggregateStatisticsResponse"); + SA_LOG_D("Skip TEvAggregateStatisticsResponse"); return; } @@ -501,8 +487,7 @@ class TStatService : public TActorBootstrapped { } void SendAggregateStatisticsResponse() { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Send aggregate statistics response to node: " << AggregationStatistics.ParentNode.NodeId()); + SA_LOG_D("Send aggregate statistics response to node: " << AggregationStatistics.ParentNode.NodeId()); auto response = std::make_unique(); auto& record = response->Record; @@ -587,8 +572,7 @@ class TStatService : public TActorBootstrapped { const auto& record = ev->Get()->Record; const auto round = record.GetRound(); - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Received TEvAggregateStatistics from node: " << ev->Sender.NodeId() + SA_LOG_D("Received TEvAggregateStatistics from node: " << ev->Sender.NodeId() << ", Round: " << round << ", current Round: " << AggregationStatistics.Round); // reset previous state @@ -662,13 +646,12 @@ class TStatService : public TActorBootstrapped { request.StatType = ev->Get()->StatType; request.StatRequests.swap(ev->Get()->StatRequests); - if (!EnableStatistics) { + if (!EnableStatistics || IsStatisticsDisabledInSA) { ReplyFailed(requestId, true); return; } - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Handle TEvStatistics::TEvGetStatistics, request id = " << requestId + SA_LOG_D("Handle TEvStatistics::TEvGetStatistics, request id = " << requestId << ", ReplyToActorId = " << request.ReplyToActorId << ", StatRequests.size() = " << request.StatRequests.size()); @@ -685,7 +668,7 @@ class TStatService : public TActorBootstrapped { } ui64 loadCookie = NextLoadQueryCookie++; LoadQueriesInFlight[loadCookie] = std::make_pair(requestId, reqIndex); - Register(CreateLoadStatisticsQuery(SelfId(), + Register(CreateLoadStatisticsQuery(SelfId(), "", req.PathId, request.StatType, *req.ColumnTag, loadCookie)); ++request.ReplyCounter; ++reqIndex; @@ -712,8 +695,7 @@ class TStatService : public TActorBootstrapped { auto cookie = navigate->Cookie; - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Handle TEvTxProxySchemeCache::TEvNavigateKeySetResult, request id = " << cookie); + SA_LOG_D("Handle TEvTxProxySchemeCache::TEvNavigateKeySetResult, request id = " << cookie); if (cookie == ResolveSACookie) { Y_ABORT_UNLESS(navigate->ResultSet.size() == 1); @@ -729,7 +711,14 @@ class TStatService : public TActorBootstrapped { ConnectToSA(); SyncNode(); } else { - ReplyAllFailed(); + for (auto it = InFlight.begin(); it != InFlight.end();) { + if (EStatType::COUNT_MIN_SKETCH == it->second.StatType) { + ++it; + continue; + } + ReplyFailed(it->first, false); + it = InFlight.erase(it); + } } return; } @@ -835,11 +824,12 @@ class TStatService : public TActorBootstrapped { } void Handle(TEvStatistics::TEvPropagateStatistics::TPtr& ev) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "EvPropagateStatistics, node id = " << SelfId().NodeId()); + SA_LOG_D("EvPropagateStatistics, node id = " << SelfId().NodeId()); Send(ev->Sender, new TEvStatistics::TEvPropagateStatisticsResponse); + IsStatisticsDisabledInSA = false; + auto* record = ev->Get()->MutableRecord(); for (const auto& entry : record->GetEntries()) { ui64 schemeShardId = entry.GetSchemeShardId(); @@ -918,21 +908,18 @@ class TStatService : public TActorBootstrapped { void Handle(TEvPrivate::TEvStatisticsRequestTimeout::TPtr& ev) { const auto round = ev->Get()->Round; if (IsNotCurrentRound(round)) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Skip TEvStatisticsRequestTimeout"); + SA_LOG_D("Skip TEvStatisticsRequestTimeout"); return; } const auto tabletId = ev->Get()->TabletId; auto tabletPipe = AggregationStatistics.LocalTablets.TabletsPipes.find(tabletId); if (tabletPipe == AggregationStatistics.LocalTablets.TabletsPipes.end()) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Tablet " << tabletId << " has already been processed"); + SA_LOG_D("Tablet " << tabletId << " has already been processed"); return; } - LOG_ERROR_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "No result was received from the tablet " << tabletId); + SA_LOG_E("No result was received from the tablet " << tabletId); auto clientId = tabletPipe->second; OnTabletError(tabletId); @@ -957,15 +944,13 @@ class TStatService : public TActorBootstrapped { NTabletPipe::SendData(SelfId(), clientId, request.release(), round); Schedule(Settings.StatisticsRequestTimeout, new TEvPrivate::TEvStatisticsRequestTimeout(round, tabletId)); - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "TEvStatisticsRequest send" + SA_LOG_D("TEvStatisticsRequest send" << ", client id = " << clientId << ", path = " << *path); } void OnTabletError(ui64 tabletId) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Tablet " << tabletId << " is not local."); + SA_LOG_D("Tablet " << tabletId << " is not local."); const auto error = NKikimrStat::TEvAggregateStatisticsResponse::TYPE_NON_LOCAL_TABLET; AggregationStatistics.FailedTablets.emplace_back(tabletId, 0, error); @@ -983,8 +968,7 @@ class TStatService : public TActorBootstrapped { const auto& clientId = ev->Get()->ClientId; const auto& tabletId = ev->Get()->TabletId; - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "EvClientConnected" + SA_LOG_D("EvClientConnected" << ", node id = " << ev->Get()->ClientId.NodeId() << ", client id = " << clientId << ", server id = " << ev->Get()->ServerId @@ -992,6 +976,7 @@ class TStatService : public TActorBootstrapped { << ", status = " << ev->Get()->Status); if (clientId == SAPipeClientId) { + IsStatisticsDisabledInSA = false; if (ev->Get()->Status != NKikimrProto::OK) { SAPipeClientId = TActorId(); ConnectToSA(); @@ -1012,22 +997,21 @@ class TStatService : public TActorBootstrapped { return; } - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Skip EvClientConnected"); + SA_LOG_D("Skip EvClientConnected"); } void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev) { const auto& clientId = ev->Get()->ClientId; const auto& tabletId = ev->Get()->TabletId; - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "EvClientDestroyed" + SA_LOG_D("EvClientDestroyed" << ", node id = " << ev->Get()->ClientId.NodeId() << ", client id = " << clientId << ", server id = " << ev->Get()->ServerId << ", tablet id = " << tabletId); if (clientId == SAPipeClientId) { + IsStatisticsDisabledInSA = false; SAPipeClientId = TActorId(); ConnectToSA(); SyncNode(); @@ -1042,23 +1026,29 @@ class TStatService : public TActorBootstrapped { return; } - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Skip EvClientDestroyed"); + SA_LOG_D("Skip EvClientDestroyed"); } void Handle(TEvStatistics::TEvStatisticsIsDisabled::TPtr&) { + IsStatisticsDisabledInSA = true; ReplyAllFailed(); } void Handle(TEvStatistics::TEvLoadStatisticsQueryResponse::TPtr& ev) { ui64 cookie = ev->Get()->Cookie; - auto itLoadQuery = LoadQueriesInFlight.find(cookie); Y_ABORT_UNLESS(itLoadQuery != LoadQueriesInFlight.end()); auto [requestId, requestIndex] = itLoadQuery->second; + SA_LOG_D("TEvLoadStatisticsQueryResponse, request id = " << requestId); + auto itRequest = InFlight.find(requestId); - Y_ABORT_UNLESS(itRequest != InFlight.end()); + if (InFlight.end() == itRequest) { + SA_LOG_E("TEvLoadStatisticsQueryResponse, request id = " << requestId + << ". Request not found in InFlight"); + return; + } + auto& request = itRequest->second; auto& response = request.StatResponses[requestIndex]; @@ -1085,8 +1075,7 @@ class TStatService : public TActorBootstrapped { } void Handle(TEvPrivate::TEvRequestTimeout::TPtr& ev) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "EvRequestTimeout" + SA_LOG_D("EvRequestTimeout" << ", pipe client id = " << ev->Get()->PipeClientId << ", schemeshard count = " << ev->Get()->NeedSchemeShards.size()); @@ -1118,8 +1107,7 @@ class TStatService : public TActorBootstrapped { NTabletPipe::TClientConfig pipeConfig{policy}; SAPipeClientId = Register(NTabletPipe::CreateClient(SelfId(), StatisticsAggregatorId, pipeConfig)); - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "ConnectToSA(), pipe client id = " << SAPipeClientId); + SA_LOG_D("ConnectToSA(), pipe client id = " << SAPipeClientId); } void SyncNode() { @@ -1148,8 +1136,7 @@ class TStatService : public TActorBootstrapped { Schedule(RequestTimeout, timeout.release()); } - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "SyncNode(), pipe client id = " << SAPipeClientId); + SA_LOG_D("SyncNode(), pipe client id = " << SAPipeClientId); } void ReplySuccess(ui64 requestId, bool eraseRequest) { @@ -1159,8 +1146,7 @@ class TStatService : public TActorBootstrapped { } auto& request = itRequest->second; - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "ReplySuccess(), request id = " << requestId + SA_LOG_D("ReplySuccess(), request id = " << requestId << ", ReplyToActorId = " << request.ReplyToActorId << ", StatRequests.size() = " << request.StatRequests.size()); @@ -1206,8 +1192,7 @@ class TStatService : public TActorBootstrapped { } auto& request = itRequest->second; - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "ReplyFailed(), request id = " << requestId); + SA_LOG_D("ReplyFailed(), request id = " << requestId); auto result = std::make_unique(); result->Success = false; @@ -1249,146 +1234,274 @@ class TStatService : public TActorBootstrapped { void PrintStatServiceState(TStringStream& str) { HTML(str) { PRE() { - str << "---- StatisticsService ----" << Endl << Endl; - str << "StatisticsAggregatorId: " << StatisticsAggregatorId << Endl; - str << "SAPipeClientId: " << SAPipeClientId << Endl; - - str << "InFlight: " << InFlight.size(); - { - ui32 simple{ 0 }; - ui32 countMin{ 0 }; - for (auto it = InFlight.begin(); it != InFlight.end(); ++it) { - if (it->second.StatType == EStatType::SIMPLE) { - ++simple; - } else if (it->second.StatType == EStatType::COUNT_MIN_SKETCH) { - ++countMin; - } + str << "---- StatisticsService ----" << Endl << Endl; + str << "StatisticsAggregatorId: " << StatisticsAggregatorId << Endl; + str << "SAPipeClientId: " << SAPipeClientId << Endl; + + str << "InFlight: " << InFlight.size(); + { + ui32 simple{ 0 }; + ui32 countMin{ 0 }; + for (auto it = InFlight.begin(); it != InFlight.end(); ++it) { + if (it->second.StatType == EStatType::SIMPLE) { + ++simple; + } else if (it->second.StatType == EStatType::COUNT_MIN_SKETCH) { + ++countMin; } - str << "[SIMPLE: " << simple << ", COUNT_MIN_SKETCH: " << countMin << "]" << Endl; } - str << "NextRequestId: " << NextRequestId << Endl; + str << "[SIMPLE: " << simple << ", COUNT_MIN_SKETCH: " << countMin << "]" << Endl; + } + str << "NextRequestId: " << NextRequestId << Endl; - str << "LoadQueriesInFlight: " << LoadQueriesInFlight.size() << Endl; - str << "NextLoadQueryCookie: " << NextLoadQueryCookie << Endl; + str << "LoadQueriesInFlight: " << LoadQueriesInFlight.size() << Endl; + str << "NextLoadQueryCookie: " << NextLoadQueryCookie << Endl; - str << "NeedSchemeShards: " << NeedSchemeShards.size() << Endl; - str << "Statistics: " << Statistics.size() << Endl; + str << "NeedSchemeShards: " << NeedSchemeShards.size() << Endl; + str << "Statistics: " << Statistics.size() << Endl; - str << "ResolveSAStage: "; - if (ResolveSAStage == RSA_INITIAL) { - str << "RSA_INITIAL"; - } else if (ResolveSAStage == RSA_IN_FLIGHT) { - str << "RSA_IN_FLIGHT"; + str << "ResolveSAStage: "; + if (ResolveSAStage == RSA_INITIAL) { + str << "RSA_INITIAL"; + } else if (ResolveSAStage == RSA_IN_FLIGHT) { + str << "RSA_IN_FLIGHT"; + } + else { + str << "RSA_FINISHED"; + } + str << Endl; + + str << "AggregateKeepAlivePeriod: " << Settings.AggregateKeepAlivePeriod << Endl; + str << "AggregateKeepAliveTimeout: " << Settings.AggregateKeepAliveTimeout << Endl; + str << "AggregateKeepAliveAckTimeout: " << Settings.AggregateKeepAliveAckTimeout << Endl; + str << "StatisticsRequestTimeout: " << Settings.StatisticsRequestTimeout << Endl; + str << "MaxInFlightTabletRequests: " << Settings.MaxInFlightTabletRequests << Endl; + str << "FanOutFactor: " << Settings.FanOutFactor << Endl; + + str << "---- AggregationStatistics ----" << Endl; + str << "Round: " << AggregationStatistics.Round << Endl; + str << "Cookie: " << AggregationStatistics.Cookie << Endl; + str << "PathId: " << AggregationStatistics.PathId.ToString() << Endl; + str << "LastAckHeartbeat: " << AggregationStatistics.LastAckHeartbeat << Endl; + str << "ParentNode: " << AggregationStatistics.ParentNode << Endl; + str << "PprocessedNodes: " << AggregationStatistics.PprocessedNodes << Endl; + str << "TotalStatisticsResponse: " << AggregationStatistics.TotalStatisticsResponse << Endl; + str << "Nodes: " << AggregationStatistics.Nodes.size() << Endl; + str << "CountMinSketches: " << AggregationStatistics.CountMinSketches.size() << Endl; + } + } + } + + void AddPanel(IOutputStream& str, const TString& title, const std::function& bodyRender) { + HTML(str) { + DIV_CLASS("panel panel-default") { + DIV_CLASS("panel-heading") { + H4_CLASS("panel-title") { + str << title; + } } - else { - str << "RSA_FINISHED"; + DIV_CLASS("panel-body") { + bodyRender(str); } - str << Endl; - - str << "AggregateKeepAlivePeriod: " << Settings.AggregateKeepAlivePeriod << Endl; - str << "AggregateKeepAliveTimeout: " << Settings.AggregateKeepAliveTimeout << Endl; - str << "AggregateKeepAliveAckTimeout: " << Settings.AggregateKeepAliveAckTimeout << Endl; - str << "StatisticsRequestTimeout: " << Settings.StatisticsRequestTimeout << Endl; - str << "MaxInFlightTabletRequests: " << Settings.MaxInFlightTabletRequests << Endl; - str << "FanOutFactor: " << Settings.FanOutFactor << Endl; - - str << "---- AggregationStatistics ----" << Endl; - str << "Round: " << AggregationStatistics.Round << Endl; - str << "Cookie: " << AggregationStatistics.Cookie << Endl; - str << "PathId: " << AggregationStatistics.PathId.ToString() << Endl; - str << "LastAckHeartbeat: " << AggregationStatistics.LastAckHeartbeat << Endl; - str << "ParentNode: " << AggregationStatistics.ParentNode << Endl; - str << "PprocessedNodes: " << AggregationStatistics.PprocessedNodes << Endl; - str << "TotalStatisticsResponse: " << AggregationStatistics.TotalStatisticsResponse << Endl; - str << "Nodes: " << AggregationStatistics.Nodes.size() << Endl; - str << "CountMinSketches: " << AggregationStatistics.CountMinSketches.size() << Endl; } } } - void Handle(NMon::TEvHttpInfo::TPtr& ev) { - auto& request = ev->Get()->Request; + void PrintForm(TStringStream& str) { + HTML(str) { + AddPanel(str, "Analyze table", [](IOutputStream& str) { + HTML(str) { + FORM_CLASS("form-horizontal") { + DIV_CLASS("form-group") { + LABEL_CLASS_FOR("col-sm-2 control-label", "path") { + str << "Path"; + } + DIV_CLASS("col-sm-8") { + str << ""; + } + str << ""; + DIV_CLASS("col-sm-2") { + str << ""; + } + } + } + } + }); + AddPanel(str, "Get operation status", [](IOutputStream& str) { + HTML(str) { + FORM_CLASS("form-horizontal") { + DIV_CLASS("form-group") { + LABEL_CLASS_FOR("col-sm-2 control-label", "path") { + str << "Path"; + } + DIV_CLASS("col-sm-8") { + str << ""; + } + } + DIV_CLASS("form-group") { + LABEL_CLASS_FOR("col-sm-2 control-label", "operation") { + str << "OperationId"; + } + DIV_CLASS("col-sm-8") { + str << ""; + } + str << ""; + DIV_CLASS("col-sm-2") { + str << ""; + } + } + } + } + }); + AddPanel(str, "Probe count-min sketch", [](IOutputStream& str) { + HTML(str) { + FORM_CLASS("form-horizontal") { + DIV_CLASS("form-group") { + LABEL_CLASS_FOR("col-sm-2 control-label", "database") { + str << "Database"; + } + DIV_CLASS("col-sm-8") { + str << ""; + } + } + DIV_CLASS("form-group") { + LABEL_CLASS_FOR("col-sm-2 control-label", "path") { + str << "Path"; + } + DIV_CLASS("col-sm-8") { + str << ""; + } + } + DIV_CLASS("form-group") { + LABEL_CLASS_FOR("col-sm-2 control-label", "column") { + str << "ColumnName"; + } + DIV_CLASS("col-sm-8") { + str << ""; + } + } + DIV_CLASS("form-group") { + LABEL_CLASS_FOR("col-sm-2 control-label", "cell") { + str << "Value"; + } + DIV_CLASS("col-sm-8") { + str << ""; + } + + str << ""; + DIV_CLASS("col-sm-2") { + str << ""; + } + } + } + } + }); - auto method = request.GetMethod(); - if (method == HTTP_METHOD_POST) { - if (!EnableColumnStatistics) { - Send(ev->Sender, new NMon::TEvHttpInfoRes("Column statistics is disabled")); - return; - } + PrintStatServiceState(str); + } + } - auto& params = request.GetPostParams(); - auto itAction = params.find("action"); - if (itAction == params.end()) { - Send(ev->Sender, new NMon::TEvHttpInfoRes("'action' parameter is required")); - return; - } - if (itAction->second != "analyze") { - Send(ev->Sender, new NMon::TEvHttpInfoRes("Unknown 'action' parameter")); - return; - } - auto itPath = params.find("path"); - if (itPath == params.end()) { - Send(ev->Sender, new NMon::TEvHttpInfoRes("'path' parameter is required")); - return; - } - Register(new THttpRequest(THttpRequest::EType::ANALYZE, itPath->second, ev->Sender)); + void Handle(NMon::TEvHttpInfoRes::TPtr& ev) { + if (HttpRequestActorId != ev->Sender) { return; + } - } else if (method == HTTP_METHOD_GET) { - auto& params = request.GetParams(); - if (params.empty()) { - TStringStream str; - PrintStatServiceState(str); - Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str())); - return; + HttpRequestActorId = TActorId(); + + const auto* msg = ev->CastAsLocal(); + if (msg != nullptr) { + ReplyToMonitoring(msg->Answer); + } + } + + void ReplyToMonitoring(const TString& description) { + TStringStream str; + + if (!description.empty()) { + HTML(str) { + DIV_CLASS("row") { + DIV_CLASS("col-md-12 alert alert-info") { + str << description; + } + } } + } + + PrintForm(str); + Send(MonitoringActorId, new NMon::TEvHttpInfoRes(str.Str())); + } + + void Handle(NMon::TEvHttpInfo::TPtr& ev) { + if (!EnableColumnStatistics) { + Send(ev->Sender, new NMon::TEvHttpInfoRes("Column statistics is disabled")); + return; + } + + HttpRequestActorId = TActorId(); + MonitoringActorId = ev->Sender; + + const auto& request = ev->Get()->Request; + const auto& params = request.GetParams(); - if (!EnableColumnStatistics) { - Send(ev->Sender, new NMon::TEvHttpInfoRes("Column statistics is disabled")); + auto getRequestParam = [¶ms](const TString& name){ + auto it = params.find(name); + return it != params.end() ? it->second : TString(); + }; + + const auto action = getRequestParam("action"); + if (action.empty()) { + ReplyToMonitoring(""); + return; + } + + const auto path = getRequestParam("path"); + if (path.empty()) { + ReplyToMonitoring("'Path' parameter is required"); + return; + } + + if (action == "analyze") { + HttpRequestActorId = Register(new THttpRequest(THttpRequest::ERequestType::ANALYZE, { + { THttpRequest::EParamType::PATH, path } + }, SelfId())); + } else if (action == "status") { + const auto operationId = getRequestParam("operation"); + if (operationId.empty()) { + ReplyToMonitoring("'OperationId' parameter is required"); return; } - auto itAction = params.find("action"); - if (itAction == params.end()) { - Send(ev->Sender, new NMon::TEvHttpInfoRes("'action' parameter is required")); + HttpRequestActorId = Register(new THttpRequest(THttpRequest::ERequestType::STATUS, { + { THttpRequest::EParamType::PATH, path }, + { THttpRequest::EParamType::OPERATION_ID, operationId } + }, SelfId())); + } else if (action == "probe") { + const auto column = getRequestParam("column"); + if (column.empty()) { + ReplyToMonitoring("'ColumnName' parameter is required"); return; } - if (itAction->second != "status") { - Send(ev->Sender, new NMon::TEvHttpInfoRes("Unknown 'action' parameter")); + + const auto cell = getRequestParam("cell"); + if (cell.empty()) { + ReplyToMonitoring("'Value' parameter is required"); return; } - auto itPath = params.find("path"); - if (itPath == params.end()) { - Send(ev->Sender, new NMon::TEvHttpInfoRes("'path' parameter is required")); + + const auto database = getRequestParam("database"); + if (database.empty()) { + ReplyToMonitoring("'Database' parameter is required"); return; } - Register(new THttpRequest(THttpRequest::EType::STATUS, itPath->second, ev->Sender)); - return; - } - TStringStream str; - HTML(str) { - str << "
" << Endl; - str << ""; - DIV() { - str << ""; - } - DIV() { - str << ""; - } - str << "
" << Endl; - str << "
" << Endl; - str << ""; - DIV() { - str << ""; - } - DIV() { - str << ""; - } - str << "
" << Endl; + HttpRequestActorId = Register(new THttpRequest(THttpRequest::ERequestType::COUNT_MIN_SKETCH_PROBE, { + { THttpRequest::EParamType::DATABASE, database }, + { THttpRequest::EParamType::PATH, path }, + { THttpRequest::EParamType::COLUMN_NAME, column }, + { THttpRequest::EParamType::CELL_VALUE, cell } + }, SelfId())); + } else { + ReplyToMonitoring("Wrong 'action' parameter value"); } - - Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str())); } private: @@ -1397,6 +1510,7 @@ class TStatService : public TActorBootstrapped { bool EnableStatistics = false; bool EnableColumnStatistics = false; + bool IsStatisticsDisabledInSA = false; static constexpr size_t StatFanOut = 10; @@ -1440,6 +1554,9 @@ class TStatService : public TActorBootstrapped { EResolveSAStage ResolveSAStage = RSA_INITIAL; static constexpr TDuration RequestTimeout = TDuration::MilliSeconds(100); + + TActorId HttpRequestActorId; + TActorId MonitoringActorId; }; THolder CreateStatService(const TStatServiceSettings& settings) { diff --git a/ydb/core/statistics/service/ut/ut_basic_statistics.cpp b/ydb/core/statistics/service/ut/ut_basic_statistics.cpp index 898e147cfabf..310623fcaf7a 100644 --- a/ydb/core/statistics/service/ut/ut_basic_statistics.cpp +++ b/ydb/core/statistics/service/ut/ut_basic_statistics.cpp @@ -21,17 +21,13 @@ using namespace NYdb::NScheme; namespace { void CreateTable(TTestEnv& env, const TString& databaseName, const TString& tableName, size_t rowCount) { - TTableClient client(env.GetDriver()); - auto session = client.CreateSession().GetValueSync().GetSession(); - - auto result = session.ExecuteSchemeQuery(Sprintf(R"( + ExecuteYqlScript(env, Sprintf(R"( CREATE TABLE `Root/%s/%s` ( Key Uint64, Value Uint64, PRIMARY KEY (Key) ); - )", databaseName.c_str(), tableName.c_str())).GetValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + )", databaseName.c_str(), tableName.c_str())); TStringBuilder replace; replace << Sprintf("REPLACE INTO `Root/%s/%s` (Key, Value) VALUES ", @@ -43,8 +39,7 @@ void CreateTable(TTestEnv& env, const TString& databaseName, const TString& tabl replace << Sprintf("(%uu, %uu)", i, i); } replace << ";"; - result = session.ExecuteDataQuery(replace, TTxControl::BeginTx().CommitTx()).GetValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + ExecuteYqlScript(env, replace); } void ValidateRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId, size_t expectedRowCount) { @@ -76,7 +71,7 @@ void ValidateRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId break; } - runtime.SimulateSleep(TDuration::Seconds(5)); + runtime.SimulateSleep(TDuration::Seconds(1)); } } @@ -87,15 +82,10 @@ Y_UNIT_TEST_SUITE(BasicStatistics) { Y_UNIT_TEST(Simple) { TTestEnv env(1, 1); - auto init = [&] () { - CreateDatabase(env, "Database"); - CreateTable(env, "Database", "Table", 5); - }; - std::thread initThread(init); - auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); + + CreateDatabase(env, "Database"); + CreateTable(env, "Database", "Table", 5); auto pathId = ResolvePathId(runtime, "/Root/Database/Table"); ValidateRowCount(runtime, 1, pathId, 5); @@ -104,15 +94,10 @@ Y_UNIT_TEST_SUITE(BasicStatistics) { Y_UNIT_TEST(TwoNodes) { TTestEnv env(1, 2); - auto init = [&] () { - CreateDatabase(env, "Database", 2); - CreateTable(env, "Database", "Table", 5); - }; - std::thread initThread(init); - auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); + + CreateDatabase(env, "Database", 2); + CreateTable(env, "Database", "Table", 5); auto pathId1 = ResolvePathId(runtime, "/Root/Database/Table"); ValidateRowCount(runtime, 1, pathId1, 5); @@ -121,16 +106,12 @@ Y_UNIT_TEST_SUITE(BasicStatistics) { Y_UNIT_TEST(TwoTables) { TTestEnv env(1, 1); - auto init = [&] () { - CreateDatabase(env, "Database"); - CreateTable(env, "Database", "Table1", 5); - CreateTable(env, "Database", "Table2", 6); - }; - std::thread initThread(init); auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); + + CreateDatabase(env, "Database"); + CreateTable(env, "Database", "Table1", 5); + CreateTable(env, "Database", "Table2", 6); auto pathId1 = ResolvePathId(runtime, "/Root/Database/Table1"); auto pathId2 = ResolvePathId(runtime, "/Root/Database/Table2"); @@ -140,17 +121,13 @@ Y_UNIT_TEST_SUITE(BasicStatistics) { Y_UNIT_TEST(TwoDatabases) { TTestEnv env(1, 2); - auto init = [&] () { - CreateDatabase(env, "Database1"); - CreateDatabase(env, "Database2"); - CreateTable(env, "Database1", "Table1", 5); - CreateTable(env, "Database2", "Table2", 6); - }; - std::thread initThread(init); auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); + + CreateDatabase(env, "Database1", 1, false, "hdd1"); + CreateDatabase(env, "Database2", 1, false, "hdd2"); + CreateTable(env, "Database1", "Table1", 5); + CreateTable(env, "Database2", "Table2", 6); auto pathId1 = ResolvePathId(runtime, "/Root/Database1/Table1"); auto pathId2 = ResolvePathId(runtime, "/Root/Database2/Table2"); @@ -160,26 +137,12 @@ Y_UNIT_TEST_SUITE(BasicStatistics) { Y_UNIT_TEST(Serverless) { TTestEnv env(1, 1); - auto init = [&] () { - CreateDatabase(env, "Shared"); - }; - std::thread initThread(init); auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); - - TPathId domainKey; - ResolvePathId(runtime, "/Root/Shared", &domainKey); - - auto init2 = [&] () { - CreateServerlessDatabase(env, "Serverless", domainKey); - CreateTable(env, "Serverless", "Table", 5); - }; - std::thread init2Thread(init2); - runtime.SimulateSleep(TDuration::Seconds(5)); - init2Thread.join(); + CreateDatabase(env, "Shared", 1, true); + CreateServerlessDatabase(env, "Serverless", "/Root/Shared"); + CreateTable(env, "Serverless", "Table", 5); auto pathId = ResolvePathId(runtime, "/Root/Serverless/Table"); ValidateRowCount(runtime, 1, pathId, 5); @@ -187,28 +150,14 @@ Y_UNIT_TEST_SUITE(BasicStatistics) { Y_UNIT_TEST(TwoServerlessDbs) { TTestEnv env(1, 1); - auto init = [&] () { - CreateDatabase(env, "Shared"); - }; - std::thread initThread(init); auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); - TPathId domainKey; - ResolvePathId(runtime, "/Root/Shared", &domainKey); - - auto init2 = [&] () { - CreateServerlessDatabase(env, "Serverless1", domainKey); - CreateServerlessDatabase(env, "Serverless2", domainKey); - CreateTable(env, "Serverless1", "Table1", 5); - CreateTable(env, "Serverless2", "Table2", 6); - }; - std::thread init2Thread(init2); - - runtime.SimulateSleep(TDuration::Seconds(5)); - init2Thread.join(); + CreateDatabase(env, "Shared", 1, true); + CreateServerlessDatabase(env, "Serverless1", "/Root/Shared"); + CreateServerlessDatabase(env, "Serverless2", "/Root/Shared"); + CreateTable(env, "Serverless1", "Table1", 5); + CreateTable(env, "Serverless2", "Table2", 6); auto pathId1 = ResolvePathId(runtime, "/Root/Serverless1/Table1"); auto pathId2 = ResolvePathId(runtime, "/Root/Serverless2/Table2"); @@ -218,30 +167,15 @@ Y_UNIT_TEST_SUITE(BasicStatistics) { Y_UNIT_TEST(TwoServerlessTwoSharedDbs) { TTestEnv env(1, 2); - auto init = [&] () { - CreateDatabase(env, "Shared1"); - CreateDatabase(env, "Shared2"); - }; - std::thread initThread(init); auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); - - TPathId domainKey1, domainKey2; - ResolvePathId(runtime, "/Root/Shared1", &domainKey1); - ResolvePathId(runtime, "/Root/Shared2", &domainKey2); - - auto init2 = [&] () { - CreateServerlessDatabase(env, "Serverless1", domainKey1); - CreateServerlessDatabase(env, "Serverless2", domainKey2); - CreateTable(env, "Serverless1", "Table1", 5); - CreateTable(env, "Serverless2", "Table2", 6); - }; - std::thread init2Thread(init2); - - runtime.SimulateSleep(TDuration::Seconds(5)); - init2Thread.join(); + + CreateDatabase(env, "Shared1", 1, true, "hdd1"); + CreateDatabase(env, "Shared2", 1, true, "hdd2"); + CreateServerlessDatabase(env, "Serverless1", "/Root/Shared1"); + CreateServerlessDatabase(env, "Serverless2", "/Root/Shared2"); + CreateTable(env, "Serverless1", "Table1", 5); + CreateTable(env, "Serverless2", "Table2", 6); auto pathId1 = ResolvePathId(runtime, "/Root/Serverless1/Table1"); auto pathId2 = ResolvePathId(runtime, "/Root/Serverless2/Table2"); diff --git a/ydb/core/statistics/service/ut/ut_http/ut_http_request.cpp b/ydb/core/statistics/service/ut/ut_http/ut_http_request.cpp new file mode 100644 index 000000000000..bb6924dd32f0 --- /dev/null +++ b/ydb/core/statistics/service/ut/ut_http/ut_http_request.cpp @@ -0,0 +1,99 @@ +#include +#include +#include +#include +#include + +namespace NKikimr { +namespace NStat { + +void AnalyzeTest(bool isServerless) { + TTestEnv env(1, 1); + auto& runtime = *env.GetServer().GetRuntime(); + const auto databaseInfo = isServerless + ? CreateServerlessDatabaseColumnTables(env, 1, 10) + : CreateDatabaseColumnTables(env, 1, 10); + const auto& tableInfo = databaseInfo.Tables[0]; + const auto sender = runtime.AllocateEdgeActor(); + + runtime.Register(new THttpRequest(THttpRequest::ERequestType::ANALYZE, { + { THttpRequest::EParamType::PATH, tableInfo.Path } + }, sender)); + + auto res = runtime.GrabEdgeEvent(sender); + auto msg = static_cast(res->Get()); + + Cerr << "Answer: '" << msg->Answer << "'" << Endl; + const TString expected = "Analyze sent. OperationId:"; + UNIT_ASSERT_STRING_CONTAINS(msg->Answer, expected); +} + +void ProbeTest(bool isServerless) { + TTestEnv env(1, 1); + auto& runtime = *env.GetServer().GetRuntime(); + const auto databaseInfo = isServerless + ? CreateServerlessDatabaseColumnTables(env, 1, 10) + : CreateDatabaseColumnTables(env, 1, 10); + const auto& tableInfo = databaseInfo.Tables[0]; + TString columnName = "Value"; + const auto sender = runtime.AllocateEdgeActor(); + + const auto operationId = TULIDGenerator().Next(TInstant::Now()).ToBinary(); + auto analyzeRequest = MakeAnalyzeRequest({{tableInfo.PathId, {1, 2}}}, operationId); + runtime.SendToPipe(tableInfo.SaTabletId, sender, analyzeRequest.release()); + runtime.GrabEdgeEventRethrow(sender); + + runtime.Register(new THttpRequest(THttpRequest::ERequestType::COUNT_MIN_SKETCH_PROBE, { + { THttpRequest::EParamType::DATABASE, databaseInfo.FullDatabaseName}, + { THttpRequest::EParamType::PATH, tableInfo.Path }, + { THttpRequest::EParamType::COLUMN_NAME, columnName }, + { THttpRequest::EParamType::CELL_VALUE, "1" } + }, sender)); + auto res = runtime.GrabEdgeEvent(sender); + auto msg = static_cast(res->Get()); + + Cerr << "Answer: '" << msg->Answer << "'" << Endl; + const TString expected = tableInfo.Path + "[" + columnName + "]="; + UNIT_ASSERT_STRING_CONTAINS(msg->Answer, expected); +} + +Y_UNIT_TEST_SUITE(HttpRequest) { + Y_UNIT_TEST(Analyze) { + AnalyzeTest(false); + } + + Y_UNIT_TEST(AnalyzeServerless) { + AnalyzeTest(true); + } + + Y_UNIT_TEST(Status) { + TTestEnv env(1, 1); + auto& runtime = *env.GetServer().GetRuntime(); + const auto databaseInfo = CreateDatabaseColumnTables(env, 1, 10); + const auto& tableInfo = databaseInfo.Tables[0]; + + const auto sender = runtime.AllocateEdgeActor(); + const auto operationId = TULIDGenerator().Next(TInstant::Now()).ToString(); + runtime.Register(new THttpRequest(THttpRequest::ERequestType::STATUS, { + { THttpRequest::EParamType::PATH, tableInfo.Path }, + { THttpRequest::EParamType::OPERATION_ID, operationId } + }, sender)); + + auto res = runtime.GrabEdgeEvent(sender); + auto msg = static_cast(res->Get()); + + Cerr << "Answer: '" << msg->Answer << "'" << Endl; + UNIT_ASSERT_EQUAL(msg->Answer, "No analyze operation"); + } + + Y_UNIT_TEST(Probe) { + ProbeTest(false); + } + + Y_UNIT_TEST(ProbeServerless) { + ProbeTest(true); + } +} + +} // NStat +} // NKikimr \ No newline at end of file diff --git a/ydb/core/statistics/service/ut/ut_http/ya.make b/ydb/core/statistics/service/ut/ut_http/ya.make new file mode 100644 index 000000000000..114a7168cbf6 --- /dev/null +++ b/ydb/core/statistics/service/ut/ut_http/ya.make @@ -0,0 +1,28 @@ +UNITTEST_FOR(ydb/core/statistics/service) + +FORK_SUBTESTS() + +IF (WITH_VALGRIND) + TIMEOUT(3600) + SIZE(LARGE) + TAG(ya:fat) +ELSE() + TIMEOUT(600) + SIZE(MEDIUM) +ENDIF() + +YQL_LAST_ABI_VERSION() + +PEERDIR( + library/cpp/testing/unittest + ydb/core/protos + ydb/core/testlib/default + ydb/core/statistics/ut_common +) + +SRCS( + ut_http_request.cpp +) + +END() + diff --git a/ydb/core/statistics/service/ut/ya.make b/ydb/core/statistics/service/ut/ya.make index 7c07171048c5..1b32051659da 100644 --- a/ydb/core/statistics/service/ut/ya.make +++ b/ydb/core/statistics/service/ut/ya.make @@ -27,3 +27,6 @@ SRCS( END() +RECURSE_FOR_TESTS( + ut_http +) diff --git a/ydb/core/statistics/ut_common/ut_common.cpp b/ydb/core/statistics/ut_common/ut_common.cpp index 2737cb68a5ff..18c2ec9fd5fb 100644 --- a/ydb/core/statistics/ut_common/ut_common.cpp +++ b/ydb/core/statistics/ut_common/ut_common.cpp @@ -7,14 +7,14 @@ #include #include +#include -// TODO remove SDK -#include -#include -#include +#include +#include +#include -// TODO remove thread -#include +#include +#include using namespace NYdb; using namespace NYdb::NTable; @@ -23,30 +23,9 @@ using namespace NYdb::NScheme; namespace NKikimr { namespace NStat { -NKikimrSubDomains::TSubDomainSettings GetSubDomainDeclareSettings(const TString &name, const TStoragePools &pools) { - NKikimrSubDomains::TSubDomainSettings subdomain; - subdomain.SetName(name); - for (auto& pool: pools) { - *subdomain.AddStoragePools() = pool; - } - return subdomain; -} - -NKikimrSubDomains::TSubDomainSettings GetSubDomainDefaultSettings(const TString &name, const TStoragePools &pools) { - NKikimrSubDomains::TSubDomainSettings subdomain; - subdomain.SetName(name); - subdomain.SetCoordinators(1); - subdomain.SetMediators(1); - subdomain.SetPlanResolution(50); - subdomain.SetTimeCastBucketsPerMediator(2); - for (auto& pool: pools) { - *subdomain.AddStoragePools() = pool; - } - return subdomain; -} - -TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, bool useRealThreads) - : CSController(NYDBTest::TControllers::RegisterCSControllerGuard()) { +TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, bool useRealThreads) + : CSController(NYDBTest::TControllers::RegisterCSControllerGuard()) +{ auto mbusPort = PortManager.GetPort(); auto grpcPort = PortManager.GetPort(); @@ -55,17 +34,14 @@ TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, bool Settings->SetNodeCount(staticNodes); Settings->SetDynamicNodeCount(dynamicNodes); Settings->SetUseRealThreads(useRealThreads); + Settings->AddStoragePoolType("hdd1"); + Settings->AddStoragePoolType("hdd2"); NKikimrConfig::TFeatureFlags featureFlags; featureFlags.SetEnableStatistics(true); featureFlags.SetEnableColumnStatistics(true); Settings->SetFeatureFlags(featureFlags); - for (ui32 i : xrange(storagePools)) { - TString poolName = Sprintf("test%d", i); - Settings->AddStoragePool(poolName, TString("/Root:") + poolName, 2); - } - Server = new Tests::TServer(*Settings); Server->EnableGRpc(grpcPort); @@ -91,45 +67,73 @@ TTestEnv::~TTestEnv() { Driver->Stop(true); } -TStoragePools TTestEnv::GetPools() const { - TStoragePools pools; - for (const auto& [kind, pool] : Settings->StoragePoolTypes) { - pools.emplace_back(pool.GetName(), kind); +TString CreateDatabase(TTestEnv& env, const TString& databaseName, + size_t nodeCount, bool isShared, const TString& poolName) +{ + auto& runtime = *env.GetServer().GetRuntime(); + auto fullDbName = Sprintf("/Root/%s", databaseName.c_str()); + + using TEvCreateDatabaseRequest = NKikimr::NGRpcService::TGrpcRequestOperationCall< + Ydb::Cms::CreateDatabaseRequest, + Ydb::Cms::CreateDatabaseResponse>; + + Ydb::Cms::CreateDatabaseRequest request; + request.set_path(fullDbName); + if (isShared) { + auto* resources = request.mutable_shared_resources(); + auto* storage = resources->add_storage_units(); + storage->set_unit_kind(poolName); + storage->set_count(1); + } else { + auto* resources = request.mutable_resources(); + auto* storage = resources->add_storage_units(); + storage->set_unit_kind(poolName); + storage->set_count(1); } - return pools; -} -void CreateDatabase(TTestEnv& env, const TString& databaseName, size_t nodeCount) { - auto subdomain = GetSubDomainDeclareSettings(databaseName); - UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK, - env.GetClient().CreateExtSubdomain("/Root", subdomain)); + auto future = NRpcService::DoLocalRpc( + std::move(request), "", "", runtime.GetActorSystem(0)); + auto response = runtime.WaitFuture(std::move(future)); + UNIT_ASSERT(response.operation().ready()); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); + + env.GetTenants().Run(fullDbName, nodeCount); - env.GetTenants().Run("/Root/" + databaseName, nodeCount); + if (!env.GetServer().GetSettings().UseRealThreads) { + runtime.SimulateSleep(TDuration::Seconds(1)); + } - auto subdomainSettings = GetSubDomainDefaultSettings(databaseName, env.GetPools()); - subdomainSettings.SetExternalSchemeShard(true); - subdomainSettings.SetExternalStatisticsAggregator(true); - UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK, - env.GetClient().AlterExtSubdomain("/Root", subdomainSettings)); + return fullDbName; } -void CreateServerlessDatabase(TTestEnv& env, const TString& databaseName, TPathId resourcesDomainKey) { - auto subdomain = GetSubDomainDeclareSettings(databaseName); - subdomain.MutableResourcesDomainKey()->SetSchemeShard(resourcesDomainKey.OwnerId); - subdomain.MutableResourcesDomainKey()->SetPathId(resourcesDomainKey.LocalPathId); - UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK, - env.GetClient().CreateExtSubdomain("/Root", subdomain)); +TString CreateServerlessDatabase(TTestEnv& env, const TString& databaseName, const TString& sharedName) { + auto& runtime = *env.GetServer().GetRuntime(); + auto fullDbName = Sprintf("/Root/%s", databaseName.c_str()); + + using TEvCreateDatabaseRequest = NKikimr::NGRpcService::TGrpcRequestOperationCall< + Ydb::Cms::CreateDatabaseRequest, + Ydb::Cms::CreateDatabaseResponse>; + + Ydb::Cms::CreateDatabaseRequest request; + request.set_path(fullDbName); + request.mutable_serverless_resources()->set_shared_database_path(sharedName); + + auto future = NRpcService::DoLocalRpc( + std::move(request), "", "", runtime.GetActorSystem(0)); + auto response = runtime.WaitFuture(std::move(future)); + UNIT_ASSERT(response.operation().ready()); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); - env.GetTenants().Run("/Root/" + databaseName, 0); + env.GetTenants().Run(fullDbName, 0); - auto subdomainSettings = GetSubDomainDefaultSettings(databaseName, env.GetPools()); - subdomainSettings.SetExternalSchemeShard(true); - UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK, - env.GetClient().AlterExtSubdomain("/Root", subdomainSettings)); + if (!env.GetServer().GetSettings().UseRealThreads) { + runtime.SimulateSleep(TDuration::Seconds(1)); + } + + return fullDbName; } -TPathId ResolvePathId(TTestActorRuntime& runtime, const TString& path, TPathId* domainKey, ui64* saTabletId) -{ +TPathId ResolvePathId(TTestActorRuntime& runtime, const TString& path, TPathId* domainKey, ui64* saTabletId) { auto sender = runtime.AllocateEdgeActor(); using TNavigate = NSchemeCache::TSchemeCacheNavigate; @@ -184,8 +188,7 @@ TPathId ResolvePathId(TTestActorRuntime& runtime, const TString& path, TPathId* return resultEntry.TableId.PathId; } -NKikimrScheme::TEvDescribeSchemeResult DescribeTable(TTestActorRuntime& runtime, TActorId sender, const TString& path) -{ +NKikimrScheme::TEvDescribeSchemeResult DescribeTable(TTestActorRuntime& runtime, TActorId sender, const TString& path) { TAutoPtr handle; auto request = MakeHolder(); @@ -197,8 +200,7 @@ NKikimrScheme::TEvDescribeSchemeResult DescribeTable(TTestActorRuntime& runtime, return *reply->MutableRecord(); } -TVector GetTableShards(TTestActorRuntime& runtime, TActorId sender, const TString& path) -{ +TVector GetTableShards(TTestActorRuntime& runtime, TActorId sender, const TString& path) { TVector shards; auto lsResult = DescribeTable(runtime, sender, path); for (auto &part : lsResult.GetPathDescription().GetTablePartitions()) @@ -207,8 +209,7 @@ TVector GetTableShards(TTestActorRuntime& runtime, TActorId sender, const return shards; } -TVector GetColumnTableShards(TTestActorRuntime& runtime, TActorId sender, const TString& path) -{ +TVector GetColumnTableShards(TTestActorRuntime& runtime, TActorId sender, const TString& path) { TVector shards; auto lsResult = DescribeTable(runtime, sender, path); for (auto &part : lsResult.GetPathDescription().GetColumnTableDescription().GetSharding().GetColumnShards()) @@ -217,19 +218,36 @@ TVector GetColumnTableShards(TTestActorRuntime& runtime, TActorId sender, return shards; } -void CreateUniformTable(TTestEnv& env, const TString& databaseName, const TString& tableName) { - TTableClient client(env.GetDriver()); - auto session = client.CreateSession().GetValueSync().GetSession(); +Ydb::StatusIds::StatusCode ExecuteYqlScript(TTestEnv& env, const TString& script, bool mustSucceed) { + auto& runtime = *env.GetServer().GetRuntime(); + + using TEvExecuteYqlRequest = NGRpcService::TGrpcRequestOperationCall< + Ydb::Scripting::ExecuteYqlRequest, + Ydb::Scripting::ExecuteYqlResponse>; + + Ydb::Scripting::ExecuteYqlRequest request; + request.set_script(script); - auto result = session.ExecuteSchemeQuery(Sprintf(R"( + auto future = NRpcService::DoLocalRpc( + std::move(request), "", "", runtime.GetActorSystem(0)); + auto response = runtime.WaitFuture(std::move(future)); + + UNIT_ASSERT(response.operation().ready()); + if (mustSucceed) { + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); + } + return response.operation().status(); +} + +void CreateUniformTable(TTestEnv& env, const TString& databaseName, const TString& tableName) { + ExecuteYqlScript(env, Sprintf(R"( CREATE TABLE `Root/%s/%s` ( Key Uint64, Value Uint64, PRIMARY KEY (Key) ) WITH ( UNIFORM_PARTITIONS = 4 ); - )", databaseName.c_str(), tableName.c_str())).GetValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + )", databaseName.c_str(), tableName.c_str())); TStringBuilder replace; replace << Sprintf("REPLACE INTO `Root/%s/%s` (Key, Value) VALUES ", @@ -242,18 +260,16 @@ void CreateUniformTable(TTestEnv& env, const TString& databaseName, const TStrin replace << Sprintf("(%" PRIu64 "ul, %" PRIu64 "ul)", value, value); } replace << ";"; - result = session.ExecuteDataQuery(replace, TTxControl::BeginTx().CommitTx()).GetValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + ExecuteYqlScript(env, replace); } void CreateColumnStoreTable(TTestEnv& env, const TString& databaseName, const TString& tableName, int shardCount) { - TTableClient client(env.GetDriver()); - auto session = client.CreateSession().GetValueSync().GetSession(); - auto fullTableName = Sprintf("Root/%s/%s", databaseName.c_str(), tableName.c_str()); - auto result = session.ExecuteSchemeQuery(Sprintf(R"( + auto& runtime = *env.GetServer().GetRuntime(); + + ExecuteYqlScript(env, Sprintf(R"( CREATE TABLE `%s` ( Key Uint64 NOT NULL, Value Uint64, @@ -264,106 +280,100 @@ void CreateColumnStoreTable(TTestEnv& env, const TString& databaseName, const TS STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %d ); - )", fullTableName.c_str(), shardCount)).GetValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + )", fullTableName.c_str(), shardCount)); + runtime.SimulateSleep(TDuration::Seconds(1)); - result = session.ExecuteSchemeQuery(Sprintf(R"( + ExecuteYqlScript(env, Sprintf(R"( ALTER OBJECT `%s` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=cms_key, TYPE=COUNT_MIN_SKETCH, FEATURES=`{"column_names" : ['Key']}`); - )", fullTableName.c_str())).GetValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + )", fullTableName.c_str())); + runtime.SimulateSleep(TDuration::Seconds(1)); - result = session.ExecuteSchemeQuery(Sprintf(R"( + ExecuteYqlScript(env, Sprintf(R"( ALTER OBJECT `%s` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=cms_value, TYPE=COUNT_MIN_SKETCH, FEATURES=`{"column_names" : ['Value']}`); - )", fullTableName.c_str())).GetValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - - NYdb::TValueBuilder rows; - rows.BeginList(); + )", fullTableName.c_str())); + runtime.SimulateSleep(TDuration::Seconds(1)); + + using TEvBulkUpsertRequest = NGRpcService::TGrpcRequestOperationCall< + Ydb::Table::BulkUpsertRequest, + Ydb::Table::BulkUpsertResponse>; + + Ydb::Table::BulkUpsertRequest request; + request.set_table(fullTableName); + auto* rows = request.mutable_rows(); + + auto* reqRowType = rows->mutable_type()->mutable_list_type()->mutable_item()->mutable_struct_type(); + auto* reqKeyType = reqRowType->add_members(); + reqKeyType->set_name("Key"); + reqKeyType->mutable_type()->set_type_id(Ydb::Type::UINT64); + auto* reqValueType = reqRowType->add_members(); + reqValueType->set_name("Value"); + reqValueType->mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT64); + + auto* reqRows = rows->mutable_value(); for (size_t i = 0; i < ColumnTableRowsNumber; ++i) { - auto key = TValueBuilder().Uint64(i).Build(); - auto value = TValueBuilder().OptionalUint64(i).Build(); - rows.AddListItem(); - rows.BeginStruct(); - rows.AddMember("Key", key); - rows.AddMember("Value", value); - rows.EndStruct(); + auto* row = reqRows->add_items(); + row->add_items()->set_uint64_value(i); + row->add_items()->set_uint64_value(i); } - rows.EndList(); - result = client.BulkUpsert(fullTableName, rows.Build()).GetValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + + auto future = NRpcService::DoLocalRpc( + std::move(request), "", "", runtime.GetActorSystem(0)); + auto response = runtime.WaitFuture(std::move(future)); + + UNIT_ASSERT(response.operation().ready()); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); env.GetController()->WaitActualization(TDuration::Seconds(1)); } -std::vector GatherColumnTablesInfo(TTestEnv& env, ui8 tableCount) { +std::vector GatherColumnTablesInfo(TTestEnv& env, const TString& fullDbName, ui8 tableCount) { auto& runtime = *env.GetServer().GetRuntime(); auto sender = runtime.AllocateEdgeActor(); std::vector ret; for (ui8 tableId = 1; tableId <= tableCount; tableId++) { TTableInfo tableInfo; - const TString path = Sprintf("/Root/Database/Table%u", tableId); - tableInfo.ShardIds = GetColumnTableShards(runtime, sender, path); - tableInfo.PathId = ResolvePathId(runtime, path, &tableInfo.DomainKey, &tableInfo.SaTabletId); + tableInfo.Path = Sprintf("%s/Table%u", fullDbName.c_str(), tableId); + tableInfo.ShardIds = GetColumnTableShards(runtime, sender, tableInfo.Path); + tableInfo.PathId = ResolvePathId(runtime, tableInfo.Path, &tableInfo.DomainKey, &tableInfo.SaTabletId); ret.emplace_back(tableInfo); } return ret; } -std::vector CreateDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount) { - auto init = [&] () { - CreateDatabase(env, "Database"); - for (ui8 tableId = 1; tableId <= tableCount; tableId++) { - CreateColumnStoreTable(env, "Database", Sprintf("Table%u", tableId), shardCount); - } - }; - std::thread initThread(init); +TDatabaseInfo CreateDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount) { + auto fullDbName = CreateDatabase(env, "Database"); - auto& runtime = *env.GetServer().GetRuntime(); - - runtime.SimulateSleep(TDuration::Seconds(10)); - initThread.join(); - - return GatherColumnTablesInfo(env, tableCount); -} + for (ui8 tableId = 1; tableId <= tableCount; tableId++) { + CreateColumnStoreTable(env, "Database", Sprintf("Table%u", tableId), shardCount); + } -std::vector CreateServerlessDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount) { - auto init = [&] () { - CreateDatabase(env, "Shared"); + return { + .FullDatabaseName = fullDbName, + .Tables = GatherColumnTablesInfo(env, fullDbName, tableCount) }; - std::thread initThread(init); +} - auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); +TDatabaseInfo CreateServerlessDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount) { + auto fullServerlessDbName = CreateDatabase(env, "Shared", 1, true); + auto fullDbName = CreateServerlessDatabase(env, "Database", "/Root/Shared"); - TPathId domainKey; - ResolvePathId(runtime, "/Root/Shared", &domainKey); + for (ui8 tableId = 1; tableId <= tableCount; tableId++) { + CreateColumnStoreTable(env, "Database", Sprintf("Table%u", tableId), shardCount); + } - auto init2 = [&] () { - CreateServerlessDatabase(env, "Database", domainKey); - for (ui8 tableId = 1; tableId <= tableCount; tableId++) { - CreateColumnStoreTable(env, "Database", Sprintf("Table%u", tableId), shardCount); - } + return { + .FullDatabaseName = fullServerlessDbName, + .Tables = GatherColumnTablesInfo(env, fullDbName, tableCount) }; - std::thread init2Thread(init2); - - runtime.SimulateSleep(TDuration::Seconds(5)); - init2Thread.join(); - - return GatherColumnTablesInfo(env, tableCount); } void DropTable(TTestEnv& env, const TString& databaseName, const TString& tableName) { - TTableClient client(env.GetDriver()); - auto session = client.CreateSession().GetValueSync().GetSession(); - - auto result = session.ExecuteSchemeQuery(Sprintf(R"( + ExecuteYqlScript(env, Sprintf(R"( DROP TABLE `Root/%s/%s`; - )", databaseName.c_str(), tableName.c_str())).GetValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + )", databaseName.c_str(), tableName.c_str())); } std::shared_ptr ExtractCountMin(TTestActorRuntime& runtime, const TPathId& pathId, ui64 columnTag) { diff --git a/ydb/core/statistics/ut_common/ut_common.h b/ydb/core/statistics/ut_common/ut_common.h index 47132fc87390..df756e53a99c 100644 --- a/ydb/core/statistics/ut_common/ut_common.h +++ b/ydb/core/statistics/ut_common/ut_common.h @@ -16,15 +16,9 @@ namespace NStat { static constexpr ui32 ColumnTableRowsNumber = 1000; -NKikimrSubDomains::TSubDomainSettings GetSubDomainDeclareSettings( - const TString &name, const TStoragePools &pools = {}); - -NKikimrSubDomains::TSubDomainSettings GetSubDomainDefaultSettings( - const TString &name, const TStoragePools &pools = {}); - class TTestEnv { public: - TTestEnv(ui32 staticNodes = 1, ui32 dynamicNodes = 1, ui32 storagePools = 1, bool useRealThreads = false); + TTestEnv(ui32 staticNodes = 1, ui32 dynamicNodes = 1, bool useRealThreads = false); ~TTestEnv(); Tests::TServer& GetServer() const { @@ -51,8 +45,6 @@ class TTestEnv { return Settings; } - TStoragePools GetPools() const; - auto& GetController() { return CSController; } @@ -71,18 +63,28 @@ class TTestEnv { NYDBTest::TControllers::TGuard CSController; }; -void CreateDatabase(TTestEnv& env, const TString& databaseName, size_t nodeCount = 1); +Ydb::StatusIds::StatusCode ExecuteYqlScript(TTestEnv& env, const TString& script, bool mustSucceed = true); + +TString CreateDatabase(TTestEnv& env, const TString& databaseName, + size_t nodeCount = 1, bool isShared = false, const TString& poolName = "hdd1"); -void CreateServerlessDatabase(TTestEnv& env, const TString& databaseName, TPathId resourcesDomainKey); +TString CreateServerlessDatabase(TTestEnv& env, const TString& databaseName, const TString& sharedName); struct TTableInfo { std::vector ShardIds; ui64 SaTabletId; TPathId DomainKey; TPathId PathId; + TString Path; }; -std::vector CreateDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount); -std::vector CreateServerlessDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount); + +struct TDatabaseInfo { + TString FullDatabaseName; + std::vector Tables; +}; + +TDatabaseInfo CreateDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount); +TDatabaseInfo CreateServerlessDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount); TPathId ResolvePathId(TTestActorRuntime& runtime, const TString& path, TPathId* domainKey = nullptr, ui64* saTabletId = nullptr); diff --git a/ydb/library/table_creator/table_creator.cpp b/ydb/library/table_creator/table_creator.cpp index 2f96883bff9a..7a3b9abe1f1d 100644 --- a/ydb/library/table_creator/table_creator.cpp +++ b/ydb/library/table_creator/table_creator.cpp @@ -35,6 +35,7 @@ using TTableCreatorRetryPolicy = IRetryPolicy; TVector keyColumns, NKikimrServices::EServiceKikimr logService, TMaybe ttlSettings = Nothing(), + const TString& database = {}, bool isSystemUser = false, TMaybe partitioningPolicy = Nothing()) : PathComponents(std::move(pathComponents)) @@ -42,6 +43,7 @@ using TTableCreatorRetryPolicy = IRetryPolicy; , KeyColumns(std::move(keyColumns)) , LogService(logService) , TtlSettings(std::move(ttlSettings)) + , Database(database) , IsSystemUser(isSystemUser) , PartitioningPolicy(std::move(partitioningPolicy)) , LogPrefix("Table " + TableName() + " updater. ") @@ -73,19 +75,22 @@ using TTableCreatorRetryPolicy = IRetryPolicy; void Bootstrap() { Become(&TTableCreator::StateFuncCheck); + if (!Database) { + Database = AppData()->TenantName; + } CheckTableExistence(); } void CheckTableExistence() { Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(NTableCreator::BuildSchemeCacheNavigateRequest( - {PathComponents} + {PathComponents}, Database ).Release()), IEventHandle::FlagTrackDelivery); } void RunTableRequest() { auto request = MakeHolder(); NKikimrSchemeOp::TModifyScheme& modifyScheme = *request->Record.MutableTransaction()->MutableModifyScheme(); - auto pathComponents = SplitPath(AppData()->TenantName); + auto pathComponents = SplitPath(Database); for (size_t i = 0; i < PathComponents.size() - 1; ++i) { pathComponents.emplace_back(PathComponents[i]); } @@ -388,6 +393,7 @@ using TTableCreatorRetryPolicy = IRetryPolicy; const TVector KeyColumns; NKikimrServices::EServiceKikimr LogService; const TMaybe TtlSettings; + TString Database; bool IsSystemUser = false; const TMaybe PartitioningPolicy; NKikimrSchemeOp::EOperationType OperationType = NKikimrSchemeOp::EOperationType::ESchemeOpCreateTable; @@ -422,8 +428,10 @@ THolder BuildSchemeCacheNavigateRequest(cons return request; } -THolder BuildSchemeCacheNavigateRequest(const TVector>& pathsComponents) { - return BuildSchemeCacheNavigateRequest(pathsComponents, AppData()->TenantName, nullptr); +THolder BuildSchemeCacheNavigateRequest( + const TVector>& pathsComponents, const TString& database) +{ + return BuildSchemeCacheNavigateRequest(pathsComponents, database ? database : AppData()->TenantName, nullptr); } NKikimrSchemeOp::TColumnDescription TMultiTableCreator::Col(const TString& columnName, const char* columnType) { @@ -488,12 +496,13 @@ NActors::IActor* CreateTableCreator( TVector keyColumns, NKikimrServices::EServiceKikimr logService, TMaybe ttlSettings, + const TString& database, bool isSystemUser, TMaybe partitioningPolicy) { return new TTableCreator(std::move(pathComponents), std::move(columns), - std::move(keyColumns), logService, std::move(ttlSettings), isSystemUser, - std::move(partitioningPolicy)); + std::move(keyColumns), logService, std::move(ttlSettings), database, + isSystemUser, std::move(partitioningPolicy)); } } // namespace NKikimr diff --git a/ydb/library/table_creator/table_creator.h b/ydb/library/table_creator/table_creator.h index 39e40d428016..c036e4a6cb1a 100644 --- a/ydb/library/table_creator/table_creator.h +++ b/ydb/library/table_creator/table_creator.h @@ -67,7 +67,7 @@ class TMultiTableCreator : public NActors::TActorBootstrapped BuildSchemeCacheNavigateRequest(const TVector>& pathsComponents, const TString& database, TIntrusiveConstPtr userToken); -THolder BuildSchemeCacheNavigateRequest(const TVector>& pathsComponents); +THolder BuildSchemeCacheNavigateRequest(const TVector>& pathsComponents, const TString& database = {}); } // namespace NTableCreator @@ -77,6 +77,7 @@ NActors::IActor* CreateTableCreator( TVector keyColumns, NKikimrServices::EServiceKikimr logService, TMaybe ttlSettings = Nothing(), + const TString& database = {}, bool isSystemUser = false, TMaybe partitioningPolicy = Nothing());