Skip to content

Commit 7b1e87a

Browse files
committed
Moved logic to query_actor.h
1 parent 24d1dfe commit 7b1e87a

File tree

2 files changed

+99
-91
lines changed

2 files changed

+99
-91
lines changed

ydb/core/kqp/proxy_service/kqp_script_executions.cpp

-90
Original file line numberDiff line numberDiff line change
@@ -77,96 +77,6 @@ class TQueryBase : public NKikimr::TQueryBase {
7777
{}
7878
};
7979

80-
template<typename TQueryActor, typename TResponse, typename ...TArgs>
81-
class TQueryRetryActor : public TActorBootstrapped<TQueryRetryActor<TQueryActor, TResponse, TArgs...>> {
82-
public:
83-
using TBase = TActorBootstrapped<TQueryRetryActor<TQueryActor, TResponse, TArgs...>>;
84-
using IRetryPolicy = IRetryPolicy<Ydb::StatusIds::StatusCode>;
85-
86-
explicit TQueryRetryActor(const TActorId& replyActorId, const TArgs&... args, TDuration maxRetryTime = TDuration::Seconds(30))
87-
: ReplyActorId(replyActorId)
88-
, CreateQueryActor([=]() {
89-
return new TQueryActor(args...);
90-
})
91-
, MaxRetryTime(maxRetryTime)
92-
{}
93-
94-
void StartQueryActor() const {
95-
TBase::Register(CreateQueryActor());
96-
}
97-
98-
void Bootstrap() {
99-
TBase::Become(&TQueryRetryActor::StateFunc);
100-
StartQueryActor();
101-
}
102-
103-
STRICT_STFUNC(StateFunc,
104-
hFunc(TEvents::TEvWakeup, Wakeup);
105-
hFunc(TResponse, Handle);
106-
)
107-
108-
void Wakeup(TEvents::TEvWakeup::TPtr&) {
109-
StartQueryActor();
110-
}
111-
112-
void Handle(const typename TResponse::TPtr& ev) {
113-
const Ydb::StatusIds::StatusCode status = ev->Get()->Status;
114-
if (Retryable(status) == ERetryErrorClass::NoRetry) {
115-
Reply(ev);
116-
return;
117-
}
118-
119-
if (RetryState == nullptr) {
120-
CreateRetryState();
121-
}
122-
123-
if (auto delay = RetryState->GetNextRetryDelay(status)) {
124-
TBase::Schedule(*delay, new TEvents::TEvWakeup());
125-
} else {
126-
Reply(ev);
127-
}
128-
}
129-
130-
void Reply(const typename TResponse::TPtr& ev) {
131-
TBase::Send(ev->Forward(ReplyActorId));
132-
TBase::PassAway();
133-
}
134-
135-
static ERetryErrorClass Retryable(Ydb::StatusIds::StatusCode status) {
136-
if (status == Ydb::StatusIds::SUCCESS) {
137-
return ERetryErrorClass::NoRetry;
138-
}
139-
140-
if (status == Ydb::StatusIds::INTERNAL_ERROR
141-
|| status == Ydb::StatusIds::UNAVAILABLE
142-
|| status == Ydb::StatusIds::BAD_SESSION
143-
|| status == Ydb::StatusIds::SESSION_EXPIRED
144-
|| status == Ydb::StatusIds::SESSION_BUSY
145-
|| status == Ydb::StatusIds::TIMEOUT
146-
|| status == Ydb::StatusIds::ABORTED) {
147-
return ERetryErrorClass::ShortRetry;
148-
}
149-
150-
if (status == Ydb::StatusIds::OVERLOADED
151-
|| status == Ydb::StatusIds::UNDETERMINED) {
152-
return ERetryErrorClass::LongRetry;
153-
}
154-
155-
return ERetryErrorClass::NoRetry;
156-
}
157-
158-
void CreateRetryState() {
159-
IRetryPolicy::TPtr policy = IRetryPolicy::GetExponentialBackoffPolicy(Retryable, TDuration::MilliSeconds(10), TDuration::MilliSeconds(200), TDuration::Seconds(1), std::numeric_limits<size_t>::max(), MaxRetryTime);
160-
RetryState = policy->CreateRetryState();
161-
}
162-
163-
private:
164-
const TActorId ReplyActorId;
165-
const std::function<TQueryActor*()> CreateQueryActor;
166-
const TDuration MaxRetryTime;
167-
IRetryPolicy::IRetryState::TPtr RetryState = nullptr;
168-
};
169-
17080

17181
class TScriptExecutionsTablesCreator : public TActorBootstrapped<TScriptExecutionsTablesCreator> {
17282
public:

ydb/library/query_actor/query_actor.h

+99-1
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@
1212
#include <ydb/library/actors/core/actorsystem.h>
1313
#include <ydb/library/actors/core/event_local.h>
1414
#include <ydb/library/actors/core/events.h>
15+
#include <ydb/library/actors/core/hfunc.h>
16+
#include <library/cpp/retry/retry_policy.h>
1517
#include <library/cpp/threading/future/future.h>
1618

1719
namespace NKikimr {
1820

19-
// TODO: add retry logic
2021
class TQueryBase : public NActors::TActorBootstrapped<TQueryBase> {
2122
protected:
2223
struct TTxControl {
@@ -168,4 +169,101 @@ class TQueryBase : public NActors::TActorBootstrapped<TQueryBase> {
168169
std::vector<NYdb::TResultSet> ResultSets;
169170
};
170171

172+
template<typename TQueryActor, typename TResponse, typename ...TArgs>
173+
class TQueryRetryActor : public NActors::TActorBootstrapped<TQueryRetryActor<TQueryActor, TResponse, TArgs...>> {
174+
public:
175+
using TBase = NActors::TActorBootstrapped<TQueryRetryActor<TQueryActor, TResponse, TArgs...>>;
176+
using IRetryPolicy = IRetryPolicy<Ydb::StatusIds::StatusCode>;
177+
178+
explicit TQueryRetryActor(const NActors::TActorId& replyActorId, const TArgs&... args, TDuration maxRetryTime = TDuration::Seconds(1))
179+
: ReplyActorId(replyActorId)
180+
, RetryPolicy(IRetryPolicy::GetExponentialBackoffPolicy(
181+
Retryable, TDuration::MilliSeconds(10),
182+
TDuration::MilliSeconds(200), TDuration::Seconds(1),
183+
std::numeric_limits<size_t>::max(), maxRetryTime
184+
))
185+
, CreateQueryActor([=]() {
186+
return new TQueryActor(args...);
187+
})
188+
{}
189+
190+
TQueryRetryActor(const NActors::TActorId& replyActorId, IRetryPolicy::TPtr retryPolicy, const TArgs&... args)
191+
: ReplyActorId(replyActorId)
192+
, RetryPolicy(retryPolicy)
193+
, CreateQueryActor([=]() {
194+
return new TQueryActor(args...);
195+
})
196+
{}
197+
198+
void StartQueryActor() const {
199+
TBase::Register(CreateQueryActor());
200+
}
201+
202+
void Bootstrap() {
203+
TBase::Become(&TQueryRetryActor::StateFunc);
204+
StartQueryActor();
205+
}
206+
207+
STRICT_STFUNC(StateFunc,
208+
hFunc(NActors::TEvents::TEvWakeup, Wakeup);
209+
hFunc(TResponse, Handle);
210+
)
211+
212+
void Wakeup(NActors::TEvents::TEvWakeup::TPtr&) {
213+
StartQueryActor();
214+
}
215+
216+
void Handle(const typename TResponse::TPtr& ev) {
217+
const Ydb::StatusIds::StatusCode status = ev->Get()->Status;
218+
if (Retryable(status) == ERetryErrorClass::NoRetry) {
219+
Reply(ev);
220+
return;
221+
}
222+
223+
if (RetryState == nullptr) {
224+
RetryState = RetryPolicy->CreateRetryState();
225+
}
226+
227+
if (auto delay = RetryState->GetNextRetryDelay(status)) {
228+
TBase::Schedule(*delay, new NActors::TEvents::TEvWakeup());
229+
} else {
230+
Reply(ev);
231+
}
232+
}
233+
234+
void Reply(const typename TResponse::TPtr& ev) {
235+
TBase::Send(ev->Forward(ReplyActorId));
236+
TBase::PassAway();
237+
}
238+
239+
static ERetryErrorClass Retryable(Ydb::StatusIds::StatusCode status) {
240+
if (status == Ydb::StatusIds::SUCCESS) {
241+
return ERetryErrorClass::NoRetry;
242+
}
243+
244+
if (status == Ydb::StatusIds::INTERNAL_ERROR
245+
|| status == Ydb::StatusIds::UNAVAILABLE
246+
|| status == Ydb::StatusIds::BAD_SESSION
247+
|| status == Ydb::StatusIds::SESSION_EXPIRED
248+
|| status == Ydb::StatusIds::SESSION_BUSY
249+
|| status == Ydb::StatusIds::TIMEOUT
250+
|| status == Ydb::StatusIds::ABORTED) {
251+
return ERetryErrorClass::ShortRetry;
252+
}
253+
254+
if (status == Ydb::StatusIds::OVERLOADED
255+
|| status == Ydb::StatusIds::UNDETERMINED) {
256+
return ERetryErrorClass::LongRetry;
257+
}
258+
259+
return ERetryErrorClass::NoRetry;
260+
}
261+
262+
private:
263+
const NActors::TActorId ReplyActorId;
264+
const IRetryPolicy::TPtr RetryPolicy;
265+
const std::function<TQueryActor*()> CreateQueryActor;
266+
IRetryPolicy::IRetryState::TPtr RetryState = nullptr;
267+
};
268+
171269
} // namespace NKikimr

0 commit comments

Comments
 (0)