Skip to content

Commit eb1276a

Browse files
authored
[KQP] ANALYZE retries have been added. (#8115)
1 parent ef8014a commit eb1276a

File tree

2 files changed

+70
-69
lines changed

2 files changed

+70
-69
lines changed

ydb/core/kqp/gateway/actors/analyze_actor.cpp

Lines changed: 51 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
#include "analyze_actor.h"
22

33
#include <ydb/core/base/path.h>
4-
#include <ydb/core/base/tablet_pipecache.h>
54
#include <ydb/core/util/ulid.h>
65
#include <ydb/library/actors/core/log.h>
76
#include <ydb/library/services/services.pb.h>
@@ -42,19 +41,6 @@ void TAnalyzeActor::Bootstrap() {
4241
Become(&TAnalyzeActor::StateWork);
4342
}
4443

45-
void TAnalyzeActor::SendAnalyzeStatus() {
46-
Y_ABORT_UNLESS(StatisticsAggregatorId.has_value());
47-
48-
auto getStatus = std::make_unique<NStat::TEvStatistics::TEvAnalyzeStatus>();
49-
auto& record = getStatus->Record;
50-
record.SetOperationId(OperationId);
51-
52-
Send(
53-
MakePipePerNodeCacheID(false),
54-
new TEvPipeCache::TEvForward(getStatus.release(), StatisticsAggregatorId.value(), true)
55-
);
56-
}
57-
5844
void TAnalyzeActor::Handle(NStat::TEvStatistics::TEvAnalyzeResponse::TPtr& ev, const TActorContext& ctx) {
5945
Y_UNUSED(ctx);
6046

@@ -67,50 +53,10 @@ void TAnalyzeActor::Handle(NStat::TEvStatistics::TEvAnalyzeResponse::TPtr& ev, c
6753
<< " , but expected " << OperationId);
6854
}
6955

70-
71-
// TODO Don't send EvAnalyzeStatus, EvAnalyzeResponse is already here
72-
SendAnalyzeStatus();
73-
}
74-
75-
void TAnalyzeActor::Handle(TEvAnalyzePrivate::TEvAnalyzeStatusCheck::TPtr& ev, const TActorContext& ctx) {
76-
Y_UNUSED(ev);
77-
Y_UNUSED(ctx);
78-
79-
SendAnalyzeStatus();
80-
}
81-
82-
void TAnalyzeActor::Handle(NStat::TEvStatistics::TEvAnalyzeStatusResponse::TPtr& ev, const TActorContext& ctx) {
83-
auto& record = ev->Get()->Record;
84-
switch (record.GetStatus()) {
85-
case NKikimrStat::TEvAnalyzeStatusResponse::STATUS_UNSPECIFIED: {
86-
Promise.SetValue(
87-
NYql::NCommon::ResultFromError<NYql::IKikimrGateway::TGenericResult>(
88-
YqlIssue(
89-
{}, NYql::TIssuesIds::UNEXPECTED,
90-
TStringBuilder() << "Statistics Aggregator unspecified error"
91-
)
92-
)
93-
);
94-
this->Die(ctx);
95-
return;
96-
}
97-
case NKikimrStat::TEvAnalyzeStatusResponse::STATUS_NO_OPERATION: {
98-
NYql::IKikimrGateway::TGenericResult result;
99-
result.SetSuccess();
100-
Promise.SetValue(std::move(result));
101-
102-
this->Die(ctx);
103-
return;
104-
}
105-
case NKikimrStat::TEvAnalyzeStatusResponse::STATUS_ENQUEUED: {
106-
Schedule(TDuration::Seconds(10), new TEvAnalyzePrivate::TEvAnalyzeStatusCheck());
107-
return;
108-
}
109-
case NKikimrStat::TEvAnalyzeStatusResponse::STATUS_IN_PROGRESS: {
110-
Schedule(TDuration::Seconds(5), new TEvAnalyzePrivate::TEvAnalyzeStatusCheck());
111-
return;
112-
}
113-
}
56+
NYql::IKikimrGateway::TGenericResult result;
57+
result.SetSuccess();
58+
Promise.SetValue(std::move(result));
59+
this->Die(ctx);
11460
}
11561

11662
void TAnalyzeActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
@@ -188,13 +134,56 @@ void TAnalyzeActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr&
188134
}
189135
}
190136

137+
// Computes the delay to wait before the next retry of the ANALYZE request.
// The base delay is RetryInterval * 2^RetryCount; it is then scaled by a
// random multiplier and clamped to the range [0, MaxBackoffDurationMs].
TDuration TAnalyzeActor::CalcBackoffTime() {
138+
// Doubles with every retry already made.
ui32 backoffSlots = 1 << RetryCount;
139+
TDuration maxDuration = RetryInterval * backoffSlots;
140+
141+
// Keep the configured ratio inside [0, 1].
double uncertaintyRatio = std::max(std::min(UncertainRatio, 1.0), 0.0);
142+
// NOTE(review): assuming RandomNumber<double>() yields a value in [0, 1),
// the multiplier falls in [1 - uncertaintyRatio, 1) - confirm.
double uncertaintyMultiplier = RandomNumber<double>() * uncertaintyRatio - uncertaintyRatio + 1.0;
143+
144+
double durationMs = round(maxDuration.MilliSeconds() * uncertaintyMultiplier);
145+
// Cap the delay at MaxBackoffDurationMs and never return a negative value.
durationMs = std::max(std::min(durationMs, MaxBackoffDurationMs), 0.0);
146+
return TDuration::MilliSeconds(durationMs);
147+
}
148+
149+
// Handles TEvPipeCache::TEvDeliveryProblem.
// While RetryCount is below MaxRetryCount, increments it and schedules a
// TEvAnalyzeRetry after CalcBackoffTime(). Once the limit is reached, fulfils
// Promise with an UNEXPECTED error result and terminates the actor.
void TAnalyzeActor::Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev, const TActorContext& ctx) {
150+
// NOTE(review): ctx is marked unused here but is passed to Die(ctx) below.
Y_UNUSED(ev, ctx);
151+
152+
// Retry budget exhausted: report the failure and stop.
if (RetryCount >= MaxRetryCount) {
153+
Promise.SetValue(
154+
NYql::NCommon::ResultFromError<NYql::IKikimrGateway::TGenericResult>(
155+
YqlIssue(
156+
{}, NYql::TIssuesIds::UNEXPECTED,
157+
TStringBuilder() << "Can't establish connection with the Statistics Aggregator!"
158+
)
159+
)
160+
);
161+
this->Die(ctx);
162+
return;
163+
}
164+
165+
// RetryCount is incremented first, so CalcBackoffTime() sees the new value.
++RetryCount;
166+
Schedule(CalcBackoffTime(), new TEvAnalyzePrivate::TEvAnalyzeRetry());
167+
}
168+
169+
// Handles the private TEvAnalyzeRetry event.
// Builds a new TEvAnalyze whose Record is a copy of the saved Request.Record
// and forwards it through the per-node pipe cache to the tablet id stored in
// StatisticsAggregatorId, with delivery tracking requested.
void TAnalyzeActor::Handle(TEvAnalyzePrivate::TEvAnalyzeRetry::TPtr& ev, const TActorContext& ctx) {
170+
Y_UNUSED(ev, ctx);
171+
172+
// A new event object is allocated for each attempt; ownership is handed over
// to TEvForward via release().
auto analyzeRequest = std::make_unique<NStat::TEvStatistics::TEvAnalyze>();
173+
analyzeRequest->Record = Request.Record;
174+
Send(
175+
MakePipePerNodeCacheID(false),
176+
// NOTE(review): value() requires StatisticsAggregatorId to be set; it is
// assigned in SendStatisticsAggregatorAnalyze - presumably always before a
// retry can be scheduled, confirm.
new TEvPipeCache::TEvForward(analyzeRequest.release(), StatisticsAggregatorId.value(), true),
177+
IEventHandle::FlagTrackDelivery
178+
);
179+
}
180+
191181
void TAnalyzeActor::SendStatisticsAggregatorAnalyze(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, const TActorContext& ctx) {
192182
Y_ABORT_UNLESS(entry.DomainInfo->Params.HasStatisticsAggregator());
193183

194184
StatisticsAggregatorId = entry.DomainInfo->Params.GetStatisticsAggregator();
195185

196-
auto analyzeRequest = std::make_unique<NStat::TEvStatistics::TEvAnalyze>();
197-
auto& record = analyzeRequest->Record;
186+
auto& record = Request.Record;
198187
record.SetOperationId(OperationId);
199188
auto table = record.AddTables();
200189

@@ -223,7 +212,8 @@ void TAnalyzeActor::SendStatisticsAggregatorAnalyze(const NSchemeCache::TSchemeC
223212
*table->MutableColumnTags()->Add() = tagByColumnName[columnName];
224213
}
225214

226-
// TODO This request should be retried if StatisticsAggregator fails
215+
auto analyzeRequest = std::make_unique<NStat::TEvStatistics::TEvAnalyze>();
216+
analyzeRequest->Record = Request.Record;
227217
Send(
228218
MakePipePerNodeCacheID(false),
229219
new TEvPipeCache::TEvForward(analyzeRequest.release(), entry.DomainInfo->Params.GetStatisticsAggregator(), true),

ydb/core/kqp/gateway/actors/analyze_actor.h

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <ydb/library/actors/core/actor_bootstrapped.h>
44
#include <ydb/library/actors/core/hfunc.h>
55

6+
#include <ydb/core/base/tablet_pipecache.h>
67
#include <ydb/core/kqp/provider/yql_kikimr_gateway.h>
78

89

@@ -11,11 +12,11 @@ namespace NKikimr::NKqp {
1112

1213
struct TEvAnalyzePrivate {
1314
enum EEv {
14-
EvAnalyzeStatusCheck = EventSpaceBegin(TEvents::ES_PRIVATE),
15+
EvAnalyzeRetry = EventSpaceBegin(TEvents::ES_PRIVATE),
1516
EvEnd
1617
};
1718

18-
struct TEvAnalyzeStatusCheck : public TEventLocal<TEvAnalyzeStatusCheck, EvAnalyzeStatusCheck> {};
19+
struct TEvAnalyzeRetry : public TEventLocal<TEvAnalyzeRetry, EvAnalyzeRetry> {};
1920
};
2021

2122
class TAnalyzeActor : public NActors::TActorBootstrapped<TAnalyzeActor> {
@@ -28,8 +29,8 @@ class TAnalyzeActor : public NActors::TActorBootstrapped<TAnalyzeActor> {
2829
switch(ev->GetTypeRewrite()) {
2930
HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
3031
HFunc(NStat::TEvStatistics::TEvAnalyzeResponse, Handle);
31-
HFunc(NStat::TEvStatistics::TEvAnalyzeStatusResponse, Handle);
32-
HFunc(TEvAnalyzePrivate::TEvAnalyzeStatusCheck, Handle);
32+
HFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
33+
HFunc(TEvAnalyzePrivate::TEvAnalyzeRetry, Handle);
3334
default:
3435
HandleUnexpectedEvent(ev->GetTypeRewrite());
3536
}
@@ -38,17 +39,18 @@ class TAnalyzeActor : public NActors::TActorBootstrapped<TAnalyzeActor> {
3839
private:
3940
void Handle(NStat::TEvStatistics::TEvAnalyzeResponse::TPtr& ev, const TActorContext& ctx);
4041

41-
void Handle(NStat::TEvStatistics::TEvAnalyzeStatusResponse::TPtr& ev, const TActorContext& ctx);
42-
4342
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx);
4443

45-
void Handle(TEvAnalyzePrivate::TEvAnalyzeStatusCheck::TPtr& ev, const TActorContext& ctx);
44+
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev, const TActorContext& ctx);
45+
46+
void Handle(TEvAnalyzePrivate::TEvAnalyzeRetry::TPtr& ev, const TActorContext& ctx);
4647

4748
void HandleUnexpectedEvent(ui32 typeRewrite);
4849

50+
private:
4951
void SendStatisticsAggregatorAnalyze(const NSchemeCache::TSchemeCacheNavigate::TEntry&, const TActorContext&);
5052

51-
void SendAnalyzeStatus();
53+
TDuration CalcBackoffTime();
5254

5355
private:
5456
TString TablePath;
@@ -58,6 +60,15 @@ class TAnalyzeActor : public NActors::TActorBootstrapped<TAnalyzeActor> {
5860
std::optional<ui64> StatisticsAggregatorId;
5961
TPathId PathId;
6062
TString OperationId;
63+
64+
// State and tuning constants used to retry the ANALYZE request
65+
NStat::TEvStatistics::TEvAnalyze Request;
66+
TDuration RetryInterval = TDuration::MilliSeconds(5);
67+
size_t RetryCount = 0;
68+
69+
constexpr static size_t MaxRetryCount = 10;
70+
constexpr static double UncertainRatio = 0.5;
71+
constexpr static double MaxBackoffDurationMs = TDuration::Seconds(15).MilliSeconds();
6172
};
6273

6374
} // end of NKikimr::NKqp

0 commit comments

Comments
 (0)