Skip to content

Commit b83f29e

Browse files
authored
retry requests to statistics table (#8358)
1 parent 9616f55 commit b83f29e

File tree

6 files changed

+173
-27
lines changed

6 files changed

+173
-27
lines changed

ydb/core/statistics/aggregator/aggregator_impl.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -577,8 +577,8 @@ void TStatisticsAggregator::SaveStatisticsToTable() {
577577
data.push_back(strSketch);
578578
}
579579

580-
Register(CreateSaveStatisticsQuery(TraversalPathId, EStatType::COUNT_MIN_SKETCH,
581-
std::move(columnTags), std::move(data)));
580+
Register(CreateSaveStatisticsQuery(SelfId(),
581+
TraversalPathId, EStatType::COUNT_MIN_SKETCH, std::move(columnTags), std::move(data)));
582582
}
583583

584584
void TStatisticsAggregator::DeleteStatisticsFromTable() {
@@ -589,7 +589,7 @@ void TStatisticsAggregator::DeleteStatisticsFromTable() {
589589

590590
PendingDeleteStatistics = false;
591591

592-
Register(CreateDeleteStatisticsQuery(TraversalPathId));
592+
Register(CreateDeleteStatisticsQuery(SelfId(), TraversalPathId));
593593
}
594594

595595
void TStatisticsAggregator::ScheduleNextAnalyze(NIceDb::TNiceDb& db) {

ydb/core/statistics/database/database.cpp

Lines changed: 143 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -81,14 +81,15 @@ class TSaveStatisticsQuery : public NKikimr::TQueryBase {
8181
const ui64 StatType;
8282
const std::vector<ui32> ColumnTags;
8383
const std::vector<TString> Data;
84+
8485
public:
8586
TSaveStatisticsQuery(const TPathId& pathId, ui64 statType,
86-
std::vector<ui32>&& columnTags, std::vector<TString>&& data)
87+
const std::vector<ui32>& columnTags, const std::vector<TString>& data)
8788
: NKikimr::TQueryBase(NKikimrServices::STATISTICS, {}, {}, true)
8889
, PathId(pathId)
8990
, StatType(statType)
90-
, ColumnTags(std::move(columnTags))
91-
, Data(std::move(data))
91+
, ColumnTags(columnTags)
92+
, Data(data)
9293
{
9394
Y_ABORT_UNLESS(ColumnTags.size() == Data.size());
9495
}
@@ -148,15 +149,61 @@ class TSaveStatisticsQuery : public NKikimr::TQueryBase {
148149
void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
149150
Y_UNUSED(issues);
150151
auto response = std::make_unique<TEvStatistics::TEvSaveStatisticsQueryResponse>();
152+
response->Status = status;
153+
response->Issues = std::move(issues);
151154
response->Success = (status == Ydb::StatusIds::SUCCESS);
152155
Send(Owner, response.release());
153156
}
154157
};
155158

156-
NActors::IActor* CreateSaveStatisticsQuery(const TPathId& pathId, ui64 statType,
157-
std::vector<ui32>&& columnTags, std::vector<TString>&& data)
159+
class TSaveStatisticsRetryingQuery : public TActorBootstrapped<TSaveStatisticsRetryingQuery> {
160+
private:
161+
const NActors::TActorId ReplyActorId;
162+
const TPathId PathId;
163+
const ui64 StatType;
164+
const std::vector<ui32> ColumnTags;
165+
const std::vector<TString> Data;
166+
167+
public:
168+
using TSaveRetryingQuery = TQueryRetryActor<
169+
TSaveStatisticsQuery, TEvStatistics::TEvSaveStatisticsQueryResponse,
170+
const TPathId&, ui64, const std::vector<ui32>&, const std::vector<TString>&>;
171+
172+
TSaveStatisticsRetryingQuery(const NActors::TActorId& replyActorId,
173+
const TPathId& pathId, ui64 statType, std::vector<ui32>&& columnTags, std::vector<TString>&& data)
174+
: ReplyActorId(replyActorId)
175+
, PathId(pathId)
176+
, StatType(statType)
177+
, ColumnTags(std::move(columnTags))
178+
, Data(std::move(data))
179+
{}
180+
181+
void Bootstrap() {
182+
Register(new TSaveRetryingQuery(
183+
SelfId(),
184+
TSaveRetryingQuery::IRetryPolicy::GetExponentialBackoffPolicy(
185+
TSaveRetryingQuery::Retryable, TDuration::MilliSeconds(10),
186+
TDuration::MilliSeconds(200), TDuration::Seconds(1),
187+
std::numeric_limits<size_t>::max(), TDuration::Seconds(1)),
188+
PathId, StatType, ColumnTags, Data
189+
));
190+
Become(&TSaveStatisticsRetryingQuery::StateFunc);
191+
}
192+
193+
STRICT_STFUNC(StateFunc,
194+
hFunc(TEvStatistics::TEvSaveStatisticsQueryResponse, Handle);
195+
)
196+
197+
void Handle(TEvStatistics::TEvSaveStatisticsQueryResponse::TPtr& ev) {
198+
Send(ReplyActorId, ev->Release().Release());
199+
PassAway();
200+
}
201+
};
202+
203+
NActors::IActor* CreateSaveStatisticsQuery(const NActors::TActorId& replyActorId,
204+
const TPathId& pathId, ui64 statType, std::vector<ui32>&& columnTags, std::vector<TString>&& data)
158205
{
159-
return new TSaveStatisticsQuery(pathId, statType, std::move(columnTags), std::move(data));
206+
return new TSaveStatisticsRetryingQuery(replyActorId, pathId, statType, std::move(columnTags), std::move(data));
160207
}
161208

162209

@@ -231,6 +278,8 @@ class TLoadStatisticsQuery : public NKikimr::TQueryBase {
231278
void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
232279
Y_UNUSED(issues);
233280
auto response = std::make_unique<TEvStatistics::TEvLoadStatisticsQueryResponse>();
281+
response->Status = status;
282+
response->Issues = std::move(issues);
234283
response->Success = (status == Ydb::StatusIds::SUCCESS);
235284
response->Cookie = Cookie;
236285
if (response->Success) {
@@ -240,10 +289,54 @@ class TLoadStatisticsQuery : public NKikimr::TQueryBase {
240289
}
241290
};
242291

243-
NActors::IActor* CreateLoadStatisticsQuery(const TPathId& pathId, ui64 statType,
244-
ui32 columnTag, ui64 cookie)
292+
class TLoadStatisticsRetryingQuery : public TActorBootstrapped<TLoadStatisticsRetryingQuery> {
293+
private:
294+
const NActors::TActorId ReplyActorId;
295+
const TPathId PathId;
296+
const ui64 StatType;
297+
const ui32 ColumnTag;
298+
const ui64 Cookie;
299+
300+
public:
301+
using TLoadRetryingQuery = TQueryRetryActor<
302+
TLoadStatisticsQuery, TEvStatistics::TEvLoadStatisticsQueryResponse,
303+
const TPathId&, ui64, ui32, ui64>;
304+
305+
TLoadStatisticsRetryingQuery(const NActors::TActorId& replyActorId,
306+
const TPathId& pathId, ui64 statType, ui32 columnTag, ui64 cookie)
307+
: ReplyActorId(replyActorId)
308+
, PathId(pathId)
309+
, StatType(statType)
310+
, ColumnTag(columnTag)
311+
, Cookie(cookie)
312+
{}
313+
314+
void Bootstrap() {
315+
Register(new TLoadRetryingQuery(
316+
SelfId(),
317+
TLoadRetryingQuery::IRetryPolicy::GetExponentialBackoffPolicy(
318+
TLoadRetryingQuery::Retryable, TDuration::MilliSeconds(10),
319+
TDuration::MilliSeconds(200), TDuration::Seconds(1),
320+
std::numeric_limits<size_t>::max(), TDuration::Seconds(1)),
321+
PathId, StatType, ColumnTag, Cookie
322+
));
323+
Become(&TLoadStatisticsRetryingQuery::StateFunc);
324+
}
325+
326+
STRICT_STFUNC(StateFunc,
327+
hFunc(TEvStatistics::TEvLoadStatisticsQueryResponse, Handle);
328+
)
329+
330+
void Handle(TEvStatistics::TEvLoadStatisticsQueryResponse::TPtr& ev) {
331+
Send(ReplyActorId, ev->Release().Release());
332+
PassAway();
333+
}
334+
};
335+
336+
NActors::IActor* CreateLoadStatisticsQuery(const NActors::TActorId& replyActorId,
337+
const TPathId& pathId, ui64 statType, ui32 columnTag, ui64 cookie)
245338
{
246-
return new TLoadStatisticsQuery(pathId, statType, columnTag, cookie);
339+
return new TLoadStatisticsRetryingQuery(replyActorId, pathId, statType, columnTag, cookie);
247340
}
248341

249342

@@ -288,14 +381,53 @@ class TDeleteStatisticsQuery : public NKikimr::TQueryBase {
288381
void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
289382
Y_UNUSED(issues);
290383
auto response = std::make_unique<TEvStatistics::TEvDeleteStatisticsQueryResponse>();
384+
response->Status = status;
385+
response->Issues = std::move(issues);
291386
response->Success = (status == Ydb::StatusIds::SUCCESS);
292387
Send(Owner, response.release());
293388
}
294389
};
295390

296-
NActors::IActor* CreateDeleteStatisticsQuery(const TPathId& pathId)
391+
class TDeleteStatisticsRetryingQuery : public TActorBootstrapped<TDeleteStatisticsRetryingQuery> {
392+
private:
393+
const NActors::TActorId ReplyActorId;
394+
const TPathId PathId;
395+
396+
public:
397+
using TDeleteRetryingQuery = TQueryRetryActor<
398+
TDeleteStatisticsQuery, TEvStatistics::TEvDeleteStatisticsQueryResponse,
399+
const TPathId&>;
400+
401+
TDeleteStatisticsRetryingQuery(const NActors::TActorId& replyActorId, const TPathId& pathId)
402+
: ReplyActorId(replyActorId)
403+
, PathId(pathId)
404+
{}
405+
406+
void Bootstrap() {
407+
Register(new TDeleteRetryingQuery(
408+
SelfId(),
409+
TDeleteRetryingQuery::IRetryPolicy::GetExponentialBackoffPolicy(
410+
TDeleteRetryingQuery::Retryable, TDuration::MilliSeconds(10),
411+
TDuration::MilliSeconds(200), TDuration::Seconds(1),
412+
std::numeric_limits<size_t>::max(), TDuration::Seconds(1)),
413+
PathId
414+
));
415+
Become(&TDeleteStatisticsRetryingQuery::StateFunc);
416+
}
417+
418+
STRICT_STFUNC(StateFunc,
419+
hFunc(TEvStatistics::TEvDeleteStatisticsQueryResponse, Handle);
420+
)
421+
422+
void Handle(TEvStatistics::TEvDeleteStatisticsQueryResponse::TPtr& ev) {
423+
Send(ReplyActorId, ev->Release().Release());
424+
PassAway();
425+
}
426+
};
427+
428+
NActors::IActor* CreateDeleteStatisticsQuery(const NActors::TActorId& replyActorId, const TPathId& pathId)
297429
{
298-
return new TDeleteStatisticsQuery(pathId);
430+
return new TDeleteStatisticsRetryingQuery(replyActorId, pathId);
299431
}
300432

301433
} // NKikimr::NStat

ydb/core/statistics/database/database.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ namespace NKikimr::NStat {
77

88
NActors::IActor* CreateStatisticsTableCreator(std::unique_ptr<NActors::IEventBase> event);
99

10-
NActors::IActor* CreateSaveStatisticsQuery(const TPathId& pathId, ui64 statType,
11-
std::vector<ui32>&& columnTags, std::vector<TString>&& data);
10+
NActors::IActor* CreateSaveStatisticsQuery(const NActors::TActorId& replyActorId,
11+
const TPathId& pathId, ui64 statType, std::vector<ui32>&& columnTags, std::vector<TString>&& data);
1212

13-
NActors::IActor* CreateLoadStatisticsQuery(const TPathId& pathId, ui64 statType,
14-
ui32 columnTag, ui64 cookie);
13+
NActors::IActor* CreateLoadStatisticsQuery(const NActors::TActorId& replyActorId,
14+
const TPathId& pathId, ui64 statType, ui32 columnTag, ui64 cookie);
1515

16-
NActors::IActor* CreateDeleteStatisticsQuery(const TPathId& pathId);
16+
NActors::IActor* CreateDeleteStatisticsQuery(const NActors::TActorId& replyActorId, const TPathId& pathId);
1717

1818
};

ydb/core/statistics/database/ut/ut_database.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,20 +31,20 @@ Y_UNIT_TEST_SUITE(StatisticsSaveLoad) {
3131
std::vector<ui32> columnTags = {1, 2};
3232
std::vector<TString> data = {"dataA", "dataB"};
3333

34-
runtime.Register(CreateSaveStatisticsQuery(
34+
runtime.Register(CreateSaveStatisticsQuery(sender,
3535
pathId, statType, std::move(columnTags), std::move(data)),
3636
0, 0, TMailboxType::Simple, 0, sender);
3737
auto saveResponse = runtime.GrabEdgeEvent<TEvStatistics::TEvSaveStatisticsQueryResponse>(sender);
3838
UNIT_ASSERT(saveResponse->Get()->Success);
3939

40-
runtime.Register(CreateLoadStatisticsQuery(pathId, statType, 1, 1),
40+
runtime.Register(CreateLoadStatisticsQuery(sender, pathId, statType, 1, 1),
4141
0, 0, TMailboxType::Simple, 0, sender);
4242
auto loadResponseA = runtime.GrabEdgeEvent<TEvStatistics::TEvLoadStatisticsQueryResponse>(sender);
4343
UNIT_ASSERT(loadResponseA->Get()->Success);
4444
UNIT_ASSERT(loadResponseA->Get()->Data);
4545
UNIT_ASSERT_VALUES_EQUAL(*loadResponseA->Get()->Data, "dataA");
4646

47-
runtime.Register(CreateLoadStatisticsQuery(pathId, statType, 2, 1),
47+
runtime.Register(CreateLoadStatisticsQuery(sender, pathId, statType, 2, 1),
4848
0, 0, TMailboxType::Simple, 0, sender);
4949
auto loadResponseB = runtime.GrabEdgeEvent<TEvStatistics::TEvLoadStatisticsQueryResponse>(sender);
5050
UNIT_ASSERT(loadResponseB->Get()->Success);
@@ -73,18 +73,18 @@ Y_UNIT_TEST_SUITE(StatisticsSaveLoad) {
7373
std::vector<ui32> columnTags = {1, 2};
7474
std::vector<TString> data = {"dataA", "dataB"};
7575

76-
runtime.Register(CreateSaveStatisticsQuery(
76+
runtime.Register(CreateSaveStatisticsQuery(sender,
7777
pathId, statType, std::move(columnTags), std::move(data)),
7878
0, 0, TMailboxType::Simple, 0, sender);
7979
auto saveResponse = runtime.GrabEdgeEvent<TEvStatistics::TEvSaveStatisticsQueryResponse>(sender);
8080
UNIT_ASSERT(saveResponse->Get()->Success);
8181

82-
runtime.Register(CreateDeleteStatisticsQuery(pathId),
82+
runtime.Register(CreateDeleteStatisticsQuery(sender, pathId),
8383
0, 0, TMailboxType::Simple, 0, sender);
8484
auto deleteResponse = runtime.GrabEdgeEvent<TEvStatistics::TEvDeleteStatisticsQueryResponse>(sender);
8585
UNIT_ASSERT(deleteResponse->Get()->Success);
8686

87-
runtime.Register(CreateLoadStatisticsQuery(pathId, statType, 1, 1),
87+
runtime.Register(CreateLoadStatisticsQuery(sender, pathId, statType, 1, 1),
8888
0, 0, TMailboxType::Simple, 0, sender);
8989
auto loadResponseA = runtime.GrabEdgeEvent<TEvStatistics::TEvLoadStatisticsQueryResponse>(sender);
9090
UNIT_ASSERT(!loadResponseA->Get()->Success);

ydb/core/statistics/events.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@
33
#include <ydb/core/base/events.h>
44
#include <ydb/core/scheme/scheme_pathid.h>
55
#include <ydb/core/protos/statistics.pb.h>
6+
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
67
#include <ydb/library/minsketch/count_min_sketch.h>
78
#include <ydb/library/actors/core/events.h>
9+
#include <ydb/library/yql/public/issue/yql_issue.h>
10+
811

912
namespace NKikimr {
1013
namespace NStat {
@@ -187,13 +190,17 @@ struct TEvStatistics {
187190
TEvSaveStatisticsQueryResponse,
188191
EvSaveStatisticsQueryResponse>
189192
{
193+
Ydb::StatusIds::StatusCode Status;
194+
NYql::TIssues Issues;
190195
bool Success = true;
191196
};
192197

193198
struct TEvLoadStatisticsQueryResponse : public TEventLocal<
194199
TEvLoadStatisticsQueryResponse,
195200
EvLoadStatisticsQueryResponse>
196201
{
202+
Ydb::StatusIds::StatusCode Status;
203+
NYql::TIssues Issues;
197204
bool Success = true;
198205
ui64 Cookie = 0;
199206
std::optional<TString> Data;
@@ -203,6 +210,8 @@ struct TEvStatistics {
203210
TEvDeleteStatisticsQueryResponse,
204211
EvDeleteStatisticsQueryResponse>
205212
{
213+
Ydb::StatusIds::StatusCode Status;
214+
NYql::TIssues Issues;
206215
bool Success = true;
207216
};
208217

ydb/core/statistics/service/service_impl.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -678,10 +678,15 @@ class TStatService : public TActorBootstrapped<TStatService> {
678678
for (const auto& req : request.StatRequests) {
679679
auto& response = request.StatResponses.emplace_back();
680680
response.Req = req;
681+
if (!req.ColumnTag) {
682+
response.Success = false;
683+
++reqIndex;
684+
continue;
685+
}
681686
ui64 loadCookie = NextLoadQueryCookie++;
682687
LoadQueriesInFlight[loadCookie] = std::make_pair(requestId, reqIndex);
683-
Register(CreateLoadStatisticsQuery(req.PathId, request.StatType,
684-
*req.ColumnTag, loadCookie));
688+
Register(CreateLoadStatisticsQuery(SelfId(),
689+
req.PathId, request.StatType, *req.ColumnTag, loadCookie));
685690
++request.ReplyCounter;
686691
++reqIndex;
687692
}

0 commit comments

Comments
 (0)