22
22
#include < yql/essentials/utils/yql_panic.h>
23
23
#include < ydb/core/formats/arrow/serializer/abstract.h>
24
24
25
+ #include < library/cpp/retry/retry_policy.h>
26
+
25
27
namespace NYql ::NDq {
26
28
27
29
using namespace NActors ;
@@ -60,6 +62,12 @@ namespace NYql::NDq {
60
62
public TGenericBaseActor<TGenericLookupActor> {
61
63
using TBase = TGenericBaseActor<TGenericLookupActor>;
62
64
65
+ using ILookupRetryPolicy = IRetryPolicy<const NYdbGrpc::TGrpcStatus&>;
66
+ using ILookupRetryState = ILookupRetryPolicy::IRetryState;
67
+
68
+ struct TEvLookupRetry : NActors::TEventLocal<TEvLookupRetry, EvRetry> {
69
+ };
70
+
63
71
public:
64
72
TGenericLookupActor (
65
73
NConnector::IClient::TPtr connectorClient,
@@ -86,6 +94,24 @@ namespace NYql::NDq {
86
94
, HolderFactory(holderFactory)
87
95
, ColumnDestinations(CreateColumnDestination())
88
96
, MaxKeysInRequest(maxKeysInRequest)
97
+ , RetryPolicy(
98
+ ILookupRetryPolicy::GetExponentialBackoffPolicy (
99
+ /* retryClassFunction */
100
+ [](const NYdbGrpc::TGrpcStatus& status) {
101
+ if (NConnector::GrpcStatusNeedsRetry (status)) {
102
+ return ERetryErrorClass::ShortRetry;
103
+ }
104
+ if (status.GRpcStatusCode == grpc::DEADLINE_EXCEEDED) {
105
+ return ERetryErrorClass::ShortRetry; // TODO LongRetry?
106
+ }
107
+ return ERetryErrorClass::NoRetry;
108
+ },
109
+ /* minDelay */ TDuration::MilliSeconds(1 ),
110
+ /* minLongRetryDelay */ TDuration::MilliSeconds(500 ),
111
+ /* maxDelay */ TDuration::Seconds(1 ),
112
+ /* maxRetries */ RequestRetriesLimit,
113
+ /* maxTime */ TDuration::Minutes(5 ),
114
+ /* scaleFactor */ 2 ))
89
115
{
90
116
InitMonCounters (taskCounters);
91
117
}
@@ -156,7 +182,7 @@ namespace NYql::NDq {
156
182
hFunc (TEvReadSplitsPart, Handle);
157
183
hFunc (TEvReadSplitsFinished, Handle);
158
184
hFunc (TEvError, Handle);
159
- hFunc (TEvRetry , Handle);
185
+ hFunc (TEvLookupRetry , Handle);
160
186
hFunc (NActors::TEvents::TEvPoison, Handle);)
161
187
162
188
void Handle (TEvListSplitsIterator::TPtr ev) {
@@ -165,7 +191,7 @@ namespace NYql::NDq {
165
191
[
166
192
actorSystem = TActivationContext::ActorSystem (),
167
193
selfId = SelfId (),
168
- retriesRemaining = RetriesRemaining
194
+ retryState = RetryState
169
195
](const NConnector::TAsyncResult<NConnector::NApi::TListSplitsResponse>& asyncResult) {
170
196
YQL_CLOG (DEBUG, ProviderGeneric) << " ActorId=" << selfId << " Got TListSplitsResponse from Connector" ;
171
197
auto result = ExtractFromConstFuture (asyncResult);
@@ -174,7 +200,7 @@ namespace NYql::NDq {
174
200
auto ev = new TEvListSplitsPart (std::move (*result.Response ));
175
201
actorSystem->Send (new NActors::IEventHandle (selfId, selfId, ev));
176
202
} else {
177
- SendRetryOrError (actorSystem, selfId, result.Status , retriesRemaining );
203
+ SendRetryOrError (actorSystem, selfId, result.Status , retryState );
178
204
}
179
205
});
180
206
}
@@ -198,15 +224,15 @@ namespace NYql::NDq {
198
224
Connector->ReadSplits (readRequest, RequestTimeout).Subscribe ([
199
225
actorSystem = TActivationContext::ActorSystem (),
200
226
selfId = SelfId (),
201
- retriesRemaining = RetriesRemaining
227
+ retryState = RetryState
202
228
](const NConnector::TReadSplitsStreamIteratorAsyncResult& asyncResult) {
203
229
YQL_CLOG (DEBUG, ProviderGeneric) << " ActorId=" << selfId << " Got ReadSplitsStreamIterator from Connector" ;
204
230
auto result = ExtractFromConstFuture (asyncResult);
205
231
if (result.Status .Ok ()) {
206
232
auto ev = new TEvReadSplitsIterator (std::move (result.Iterator ));
207
233
actorSystem->Send (new NActors::IEventHandle (selfId, selfId, ev));
208
234
} else {
209
- SendRetryOrError (actorSystem, selfId, result.Status , retriesRemaining );
235
+ SendRetryOrError (actorSystem, selfId, result.Status , retryState );
210
236
}
211
237
});
212
238
}
@@ -235,9 +261,8 @@ namespace NYql::NDq {
235
261
actorSystem->Send (new NActors::IEventHandle (ParentId, SelfId (), errEv.release ()));
236
262
}
237
263
238
- void Handle (TEvRetry ::TPtr ev ) {
264
+ void Handle (TEvLookupRetry ::TPtr) {
239
265
auto guard = Guard (*Alloc);
240
- RetriesRemaining = ev->Get ()->NextRetries ;
241
266
SendRequest ();
242
267
}
243
268
@@ -269,7 +294,7 @@ namespace NYql::NDq {
269
294
}
270
295
271
296
Request = std::move (request);
272
- RetriesRemaining = RequestRetriesLimit ;
297
+ RetryState = std::shared_ptr<ILookupRetryState>(RetryPolicy-> CreateRetryState ()) ;
273
298
SendRequest ();
274
299
}
275
300
@@ -287,7 +312,7 @@ namespace NYql::NDq {
287
312
Connector->ListSplits (splitRequest, RequestTimeout).Subscribe ([
288
313
actorSystem = TActivationContext::ActorSystem (),
289
314
selfId = SelfId (),
290
- retriesRemaining = RetriesRemaining
315
+ retryState = RetryState
291
316
](const NConnector::TListSplitsStreamIteratorAsyncResult& asyncResult) {
292
317
auto result = ExtractFromConstFuture (asyncResult);
293
318
if (result.Status .Ok ()) {
@@ -296,7 +321,7 @@ namespace NYql::NDq {
296
321
auto ev = new TEvListSplitsIterator (std::move (result.Iterator ));
297
322
actorSystem->Send (new NActors::IEventHandle (selfId, selfId, ev));
298
323
} else {
299
- SendRetryOrError (actorSystem, selfId, result.Status , retriesRemaining );
324
+ SendRetryOrError (actorSystem, selfId, result.Status , retryState );
300
325
}
301
326
});
302
327
if (CpuTime) {
@@ -309,7 +334,7 @@ namespace NYql::NDq {
309
334
[
310
335
actorSystem = TActivationContext::ActorSystem (),
311
336
selfId = SelfId (),
312
- retriesRemaining = RetriesRemaining
337
+ retryState = RetryState
313
338
](const NConnector::TAsyncResult<NConnector::NApi::TReadSplitsResponse>& asyncResult) {
314
339
auto result = ExtractFromConstFuture (asyncResult);
315
340
if (result.Status .Ok ()) {
@@ -328,7 +353,7 @@ namespace NYql::NDq {
328
353
auto ev = new TEvReadSplitsFinished (std::move (result.Status ));
329
354
actorSystem->Send (new NActors::IEventHandle (selfId, selfId, ev));
330
355
} else {
331
- SendRetryOrError (actorSystem, selfId, result.Status , retriesRemaining );
356
+ SendRetryOrError (actorSystem, selfId, result.Status , retryState );
332
357
}
333
358
});
334
359
}
@@ -394,22 +419,12 @@ namespace NYql::NDq {
394
419
new TEvError (std::move (error)));
395
420
}
396
421
397
- static void SendRetryOrError (NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status, ui32 retriesRemaining) {
398
- if (NConnector::GrpcStatusNeedsRetry (status) || status.GRpcStatusCode == grpc::DEADLINE_EXCEEDED) {
399
- if (retriesRemaining) {
400
- const auto retry = RequestRetriesLimit - retriesRemaining;
401
- const auto delay = TDuration::MilliSeconds (1u << retry); // Exponential delay from 1ms to ~0.5s
402
- // << TODO tune/tweak
403
- YQL_CLOG (WARN, ProviderGeneric) << " ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString () << " , retry " << (retry + 1 ) << " of " << RequestRetriesLimit << " , scheduled in " << delay;
404
- --retriesRemaining;
405
- if (status.GRpcStatusCode == grpc::DEADLINE_EXCEEDED) {
406
- // if error was deadline, retry only once
407
- retriesRemaining = 0 ; // TODO tune/tweak
408
- }
409
- actorSystem->Schedule (delay, new IEventHandle (selfId, selfId, new TEvRetry (retriesRemaining)));
410
- return ;
411
- }
412
- YQL_CLOG (ERROR, ProviderGeneric) << " ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString () << " , retry count exceed limit " << RequestRetriesLimit;
422
+ static void SendRetryOrError (NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NYdbGrpc::TGrpcStatus& status, std::shared_ptr<ILookupRetryState> retryState) {
423
+ auto nextRetry = retryState->GetNextRetryDelay (status);
424
+ if (nextRetry) {
425
+ YQL_CLOG (WARN, ProviderGeneric) << " ActorId=" << selfId << " Got retrievable GRPC Error from Connector: " << status.ToDebugString () << " , retry scheduled in " << *nextRetry;
426
+ actorSystem->Schedule (*nextRetry, new IEventHandle (selfId, selfId, new TEvLookupRetry ()));
427
+ return ;
413
428
}
414
429
SendError (actorSystem, selfId, NConnector::ErrorFromGRPCStatus (status));
415
430
}
@@ -509,7 +524,8 @@ namespace NYql::NDq {
509
524
std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> Request;
510
525
NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator; // TODO move me to TEvReadSplitsPart
511
526
NKikimr::NMiniKQL::TKeyPayloadPairVector LookupResult;
512
- ui32 RetriesRemaining;
527
+ ILookupRetryPolicy::TPtr RetryPolicy;
528
+ std::shared_ptr<ILookupRetryState> RetryState;
513
529
::NMonitoring::TDynamicCounters::TCounterPtr Count;
514
530
::NMonitoring::TDynamicCounters::TCounterPtr Keys;
515
531
::NMonitoring::TDynamicCounters::TCounterPtr ResultRows;
0 commit comments