Skip to content

Commit ac22cb5

Browse files
authored
fix analyze for serverless case (#8843)
1 parent 8ffabfb commit ac22cb5

File tree

8 files changed

+106
-28
lines changed

8 files changed

+106
-28
lines changed

ydb/core/kqp/gateway/actors/analyze_actor.cpp

+1-2
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,8 @@ void TAnalyzeActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr&
105105
TStringBuilder() << "Can't get statistics aggregator ID.", {}
106106
)
107107
);
108+
this->Die(ctx);
108109
}
109-
110-
this->Die(ctx);
111110
return;
112111
}
113112

ydb/core/statistics/aggregator/tx_analyze_table_response.cpp

+7-5
Original file line numberDiff line numberDiff line change
@@ -24,30 +24,32 @@ struct TStatisticsAggregator::TTxAnalyzeTableResponse : public TTxBase {
2424
const TPathId pathId = PathIdFromPathId(Record.GetPathId());
2525
auto operationTable = Self->ForceTraversalTable(operationId, pathId);
2626
if (!operationTable) {
27-
SA_LOG_E("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Complete. Unknown OperationTable. Record: " << Record.ShortDebugString());
27+
SA_LOG_E("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Execute. Unknown OperationTable. Record: " << Record.ShortDebugString());
2828
return true;
2929
}
3030

3131
auto analyzedShard = std::find_if(operationTable->AnalyzedShards.begin(), operationTable->AnalyzedShards.end(),
3232
[tabletId = Record.GetShardTabletId()] (TAnalyzedShard& analyzedShard) { return analyzedShard.ShardTabletId == tabletId;});
3333
if (analyzedShard == operationTable->AnalyzedShards.end()) {
34-
SA_LOG_E("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Complete. Unknown AnalyzedShards. Record: " << Record.ShortDebugString() << ", ShardTabletId " << Record.GetShardTabletId());
34+
SA_LOG_E("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Execute. Unknown AnalyzedShards. Record: " << Record.ShortDebugString() << ", ShardTabletId " << Record.GetShardTabletId());
3535
return true;
3636
}
3737
if (analyzedShard->Status != TAnalyzedShard::EStatus::AnalyzeStarted) {
38-
SA_LOG_E("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Complete. Unknown AnalyzedShards Status. Record: " << Record.ShortDebugString() << ", ShardTabletId " << Record.GetShardTabletId());
38+
SA_LOG_E("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Execute. Unknown AnalyzedShards Status. Record: " << Record.ShortDebugString() << ", ShardTabletId " << Record.GetShardTabletId());
3939
}
4040

4141
analyzedShard->Status = TAnalyzedShard::EStatus::AnalyzeFinished;
4242

4343
bool completeResponse = std::any_of(operationTable->AnalyzedShards.begin(), operationTable->AnalyzedShards.end(),
4444
[] (const TAnalyzedShard& analyzedShard) { return analyzedShard.Status == TAnalyzedShard::EStatus::AnalyzeFinished;});
4545

46-
if (!completeResponse)
46+
if (!completeResponse) {
47+
SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Execute. There are shards which are not analyzed");
4748
return true;
48-
49+
}
4950
NIceDb::TNiceDb db(txc.DB);
5051
Self->UpdateForceTraversalTableStatus(TForceTraversalTable::EStatus::AnalyzeFinished, operationId, *operationTable, db);
52+
SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Execute. All shards are analyzed");
5153
return true;
5254
}
5355

ydb/core/statistics/aggregator/tx_navigate.cpp

+2-4
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,8 @@ struct TStatisticsAggregator::TTxNavigate : public TTxBase {
5959
}
6060

6161
if (Self->TraversalIsColumnTable) {
62-
// TODO: serverless case
63-
if (entry.DomainInfo->Params.HasHive()) {
64-
Self->HiveId = entry.DomainInfo->Params.GetHive();
65-
} else {
62+
Self->HiveId = entry.DomainInfo->ExtractHive();
63+
if (Self->HiveId == 0) {
6664
Self->HiveId = AppData()->DomainsInfo->GetHive();
6765
}
6866
}

ydb/core/statistics/aggregator/tx_response_tablet_distribution.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,14 @@ struct TStatisticsAggregator::TTxResponseTabletDistribution : public TTxBase {
5555
}
5656

5757
bool Execute(TTransactionContext& txc, const TActorContext&) override {
58-
SA_LOG_D("[" << Self->TabletID() << "] TTxResponseTabletDistribution::Execute");
58+
SA_LOG_D("[" << Self->TabletID() << "] TTxResponseTabletDistribution::Execute. Node count = " << HiveRecord.NodesSize());
5959

6060
auto distribution = Self->TabletsForReqDistribution;
6161
for (auto& inNode : HiveRecord.GetNodes()) {
6262
if (inNode.GetNodeId() == 0) {
6363
// these tablets are probably in Hive boot queue
6464
if (Self->HiveRequestRound < Self->MaxHiveRequestRoundCount) {
65+
SA_LOG_W("[" << Self->TabletID() << "] TTxResponseTabletDistribution::Execute. Some tablets are probably in Hive boot queue");
6566
Action = EAction::ScheduleReqDistribution;
6667
}
6768
continue;
@@ -76,6 +77,7 @@ struct TStatisticsAggregator::TTxResponseTabletDistribution : public TTxBase {
7677
}
7778

7879
if (!distribution.empty() && Self->ResolveRound < Self->MaxResolveRoundCount) {
80+
SA_LOG_W("[" << Self->TabletID() << "] TTxResponseTabletDistribution::Execute. Some tablets do not exist in Hive anymore; tablet count = " << distribution.size());
7981
// these tablets do not exist in Hive anymore
8082
Self->NavigatePathId = Self->TraversalPathId;
8183
Action = EAction::ScheduleResolve;

ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp

+9-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) {
1919

2020
AnalyzeTable(runtime, tableInfo.ShardIds[0], tableInfo.PathId);
2121
}
22-
22+
2323
Y_UNIT_TEST(Analyze) {
2424
TTestEnv env(1, 1);
2525
auto& runtime = *env.GetServer().GetRuntime();
@@ -28,6 +28,14 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) {
2828
Analyze(runtime, tableInfo.SaTabletId, {tableInfo.PathId});
2929
}
3030

31+
Y_UNIT_TEST(AnalyzeServerless) {
32+
TTestEnv env(1, 1);
33+
auto& runtime = *env.GetServer().GetRuntime();
34+
auto tableInfo = CreateServerlessDatabaseColumnTables(env, 1, 1)[0];
35+
36+
Analyze(runtime, tableInfo.SaTabletId, {tableInfo.PathId});
37+
}
38+
3139
Y_UNIT_TEST(AnalyzeAnalyzeOneColumnTableSpecificColumns) {
3240
TTestEnv env(1, 1);
3341
auto& runtime = *env.GetServer().GetRuntime();

ydb/core/statistics/ut_common/ut_common.cpp

+70-15
Original file line numberDiff line numberDiff line change
@@ -155,14 +155,36 @@ TPathId ResolvePathId(TTestActorRuntime& runtime, const TString& path, TPathId*
155155
*domainKey = resultEntry.DomainInfo->DomainKey;
156156
}
157157

158-
if (saTabletId && resultEntry.DomainInfo->Params.HasStatisticsAggregator()) {
159-
*saTabletId = resultEntry.DomainInfo->Params.GetStatisticsAggregator();
158+
if (saTabletId) {
159+
if (resultEntry.DomainInfo->Params.HasStatisticsAggregator()) {
160+
*saTabletId = resultEntry.DomainInfo->Params.GetStatisticsAggregator();
161+
} else {
162+
auto resourcesDomainKey = resultEntry.DomainInfo->ResourcesDomainKey;
163+
auto request = std::make_unique<TNavigate>();
164+
auto& entry = request->ResultSet.emplace_back();
165+
entry.TableId = TTableId(resourcesDomainKey.OwnerId, resourcesDomainKey.LocalPathId);
166+
entry.RequestType = TNavigate::TEntry::ERequestType::ByTableId;
167+
entry.Operation = TNavigate::EOp::OpPath;
168+
entry.RedirectRequired = false;
169+
runtime.Send(MakeSchemeCacheID(), sender, new TEvRequest(request.release()));
170+
171+
auto ev = runtime.GrabEdgeEventRethrow<TEvResponse>(sender);
172+
UNIT_ASSERT(ev);
173+
UNIT_ASSERT(ev->Get());
174+
std::unique_ptr<TNavigate> response(ev->Get()->Request.Release());
175+
UNIT_ASSERT(response->ResultSet.size() == 1);
176+
auto& secondResultEntry = response->ResultSet[0];
177+
178+
if (secondResultEntry.DomainInfo->Params.HasStatisticsAggregator()) {
179+
*saTabletId = secondResultEntry.DomainInfo->Params.GetStatisticsAggregator();
180+
}
181+
}
160182
}
161183

162184
return resultEntry.TableId.PathId;
163185
}
164186

165-
NKikimrScheme::TEvDescribeSchemeResult DescribeTable(TTestActorRuntime& runtime, TActorId sender, const TString &path)
187+
NKikimrScheme::TEvDescribeSchemeResult DescribeTable(TTestActorRuntime& runtime, TActorId sender, const TString& path)
166188
{
167189
TAutoPtr<IEventHandle> handle;
168190

@@ -175,7 +197,7 @@ NKikimrScheme::TEvDescribeSchemeResult DescribeTable(TTestActorRuntime& runtime,
175197
return *reply->MutableRecord();
176198
}
177199

178-
TVector<ui64> GetTableShards(TTestActorRuntime& runtime, TActorId sender, const TString &path)
200+
TVector<ui64> GetTableShards(TTestActorRuntime& runtime, TActorId sender, const TString& path)
179201
{
180202
TVector<ui64> shards;
181203
auto lsResult = DescribeTable(runtime, sender, path);
@@ -185,7 +207,7 @@ TVector<ui64> GetTableShards(TTestActorRuntime& runtime, TActorId sender, const
185207
return shards;
186208
}
187209

188-
TVector<ui64> GetColumnTableShards(TTestActorRuntime& runtime, TActorId sender,const TString &path)
210+
TVector<ui64> GetColumnTableShards(TTestActorRuntime& runtime, TActorId sender, const TString& path)
189211
{
190212
TVector<ui64> shards;
191213
auto lsResult = DescribeTable(runtime, sender, path);
@@ -275,6 +297,21 @@ void CreateColumnStoreTable(TTestEnv& env, const TString& databaseName, const TS
275297
env.GetController()->WaitActualization(TDuration::Seconds(1));
276298
}
277299

300+
std::vector<TTableInfo> GatherColumnTablesInfo(TTestEnv& env, ui8 tableCount) {
301+
auto& runtime = *env.GetServer().GetRuntime();
302+
auto sender = runtime.AllocateEdgeActor();
303+
304+
std::vector<TTableInfo> ret;
305+
for (ui8 tableId = 1; tableId <= tableCount; tableId++) {
306+
TTableInfo tableInfo;
307+
const TString path = Sprintf("/Root/Database/Table%u", tableId);
308+
tableInfo.ShardIds = GetColumnTableShards(runtime, sender, path);
309+
tableInfo.PathId = ResolvePathId(runtime, path, &tableInfo.DomainKey, &tableInfo.SaTabletId);
310+
ret.emplace_back(tableInfo);
311+
}
312+
return ret;
313+
}
314+
278315
std::vector<TTableInfo> CreateDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount) {
279316
auto init = [&] () {
280317
CreateDatabase(env, "Database");
@@ -285,20 +322,38 @@ std::vector<TTableInfo> CreateDatabaseColumnTables(TTestEnv& env, ui8 tableCount
285322
std::thread initThread(init);
286323

287324
auto& runtime = *env.GetServer().GetRuntime();
288-
auto sender = runtime.AllocateEdgeActor();
289325

290326
runtime.SimulateSleep(TDuration::Seconds(10));
291327
initThread.join();
292328

293-
std::vector<TTableInfo> ret;
294-
for (ui8 tableId = 1; tableId <= tableCount; tableId++) {
295-
TTableInfo tableInfo;
296-
const TString path = Sprintf("/Root/Database/Table%u", tableId);
297-
tableInfo.ShardIds = GetColumnTableShards(runtime, sender, path);
298-
tableInfo.PathId = ResolvePathId(runtime, path, &tableInfo.DomainKey, &tableInfo.SaTabletId);
299-
ret.emplace_back(tableInfo);
300-
}
301-
return ret;
329+
return GatherColumnTablesInfo(env, tableCount);
330+
}
331+
332+
std::vector<TTableInfo> CreateServerlessDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount) {
333+
auto init = [&] () {
334+
CreateDatabase(env, "Shared");
335+
};
336+
std::thread initThread(init);
337+
338+
auto& runtime = *env.GetServer().GetRuntime();
339+
runtime.SimulateSleep(TDuration::Seconds(5));
340+
initThread.join();
341+
342+
TPathId domainKey;
343+
ResolvePathId(runtime, "/Root/Shared", &domainKey);
344+
345+
auto init2 = [&] () {
346+
CreateServerlessDatabase(env, "Database", domainKey);
347+
for (ui8 tableId = 1; tableId <= tableCount; tableId++) {
348+
CreateColumnStoreTable(env, "Database", Sprintf("Table%u", tableId), shardCount);
349+
}
350+
};
351+
std::thread init2Thread(init2);
352+
353+
runtime.SimulateSleep(TDuration::Seconds(5));
354+
init2Thread.join();
355+
356+
return GatherColumnTablesInfo(env, tableCount);
302357
}
303358

304359
void DropTable(TTestEnv& env, const TString& databaseName, const TString& tableName) {

ydb/core/statistics/ut_common/ut_common.h

+1
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ struct TTableInfo {
8282
TPathId PathId;
8383
};
8484
std::vector<TTableInfo> CreateDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount);
85+
std::vector<TTableInfo> CreateServerlessDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount);
8586

8687
TPathId ResolvePathId(TTestActorRuntime& runtime, const TString& path, TPathId* domainKey = nullptr, ui64* saTabletId = nullptr);
8788

ydb/core/tx/scheme_cache/scheme_cache.h

+13
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ struct TDomainInfo : public TAtomicRefCount<TDomainInfo> {
6666
if (descr.HasServerlessComputeResourcesMode()) {
6767
ServerlessComputeResourcesMode = descr.GetServerlessComputeResourcesMode();
6868
}
69+
70+
if (descr.HasSharedHive()) {
71+
SharedHiveId = descr.GetSharedHive();
72+
}
6973
}
7074

7175
inline ui64 GetVersion() const {
@@ -80,6 +84,14 @@ struct TDomainInfo : public TAtomicRefCount<TDomainInfo> {
8084
}
8185
}
8286

87+
inline ui64 ExtractHive() const {
88+
if (IsServerless()) {
89+
return SharedHiveId;
90+
} else {
91+
return Params.GetHive();
92+
}
93+
}
94+
8395
inline bool IsServerless() const {
8496
return DomainKey != ResourcesDomainKey;
8597
}
@@ -89,6 +101,7 @@ struct TDomainInfo : public TAtomicRefCount<TDomainInfo> {
89101
NKikimrSubDomains::TProcessingParams Params;
90102
TCoordinators Coordinators;
91103
TMaybeServerlessComputeResourcesMode ServerlessComputeResourcesMode;
104+
ui64 SharedHiveId = 0;
92105

93106
TString ToString() const;
94107

0 commit comments

Comments
 (0)