@@ -41,13 +41,26 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
41
41
public NActors::TActorBootstrapped<TDqChannelStorageActor>
42
42
{
43
43
using TBase = TActorBootstrapped<TDqChannelStorageActor>;
44
+
45
+ struct TWritingBlobInfo {
46
+ ui64 Size ;
47
+ NThreading::TPromise<void > SavePromise;
48
+ TInstant OpBegin;
49
+ };
50
+
51
+ struct TLoadingBlobInfo {
52
+ NThreading::TPromise<TBuffer> BlobPromise;
53
+ TInstant OpBegin;
54
+ };
44
55
public:
45
56
46
- TDqChannelStorageActor (TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback, TActorSystem* actorSystem)
57
+ TDqChannelStorageActor (TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback,
58
+ TIntrusivePtr<TSpillingTaskCounters> spillingTaskCounters, TActorSystem* actorSystem)
47
59
: TxId_(txId)
48
60
, ChannelId_(channelId)
49
61
, WakeUpCallback_(std::move(wakeUpCallback))
50
62
, ErrorCallback_(std::move(errorCallback))
63
+ , SpillingTaskCounters_(spillingTaskCounters)
51
64
, ActorSystem_(actorSystem)
52
65
{}
53
66
@@ -101,8 +114,11 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
101
114
void HandleWork (TEvDqChannelSpilling::TEvGet::TPtr& ev) {
102
115
auto & msg = *ev->Get ();
103
116
LOG_T (" [TEvGet] blobId: " << msg.BlobId_ );
117
+
118
+ auto opBegin = TInstant::Now ();
104
119
105
- LoadingBlobs_.emplace (msg.BlobId_ , std::move (msg.Promise_ ));
120
+ auto loadingBlobInfo = TLoadingBlobInfo{std::move (msg.Promise_ ), opBegin};
121
+ LoadingBlobs_.emplace (msg.BlobId_ , std::move (loadingBlobInfo));
106
122
107
123
SendInternal (SpillingActorId_, new TEvDqSpilling::TEvRead (msg.BlobId_ ));
108
124
}
@@ -111,7 +127,10 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
111
127
auto & msg = *ev->Get ();
112
128
LOG_T (" [TEvPut] blobId: " << msg.BlobId_ );
113
129
114
- WritingBlobs_.emplace (msg.BlobId_ , std::move (msg.Promise_ ));
130
+ auto opBegin = TInstant::Now ();
131
+
132
+ auto writingBlobInfo = TWritingBlobInfo{msg.Blob_ .size (), std::move (msg.Promise_ ), opBegin};
133
+ WritingBlobs_.emplace (msg.BlobId_ , std::move (writingBlobInfo));
115
134
116
135
SendInternal (SpillingActorId_, new TEvDqSpilling::TEvWrite (msg.BlobId_ , std::move (msg.Blob_ )));
117
136
}
@@ -126,8 +145,15 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
126
145
return ;
127
146
}
128
147
148
+ auto & blobInfo = it->second ;
149
+
150
+ if (SpillingTaskCounters_) {
151
+ SpillingTaskCounters_->ChannelWriteBytes += blobInfo.Size ;
152
+ auto opDuration = TInstant::Now () - blobInfo.OpBegin ;
153
+ SpillingTaskCounters_->ChannelWriteTime += opDuration.MilliSeconds ();
154
+ }
129
155
// Complete the future
130
- it-> second .SetValue ();
156
+ blobInfo. SavePromise .SetValue ();
131
157
WritingBlobs_.erase (it);
132
158
133
159
WakeUpCallback_ ();
@@ -143,7 +169,14 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
143
169
return ;
144
170
}
145
171
146
- it->second .SetValue (std::move (msg.Blob ));
172
+ auto & blobInfo = it->second ;
173
+
174
+ if (SpillingTaskCounters_) {
175
+ auto opDuration = TInstant::Now () - blobInfo.OpBegin ;
176
+ SpillingTaskCounters_->ChannelReadTime += opDuration.MilliSeconds ();
177
+ }
178
+
179
+ blobInfo.BlobPromise .SetValue (std::move (msg.Blob ));
147
180
LoadingBlobs_.erase (it);
148
181
149
182
WakeUpCallback_ ();
@@ -163,15 +196,17 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
163
196
private:
164
197
const TTxId TxId_;
165
198
const ui64 ChannelId_;
199
+
166
200
TWakeUpCallback WakeUpCallback_;
167
201
TErrorCallback ErrorCallback_;
202
+ TIntrusivePtr<TSpillingTaskCounters> SpillingTaskCounters_;
168
203
TActorId SpillingActorId_;
169
204
170
- // BlobId -> promise that blob is saved
171
- std::unordered_map<ui64, NThreading::TPromise< void > > WritingBlobs_;
205
+ // BlobId -> blob size + promise that blob is saved
206
+ std::unordered_map<ui64, TWritingBlobInfo > WritingBlobs_;
172
207
173
208
// BlobId -> promise with requested blob
174
- std::unordered_map<ui64, NThreading::TPromise<TBuffer> > LoadingBlobs_;
209
+ std::unordered_map<ui64, TLoadingBlobInfo > LoadingBlobs_;
175
210
176
211
TActorSystem* ActorSystem_;
177
212
};
@@ -181,9 +216,10 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
181
216
IDqChannelStorageActor* CreateDqChannelStorageActor (TTxId txId, ui64 channelId,
182
217
TWakeUpCallback&& wakeUpCallback,
183
218
TErrorCallback&& errorCallback,
219
+ TIntrusivePtr<TSpillingTaskCounters> spillingTaskCounters,
184
220
NActors::TActorSystem* actorSystem)
185
221
{
186
- return new TDqChannelStorageActor (txId, channelId, std::move (wakeUpCallback), std::move (errorCallback), actorSystem);
222
+ return new TDqChannelStorageActor (txId, channelId, std::move (wakeUpCallback), std::move (errorCallback), spillingTaskCounters, actorSystem);
187
223
}
188
224
189
225
} // namespace NYql::NDq
0 commit comments