23
23
#include < yql/essentials/utils/yql_panic.h>
24
24
#include < ydb/core/formats/arrow/serializer/abstract.h>
25
25
26
+ #include < library/cpp/retry/retry_policy.h>
27
+
26
28
namespace NYql ::NDq {
27
29
28
30
using namespace NActors ;
@@ -61,6 +63,12 @@ namespace NYql::NDq {
61
63
public TGenericBaseActor<TGenericLookupActor> {
62
64
using TBase = TGenericBaseActor<TGenericLookupActor>;
63
65
66
+ using ILookupRetryPolicy = IRetryPolicy<const NYdbGrpc::TGrpcStatus&>;
67
+ using ILookupRetryState = ILookupRetryPolicy::IRetryState;
68
+
69
+ struct TEvLookupRetry : NActors::TEventLocal<TEvLookupRetry, EvRetry> {
70
+ };
71
+
64
72
public:
65
73
TGenericLookupActor (
66
74
NConnector::IClient::TPtr connectorClient,
@@ -87,6 +95,24 @@ namespace NYql::NDq {
87
95
, HolderFactory(holderFactory)
88
96
, ColumnDestinations(CreateColumnDestination())
89
97
, MaxKeysInRequest(maxKeysInRequest)
98
+ , RetryPolicy(
99
+ ILookupRetryPolicy::GetExponentialBackoffPolicy (
100
+ /* retryClassFunction */
101
+ [](const NYdbGrpc::TGrpcStatus& status) {
102
+ if (NConnector::GrpcStatusNeedsRetry (status)) {
103
+ return ERetryErrorClass::ShortRetry;
104
+ }
105
+ if (status.GRpcStatusCode == grpc::DEADLINE_EXCEEDED) {
106
+ return ERetryErrorClass::ShortRetry; // TODO LongRetry?
107
+ }
108
+ return ERetryErrorClass::NoRetry;
109
+ },
110
+ /* minDelay */ TDuration::MilliSeconds(1 ),
111
+ /* minLongRetryDelay */ TDuration::MilliSeconds(500 ),
112
+ /* maxDelay */ TDuration::Seconds(1 ),
113
+ /* maxRetries */ RequestRetriesLimit,
114
+ /* maxTime */ TDuration::Minutes(5 ),
115
+ /* scaleFactor */ 2 ))
90
116
{
91
117
InitMonCounters (taskCounters);
92
118
}
@@ -157,7 +183,7 @@ namespace NYql::NDq {
157
183
hFunc (TEvReadSplitsPart, Handle );
158
184
hFunc (TEvReadSplitsFinished, Handle );
159
185
hFunc (TEvError, Handle );
160
- hFunc (TEvRetry , Handle );
186
+ hFunc (TEvLookupRetry , Handle );
161
187
hFunc (NActors::TEvents::TEvPoison, Handle );)
162
188
163
189
void Handle (TEvListSplitsIterator::TPtr ev) {
@@ -166,7 +192,7 @@ namespace NYql::NDq {
166
192
[
167
193
actorSystem = TActivationContext::ActorSystem (),
168
194
selfId = SelfId (),
169
- retriesRemaining = RetriesRemaining
195
+ retryState = RetryState
170
196
](const NConnector::TAsyncResult<NConnector::NApi::TListSplitsResponse>& asyncResult) {
171
197
YQL_CLOG (DEBUG, ProviderGeneric) << " ActorId=" << selfId << " Got TListSplitsResponse from Connector" ;
172
198
auto result = ExtractFromConstFuture (asyncResult);
@@ -175,7 +201,7 @@ namespace NYql::NDq {
175
201
auto ev = new TEvListSplitsPart (std::move (*result.Response ));
176
202
actorSystem->Send (new NActors::IEventHandle (selfId, selfId, ev));
177
203
} else {
178
- SendRetryOrError (actorSystem, selfId, result.Status , retriesRemaining );
204
+ SendRetryOrError (actorSystem, selfId, result.Status , retryState );
179
205
}
180
206
});
181
207
}
@@ -199,15 +225,15 @@ namespace NYql::NDq {
199
225
Connector->ReadSplits (readRequest, RequestTimeout).Subscribe ([
200
226
actorSystem = TActivationContext::ActorSystem (),
201
227
selfId = SelfId (),
202
- retriesRemaining = RetriesRemaining
228
+ retryState = RetryState
203
229
](const NConnector::TReadSplitsStreamIteratorAsyncResult& asyncResult) {
204
230
YQL_CLOG (DEBUG, ProviderGeneric) << " ActorId=" << selfId << " Got ReadSplitsStreamIterator from Connector" ;
205
231
auto result = ExtractFromConstFuture (asyncResult);
206
232
if (result.Status .Ok ()) {
207
233
auto ev = new TEvReadSplitsIterator (std::move (result.Iterator ));
208
234
actorSystem->Send (new NActors::IEventHandle (selfId, selfId, ev));
209
235
} else {
210
- SendRetryOrError (actorSystem, selfId, result.Status , retriesRemaining );
236
+ SendRetryOrError (actorSystem, selfId, result.Status , retryState );
211
237
}
212
238
});
213
239
}
@@ -236,9 +262,8 @@ namespace NYql::NDq {
236
262
actorSystem->Send (new NActors::IEventHandle (ParentId, SelfId (), errEv.release ()));
237
263
}
238
264
239
- void Handle (TEvRetry ::TPtr ev ) {
265
+ void Handle (TEvLookupRetry ::TPtr) {
240
266
auto guard = Guard (*Alloc);
241
- RetriesRemaining = ev->Get ()->NextRetries ;
242
267
SendRequest ();
243
268
}
244
269
@@ -270,7 +295,7 @@ namespace NYql::NDq {
270
295
}
271
296
272
297
Request = std::move (request);
273
- RetriesRemaining = RequestRetriesLimit ;
298
+ RetryState = std::shared_ptr<ILookupRetryState>(RetryPolicy-> CreateRetryState ()) ;
274
299
SendRequest ();
275
300
}
276
301
@@ -288,7 +313,7 @@ namespace NYql::NDq {
288
313
Connector->ListSplits (splitRequest, RequestTimeout).Subscribe ([
289
314
actorSystem = TActivationContext::ActorSystem (),
290
315
selfId = SelfId (),
291
- retriesRemaining = RetriesRemaining
316
+ retryState = RetryState
292
317
](const NConnector::TListSplitsStreamIteratorAsyncResult& asyncResult) {
293
318
auto result = ExtractFromConstFuture (asyncResult);
294
319
if (result.Status .Ok ()) {
@@ -297,7 +322,7 @@ namespace NYql::NDq {
297
322
auto ev = new TEvListSplitsIterator (std::move (result.Iterator ));
298
323
actorSystem->Send (new NActors::IEventHandle (selfId, selfId, ev));
299
324
} else {
300
- SendRetryOrError (actorSystem, selfId, result.Status , retriesRemaining );
325
+ SendRetryOrError (actorSystem, selfId, result.Status , retryState );
301
326
}
302
327
});
303
328
if (CpuTime) {
@@ -310,7 +335,7 @@ namespace NYql::NDq {
310
335
[
311
336
actorSystem = TActivationContext::ActorSystem (),
312
337
selfId = SelfId (),
313
- retriesRemaining = RetriesRemaining
338
+ retryState = RetryState
314
339
](const NConnector::TAsyncResult<NConnector::NApi::TReadSplitsResponse>& asyncResult) {
315
340
auto result = ExtractFromConstFuture (asyncResult);
316
341
if (result.Status .Ok ()) {
@@ -329,7 +354,7 @@ namespace NYql::NDq {
329
354
auto ev = new TEvReadSplitsFinished (std::move (result.Status ));
330
355
actorSystem->Send (new NActors::IEventHandle (selfId, selfId, ev));
331
356
} else {
332
- SendRetryOrError (actorSystem, selfId, result.Status , retriesRemaining );
357
+ SendRetryOrError (actorSystem, selfId, result.Status , retryState );
333
358
}
334
359
});
335
360
}
@@ -395,22 +420,12 @@ namespace NYql::NDq {
395
420
new TEvError (std::move (error)));
396
421
}
397
422
398
- static void SendRetryOrError (NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status, ui32 retriesRemaining) {
399
- if (NConnector::GrpcStatusNeedsRetry (status) || status.GRpcStatusCode == grpc::DEADLINE_EXCEEDED) {
400
- if (retriesRemaining) {
401
- const auto retry = RequestRetriesLimit - retriesRemaining;
402
- const auto delay = TDuration::MilliSeconds (1u << retry); // Exponential delay from 1ms to ~0.5s
403
- // << TODO tune/tweak
404
- YQL_CLOG (WARN, ProviderGeneric) << " ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString () << " , retry " << (retry + 1 ) << " of " << RequestRetriesLimit << " , scheduled in " << delay;
405
- --retriesRemaining;
406
- if (status.GRpcStatusCode == grpc::DEADLINE_EXCEEDED) {
407
- // if error was deadline, retry only once
408
- retriesRemaining = 0 ; // TODO tune/tweak
409
- }
410
- actorSystem->Schedule (delay, new IEventHandle (selfId, selfId, new TEvRetry (retriesRemaining)));
411
- return ;
412
- }
413
- YQL_CLOG (ERROR, ProviderGeneric) << " ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString () << " , retry count exceed limit " << RequestRetriesLimit;
423
+ static void SendRetryOrError (NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status, std::shared_ptr<ILookupRetryState> retryState) {
424
+ auto nextRetry = retryState->GetNextRetryDelay (status);
425
+ if (nextRetry) {
426
+ YQL_CLOG (WARN, ProviderGeneric) << " ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString () << " , retry scheduled in " << *nextRetry;
427
+ actorSystem->Schedule (*nextRetry, new IEventHandle (selfId, selfId, new TEvLookupRetry ()));
428
+ return ;
414
429
}
415
430
SendError (actorSystem, selfId, NConnector::ErrorFromGRPCStatus (status));
416
431
}
@@ -510,7 +525,8 @@ namespace NYql::NDq {
510
525
std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> Request;
511
526
NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator; // TODO move me to TEvReadSplitsPart
512
527
NKikimr::NMiniKQL::TKeyPayloadPairVector LookupResult;
513
- ui32 RetriesRemaining;
528
+ ILookupRetryPolicy::TPtr RetryPolicy;
529
+ std::shared_ptr<ILookupRetryState> RetryState;
514
530
::NMonitoring::TDynamicCounters::TCounterPtr Count;
515
531
::NMonitoring::TDynamicCounters::TCounterPtr Keys;
516
532
::NMonitoring::TDynamicCounters::TCounterPtr ResultRows;
0 commit comments