1
1
#include " actors.h"
2
2
3
3
#include < ydb/core/kqp/common/simple/services.h>
4
+ #include < ydb/core/kqp/rm_service/kqp_rm_service.h>
4
5
5
6
6
7
namespace NKqpRun {
@@ -11,13 +12,14 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
11
12
public:
12
13
TRunScriptActorMock (THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
13
14
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise,
14
- ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets, TString& queryPlan)
15
+ ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets,
16
+ TProgressCallback progressCallback)
15
17
: Request_(std::move(request))
16
18
, Promise_(promise)
17
19
, ResultRowsLimit_(std::numeric_limits<ui64>::max())
18
20
, ResultSizeLimit_(std::numeric_limits<i64>::max())
19
21
, ResultSets_(resultSets)
20
- , QueryPlan_(queryPlan )
22
+ , ProgressCallback_(progressCallback )
21
23
{
22
24
if (resultRowsLimit) {
23
25
ResultRowsLimit_ = resultRowsLimit;
@@ -76,7 +78,9 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
76
78
}
77
79
78
80
void Handle (NKikimr::NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev) {
79
- QueryPlan_ = ev->Get ()->Record .GetQueryPlan ();
81
+ if (ProgressCallback_) {
82
+ ProgressCallback_ (ev->Get ()->Record );
83
+ }
80
84
}
81
85
82
86
private:
@@ -85,15 +89,104 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
85
89
ui64 ResultRowsLimit_;
86
90
ui64 ResultSizeLimit_;
87
91
std::vector<Ydb::ResultSet>& ResultSets_;
88
- TString& QueryPlan_;
92
+ TProgressCallback ProgressCallback_;
93
+ };
94
+
95
+ class TResourcesWaiterActor : public NActors ::TActorBootstrapped<TResourcesWaiterActor> {
96
+ struct TEvPrivate {
97
+ enum EEv : ui32 {
98
+ EvResourcesInfo = EventSpaceBegin (NActors::TEvents::ES_PRIVATE),
99
+
100
+ EvEnd
101
+ };
102
+
103
+ static_assert (EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), " expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)" );
104
+
105
+ struct TEvResourcesInfo : public NActors ::TEventLocal<TEvResourcesInfo, EvResourcesInfo> {
106
+ explicit TEvResourcesInfo (i32 nodeCount)
107
+ : NodeCount(nodeCount)
108
+ {}
109
+
110
+ const i32 NodeCount;
111
+ };
112
+ };
113
+
114
+ static constexpr TDuration REFRESH_PERIOD = TDuration::MilliSeconds(10 );
115
+
116
+ public:
117
+ TResourcesWaiterActor (NThreading::TPromise<void > promise, i32 expectedNodeCount)
118
+ : ExpectedNodeCount_(expectedNodeCount)
119
+ , Promise_(promise)
120
+ {}
121
+
122
+ void Bootstrap () {
123
+ Become (&TResourcesWaiterActor::StateFunc);
124
+ CheckResourcesPublish ();
125
+ }
126
+
127
+ void Handle (NActors::TEvents::TEvWakeup::TPtr&) {
128
+ CheckResourcesPublish ();
129
+ }
130
+
131
+ void Handle (TEvPrivate::TEvResourcesInfo::TPtr& ev) {
132
+ if (ev->Get ()->NodeCount == ExpectedNodeCount_) {
133
+ Promise_.SetValue ();
134
+ PassAway ();
135
+ return ;
136
+ }
137
+
138
+ Schedule (REFRESH_PERIOD, new NActors::TEvents::TEvWakeup ());
139
+ }
140
+
141
+ STRICT_STFUNC (StateFunc,
142
+ hFunc (NActors::TEvents::TEvWakeup, Handle );
143
+ hFunc (TEvPrivate::TEvResourcesInfo, Handle );
144
+ )
145
+
146
+ private:
147
+ void CheckResourcesPublish() {
148
+ GetResourceManager ();
149
+
150
+ if (!ResourceManager_) {
151
+ Schedule (REFRESH_PERIOD, new NActors::TEvents::TEvWakeup ());
152
+ return ;
153
+ }
154
+
155
+ UpdateResourcesInfo ();
156
+ }
157
+
158
+ void GetResourceManager () {
159
+ if (ResourceManager_) {
160
+ return ;
161
+ }
162
+ ResourceManager_ = NKikimr::NKqp::TryGetKqpResourceManager (SelfId ().NodeId ());
163
+ }
164
+
165
+ void UpdateResourcesInfo () const {
166
+ ResourceManager_->RequestClusterResourcesInfo (
167
+ [selfId = SelfId (), actorContext = ActorContext ()](TVector<NKikimrKqp::TKqpNodeResources>&& resources) {
168
+ actorContext.Send (selfId, new TEvPrivate::TEvResourcesInfo (resources.size ()));
169
+ });
170
+ }
171
+
172
+ private:
173
+ const i32 ExpectedNodeCount_;
174
+ NThreading::TPromise<void > Promise_;
175
+
176
+ std::shared_ptr<NKikimr::NKqp::NRm::IKqpResourceManager> ResourceManager_;
89
177
};
90
178
91
179
} // anonymous namespace
92
180
93
181
NActors::IActor* CreateRunScriptActorMock (THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
94
182
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise,
95
- ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets, TString& queryPlan) {
96
- return new TRunScriptActorMock (std::move (request), promise, resultRowsLimit, resultSizeLimit, resultSets, queryPlan);
183
+ ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets,
184
+ TProgressCallback progressCallback) {
185
+ return new TRunScriptActorMock (std::move (request), promise, resultRowsLimit, resultSizeLimit, resultSets, progressCallback);
186
+ }
187
+
188
+ NActors::IActor* CreateResourcesWaiterActor (NThreading::TPromise<void > promise, i32 expectedNodeCount) {
189
+ return new TResourcesWaiterActor (promise, expectedNodeCount);
97
190
}
98
191
99
192
} // namespace NKqpRun
0 commit comments