Skip to content

Commit 83f5e7a

Browse files
committed
fix analyze for serverless case
1 parent 93bd594 commit 83f5e7a

File tree

8 files changed

+98
-20
lines changed

8 files changed

+98
-20
lines changed

ydb/core/kqp/gateway/actors/analyze_actor.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,8 @@ void TAnalyzeActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr&
105105
TStringBuilder() << "Can't get statistics aggregator ID.", {}
106106
)
107107
);
108+
this->Die(ctx);
108109
}
109-
110-
this->Die(ctx);
111110
return;
112111
}
113112

ydb/core/statistics/aggregator/tx_analyze_table_response.cpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,30 +24,32 @@ struct TStatisticsAggregator::TTxAnalyzeTableResponse : public TTxBase {
2424
const TPathId pathId = PathIdFromPathId(Record.GetPathId());
2525
auto operationTable = Self->ForceTraversalTable(operationId, pathId);
2626
if (!operationTable) {
27-
SA_LOG_E("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Complete. Unknown OperationTable. Record: " << Record.ShortDebugString());
27+
SA_LOG_E("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Execute. Unknown OperationTable. Record: " << Record.ShortDebugString());
2828
return true;
2929
}
3030

3131
auto analyzedShard = std::find_if(operationTable->AnalyzedShards.begin(), operationTable->AnalyzedShards.end(),
3232
[tabletId = Record.GetShardTabletId()] (TAnalyzedShard& analyzedShard) { return analyzedShard.ShardTabletId == tabletId;});
3333
if (analyzedShard == operationTable->AnalyzedShards.end()) {
34-
SA_LOG_E("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Complete. Unknown AnalyzedShards. Record: " << Record.ShortDebugString() << ", ShardTabletId " << Record.GetShardTabletId());
34+
SA_LOG_E("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Execute. Unknown AnalyzedShards. Record: " << Record.ShortDebugString() << ", ShardTabletId " << Record.GetShardTabletId());
3535
return true;
3636
}
3737
if (analyzedShard->Status != TAnalyzedShard::EStatus::AnalyzeStarted) {
38-
SA_LOG_E("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Complete. Unknown AnalyzedShards Status. Record: " << Record.ShortDebugString() << ", ShardTabletId " << Record.GetShardTabletId());
38+
SA_LOG_E("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Execute. Unknown AnalyzedShards Status. Record: " << Record.ShortDebugString() << ", ShardTabletId " << Record.GetShardTabletId());
3939
}
4040

4141
analyzedShard->Status = TAnalyzedShard::EStatus::AnalyzeFinished;
4242

4343
bool completeResponse = std::any_of(operationTable->AnalyzedShards.begin(), operationTable->AnalyzedShards.end(),
4444
[] (const TAnalyzedShard& analyzedShard) { return analyzedShard.Status == TAnalyzedShard::EStatus::AnalyzeFinished;});
4545

46-
if (!completeResponse)
46+
if (!completeResponse) {
47+
SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Execute. There are shards which are not analyzed");
4748
return true;
48-
49+
}
4950
NIceDb::TNiceDb db(txc.DB);
5051
Self->UpdateForceTraversalTableStatus(TForceTraversalTable::EStatus::AnalyzeFinished, operationId, *operationTable, db);
52+
SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTableResponse::Execute. All shards are analyzed");
5153
return true;
5254
}
5355

ydb/core/statistics/aggregator/tx_navigate.cpp

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,7 @@ struct TStatisticsAggregator::TTxNavigate : public TTxBase {
5959
}
6060

6161
if (Self->TraversalIsColumnTable) {
62-
// TODO: serverless case
63-
if (entry.DomainInfo->Params.HasHive()) {
64-
Self->HiveId = entry.DomainInfo->Params.GetHive();
65-
} else {
66-
Self->HiveId = AppData()->DomainsInfo->GetHive();
67-
}
62+
Self->HiveId = entry.DomainInfo->ExtractHive();
6863
}
6964

7065
return true;

ydb/core/statistics/aggregator/tx_response_tablet_distribution.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,14 @@ struct TStatisticsAggregator::TTxResponseTabletDistribution : public TTxBase {
5555
}
5656

5757
bool Execute(TTransactionContext& txc, const TActorContext&) override {
58-
SA_LOG_D("[" << Self->TabletID() << "] TTxResponseTabletDistribution::Execute");
58+
SA_LOG_D("[" << Self->TabletID() << "] TTxResponseTabletDistribution::Execute. Node count = " << HiveRecord.NodesSize());
5959

6060
auto distribution = Self->TabletsForReqDistribution;
6161
for (auto& inNode : HiveRecord.GetNodes()) {
6262
if (inNode.GetNodeId() == 0) {
6363
// these tablets are probably in Hive boot queue
6464
if (Self->HiveRequestRound < Self->MaxHiveRequestRoundCount) {
65+
SA_LOG_W("[" << Self->TabletID() << "] TTxResponseTabletDistribution::Execute. Some tablets are probably in Hive boot queue");
6566
Action = EAction::ScheduleReqDistribution;
6667
}
6768
continue;
@@ -76,6 +77,7 @@ struct TStatisticsAggregator::TTxResponseTabletDistribution : public TTxBase {
7677
}
7778

7879
if (!distribution.empty() && Self->ResolveRound < Self->MaxResolveRoundCount) {
80+
SA_LOG_W("[" << Self->TabletID() << "] TTxResponseTabletDistribution::Execute. Some tablets do not exist in Hive anymore; tablet count = " << distribution.size());
7981
// these tablets do not exist in Hive anymore
8082
Self->NavigatePathId = Self->TraversalPathId;
8183
Action = EAction::ScheduleResolve;

ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) {
1919

2020
AnalyzeTable(runtime, tableInfo.ShardIds[0], tableInfo.PathId);
2121
}
22-
22+
2323
Y_UNIT_TEST(Analyze) {
2424
TTestEnv env(1, 1);
2525
auto& runtime = *env.GetServer().GetRuntime();
@@ -28,6 +28,14 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) {
2828
Analyze(runtime, tableInfo.SaTabletId, {tableInfo.PathId});
2929
}
3030

31+
Y_UNIT_TEST(AnalyzeServerless) {
32+
TTestEnv env(1, 1);
33+
auto& runtime = *env.GetServer().GetRuntime();
34+
auto tableInfo = CreateServerlessDatabaseColumnTables(env, 1, 1)[0];
35+
36+
Analyze(runtime, tableInfo.SaTabletId, {tableInfo.PathId});
37+
}
38+
3139
Y_UNIT_TEST(AnalyzeAnalyzeOneColumnTableSpecificColumns) {
3240
TTestEnv env(1, 1);
3341
auto& runtime = *env.GetServer().GetRuntime();

ydb/core/statistics/ut_common/ut_common.cpp

Lines changed: 63 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -155,14 +155,36 @@ TPathId ResolvePathId(TTestActorRuntime& runtime, const TString& path, TPathId*
155155
*domainKey = resultEntry.DomainInfo->DomainKey;
156156
}
157157

158-
if (saTabletId && resultEntry.DomainInfo->Params.HasStatisticsAggregator()) {
159-
*saTabletId = resultEntry.DomainInfo->Params.GetStatisticsAggregator();
158+
if (saTabletId) {
159+
if (resultEntry.DomainInfo->Params.HasStatisticsAggregator()) {
160+
*saTabletId = resultEntry.DomainInfo->Params.GetStatisticsAggregator();
161+
} else {
162+
auto resourcesDomainKey = resultEntry.DomainInfo->ResourcesDomainKey;
163+
auto request = std::make_unique<TNavigate>();
164+
auto& entry = request->ResultSet.emplace_back();
165+
entry.TableId = TTableId(resourcesDomainKey.OwnerId, resourcesDomainKey.LocalPathId);
166+
entry.RequestType = TNavigate::TEntry::ERequestType::ByTableId;
167+
entry.Operation = TNavigate::EOp::OpPath;
168+
entry.RedirectRequired = false;
169+
runtime.Send(MakeSchemeCacheID(), sender, new TEvRequest(request.release()));
170+
171+
auto ev = runtime.GrabEdgeEventRethrow<TEvResponse>(sender);
172+
UNIT_ASSERT(ev);
173+
UNIT_ASSERT(ev->Get());
174+
std::unique_ptr<TNavigate> response(ev->Get()->Request.Release());
175+
UNIT_ASSERT(response->ResultSet.size() == 1);
176+
auto& secondResultEntry = response->ResultSet[0];
177+
178+
if (secondResultEntry.DomainInfo->Params.HasStatisticsAggregator()) {
179+
*saTabletId = secondResultEntry.DomainInfo->Params.GetStatisticsAggregator();
180+
}
181+
}
160182
}
161183

162184
return resultEntry.TableId.PathId;
163185
}
164186

165-
NKikimrScheme::TEvDescribeSchemeResult DescribeTable(TTestActorRuntime& runtime, TActorId sender, const TString &path)
187+
NKikimrScheme::TEvDescribeSchemeResult DescribeTable(TTestActorRuntime& runtime, TActorId sender, const TString& path)
166188
{
167189
TAutoPtr<IEventHandle> handle;
168190

@@ -175,7 +197,7 @@ NKikimrScheme::TEvDescribeSchemeResult DescribeTable(TTestActorRuntime& runtime,
175197
return *reply->MutableRecord();
176198
}
177199

178-
TVector<ui64> GetTableShards(TTestActorRuntime& runtime, TActorId sender, const TString &path)
200+
TVector<ui64> GetTableShards(TTestActorRuntime& runtime, TActorId sender, const TString& path)
179201
{
180202
TVector<ui64> shards;
181203
auto lsResult = DescribeTable(runtime, sender, path);
@@ -185,7 +207,7 @@ TVector<ui64> GetTableShards(TTestActorRuntime& runtime, TActorId sender, const
185207
return shards;
186208
}
187209

188-
TVector<ui64> GetColumnTableShards(TTestActorRuntime& runtime, TActorId sender,const TString &path)
210+
TVector<ui64> GetColumnTableShards(TTestActorRuntime& runtime, TActorId sender, const TString& path)
189211
{
190212
TVector<ui64> shards;
191213
auto lsResult = DescribeTable(runtime, sender, path);
@@ -301,6 +323,42 @@ std::vector<TTableInfo> CreateDatabaseColumnTables(TTestEnv& env, ui8 tableCount
301323
return ret;
302324
}
303325

326+
std::vector<TTableInfo> CreateServerlessDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount) {
327+
auto init = [&] () {
328+
CreateDatabase(env, "Shared");
329+
};
330+
std::thread initThread(init);
331+
332+
auto& runtime = *env.GetServer().GetRuntime();
333+
runtime.SimulateSleep(TDuration::Seconds(5));
334+
initThread.join();
335+
336+
TPathId domainKey;
337+
ResolvePathId(runtime, "/Root/Shared", &domainKey);
338+
339+
auto init2 = [&] () {
340+
CreateServerlessDatabase(env, "Serverless", domainKey);
341+
for (ui8 tableId = 1; tableId <= tableCount; tableId++) {
342+
CreateColumnStoreTable(env, "Serverless", Sprintf("Table%u", tableId), shardCount);
343+
}
344+
};
345+
std::thread init2Thread(init2);
346+
347+
runtime.SimulateSleep(TDuration::Seconds(5));
348+
init2Thread.join();
349+
350+
auto sender = runtime.AllocateEdgeActor();
351+
std::vector<TTableInfo> ret;
352+
for (ui8 tableId = 1; tableId <= tableCount; tableId++) {
353+
TTableInfo tableInfo;
354+
const TString path = Sprintf("/Root/Serverless/Table%u", tableId);
355+
tableInfo.ShardIds = GetColumnTableShards(runtime, sender, path);
356+
tableInfo.PathId = ResolvePathId(runtime, path, &tableInfo.DomainKey, &tableInfo.SaTabletId);
357+
ret.emplace_back(tableInfo);
358+
}
359+
return ret;
360+
}
361+
304362
void DropTable(TTestEnv& env, const TString& databaseName, const TString& tableName) {
305363
TTableClient client(env.GetDriver());
306364
auto session = client.CreateSession().GetValueSync().GetSession();

ydb/core/statistics/ut_common/ut_common.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ struct TTableInfo {
8282
TPathId PathId;
8383
};
8484
std::vector<TTableInfo> CreateDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount);
85+
std::vector<TTableInfo> CreateServerlessDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount);
8586

8687
TPathId ResolvePathId(TTestActorRuntime& runtime, const TString& path, TPathId* domainKey = nullptr, ui64* saTabletId = nullptr);
8788

ydb/core/tx/scheme_cache/scheme_cache.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ struct TDomainInfo : public TAtomicRefCount<TDomainInfo> {
6666
if (descr.HasServerlessComputeResourcesMode()) {
6767
ServerlessComputeResourcesMode = descr.GetServerlessComputeResourcesMode();
6868
}
69+
70+
if (descr.HasSharedHive()) {
71+
SharedHiveId = descr.GetSharedHive();
72+
}
6973
}
7074

7175
inline ui64 GetVersion() const {
@@ -80,6 +84,14 @@ struct TDomainInfo : public TAtomicRefCount<TDomainInfo> {
8084
}
8185
}
8286

87+
inline ui64 ExtractHive() const {
88+
if (IsServerless()) {
89+
return SharedHiveId;
90+
} else {
91+
return Params.GetHive();
92+
}
93+
}
94+
8395
inline bool IsServerless() const {
8496
return DomainKey != ResourcesDomainKey;
8597
}
@@ -89,6 +101,7 @@ struct TDomainInfo : public TAtomicRefCount<TDomainInfo> {
89101
NKikimrSubDomains::TProcessingParams Params;
90102
TCoordinators Coordinators;
91103
TMaybeServerlessComputeResourcesMode ServerlessComputeResourcesMode;
104+
ui64 SharedHiveId = 0;
92105

93106
TString ToString() const;
94107

0 commit comments

Comments
 (0)