15
15
#include < util/stream/file.h>
16
16
#include < util/generic/guid.h>
17
17
#include < util/string/split.h>
18
+ #include < util/system/yassert.h>
19
+ #include < util/random/random.h>
18
20
19
21
using namespace NYdbGrpc ;
20
22
using namespace Yql ::DqsProto;
@@ -41,22 +43,39 @@ int SvnRevision(TServiceConnection<DqService>& service, const TVector<TString>&
41
43
return promise.GetFuture ().GetValueSync ();
42
44
}
43
45
44
- ClusterStatusResponse Info (TServiceConnection<DqService>& service) {
45
- auto promise = NThreading::NewPromise<ClusterStatusResponse>();
46
- auto callback = [&](TGrpcStatus&& status, ClusterStatusResponse&& resp) {
47
- if (status.Ok ()) {
48
- promise.SetValue (resp);
49
- } else {
50
- Cerr << " Error " << status.GRpcStatusCode << " message: " << status.Msg << Endl;
51
- promise.SetException (" Error" );
46
+ ClusterStatusResponse InfoWithReties (TServiceConnection<DqService>& service, ui32 retries) {
47
+ Y_ENSURE (retries > 0 );
48
+ NThreading::TFuture<ClusterStatusResponse> future;
49
+ for (ui32 i = 0 ; i < retries; ++i) {
50
+ auto promise = NThreading::NewPromise<ClusterStatusResponse>();
51
+ future = promise.GetFuture ();
52
+ auto callback = [&](TGrpcStatus&& status, ClusterStatusResponse&& resp) {
53
+ if (status.Ok ()) {
54
+ promise.SetValue (resp);
55
+ } else {
56
+ Cerr << " Error getting DQ info: code=" << status.GRpcStatusCode << " , message: " << status.Msg << " , details: " << status.Details << Endl;
57
+ if (retries > 0 ) {
58
+ promise.SetValue (InfoWithReties (service, retries - 1 ));
59
+ }
60
+ promise.SetException (" Error getting DQ info" );
61
+ }
62
+ };
63
+
64
+ service.DoRequest <ClusterStatusRequest, ClusterStatusResponse>(
65
+ ClusterStatusRequest (),
66
+ callback,
67
+ &DqService::Stub::AsyncClusterStatus);
68
+ future.Wait ();
69
+ if (!future.HasException ()) {
70
+ break ;
52
71
}
53
- };
72
+ Sleep (TDuration::MilliSeconds (2000ul + RandomNumber (1000ul )));
73
+ }
74
+ return future.GetValue ();
75
+ }
54
76
55
- service.DoRequest <ClusterStatusRequest, ClusterStatusResponse>(
56
- ClusterStatusRequest (),
57
- callback,
58
- &DqService::Stub::AsyncClusterStatus);
59
- return promise.GetFuture ().GetValueSync ();
77
+ ClusterStatusResponse Info (TServiceConnection<DqService>& service) {
78
+ return InfoWithReties (service, 5 );
60
79
}
61
80
62
81
void Stop (TServiceConnection<DqService>& service, const JobStopRequest& request)
0 commit comments