@@ -82,7 +82,6 @@ namespace NOps {
82
82
enum EEv {
83
83
EvLoadBlob = EventSpaceBegin (TKikimrEvents::ES_PRIVATE),
84
84
EvBlobLoaded,
85
- EvLoadPages,
86
85
EvPartLoaded,
87
86
EvPartFailed,
88
87
};
@@ -107,14 +106,6 @@ namespace NOps {
107
106
{ }
108
107
};
109
108
110
- struct TEvLoadPages : public TEventLocal <TEvLoadPages, EvLoadPages> {
111
- TAutoPtr<NPageCollection::TFetch> Request;
112
-
113
- TEvLoadPages (TAutoPtr<NPageCollection::TFetch> request)
114
- : Request(std::move(request))
115
- { }
116
- };
117
-
118
109
struct TEvPartLoaded : public TEventLocal <TEvPartLoaded, EvPartLoaded> {
119
110
TPartView Part;
120
111
@@ -135,9 +126,10 @@ namespace NOps {
135
126
private:
136
127
class TColdPartLoader : public ::NActors::TActorBootstrapped<TColdPartLoader> {
137
128
public:
138
- TColdPartLoader (TActorId owner, TIntrusiveConstPtr<TColdPartStore> part)
129
+ TColdPartLoader (TActorId owner, TIntrusiveConstPtr<TColdPartStore> part, EPriority readPriority )
139
130
: Owner(owner)
140
131
, Part(std::move(part))
132
+ , ReadPriority(readPriority)
141
133
{ }
142
134
143
135
void Bootstrap () {
@@ -195,7 +187,7 @@ namespace NOps {
195
187
196
188
void RunLoader () {
197
189
for (auto req : Loader->Run ({.PreloadIndex = false , .PreloadData = false })) {
198
- Send (Owner , new TEvPrivate::TEvLoadPages ( std::move ( req) ));
190
+ Send (MakeSharedPageCacheId () , new NSharedCache ::TEvRequest (ReadPriority, req));
199
191
++ReadsLeft;
200
192
}
201
193
@@ -209,6 +201,7 @@ namespace NOps {
209
201
STRICT_STFUNC (StateLoadPart, {
210
202
sFunc (TEvents::TEvPoison, PassAway);
211
203
hFunc (NSharedCache::TEvResult, Handle );
204
+ hFunc (NBlockIO::TEvStat, Handle );
212
205
});
213
206
214
207
void Handle (NSharedCache::TEvResult::TPtr& ev) {
@@ -229,9 +222,20 @@ namespace NOps {
229
222
}
230
223
}
231
224
225
+ void Handle (NBlockIO::TEvStat::TPtr& ev) {
226
+ ev->Rewrite (ev->GetTypeRewrite (), Owner);
227
+ TActivationContext::Send (ev.Release ());
228
+ }
229
+
230
+ void PassAway () override {
231
+ Send (MakeSharedPageCacheId (), new NSharedCache::TEvUnregister);
232
+ TActorBootstrapped::PassAway ();
233
+ }
234
+
232
235
private:
233
236
TActorId Owner;
234
237
TIntrusiveConstPtr<TColdPartStore> Part;
238
+ EPriority ReadPriority;
235
239
TVector<TIntrusivePtr<TPrivatePageCache::TInfo>> PageCollections;
236
240
TVector<NPageCollection::TLargeGlobIdRestoreState> PageCollectionLoaders;
237
241
size_t PageCollectionsLeft = 0 ;
@@ -306,7 +310,7 @@ namespace NOps {
306
310
// Create a loader for this new part
307
311
TIntrusiveConstPtr<TColdPartStore> partStore = dynamic_cast <TColdPartStore*>(const_cast <TColdPart*>(part.Get ()));
308
312
Y_VERIFY_S (partStore, " Cannot load unsupported part " << NFmt::Do (*part));
309
- ColdPartLoaders[label] = RegisterWithSameMailbox (new TColdPartLoader (SelfId (), std::move (partStore)));
313
+ ColdPartLoaders[label] = RegisterWithSameMailbox (new TColdPartLoader (SelfId (), std::move (partStore), Args. ReadPrio ));
310
314
}
311
315
312
316
// Return empty TPartView to signal loader is still in progress
@@ -359,7 +363,6 @@ namespace NOps {
359
363
hFunc (TEvContinue, Handle );
360
364
hFunc (TEvPrivate::TEvLoadBlob, Handle );
361
365
hFunc (TEvBlobStorage::TEvGetResult, Handle );
362
- hFunc (TEvPrivate::TEvLoadPages, Handle );
363
366
hFunc (NBlockIO::TEvStat, Handle );
364
367
hFunc (TEvPrivate::TEvPartLoaded, Handle );
365
368
hFunc (TEvPrivate::TEvPartFailed, Handle );
@@ -484,7 +487,7 @@ namespace NOps {
484
487
if (auto logl = Logger->Log (ELnLev::Debug))
485
488
logl << NFmt::Do (*this ) << " " << NFmt::Do (*req);
486
489
487
- Send (MakeSharedPageCacheId (), new NSharedCache::TEvRequest (Args.ReadPrio , req, SelfId () ));
490
+ Send (MakeSharedPageCacheId (), new NSharedCache::TEvRequest (Args.ReadPrio , req));
488
491
}
489
492
490
493
if (ready == NTable::EReady::Page)
@@ -560,16 +563,6 @@ namespace NOps {
560
563
}
561
564
}
562
565
563
- void Handle (TEvPrivate::TEvLoadPages::TPtr& ev)
564
- {
565
- auto * msg = ev->Get ();
566
-
567
- TActorIdentity (ev->Sender ).Send (
568
- MakeSharedPageCacheId (),
569
- new NSharedCache::TEvRequest (Args.ReadPrio , std::move (msg->Request ), SelfId ()),
570
- ev->Flags , ev->Cookie );
571
- }
572
-
573
566
void Handle (NBlockIO::TEvStat::TPtr& ev)
574
567
{
575
568
ev->Rewrite (ev->GetTypeRewrite (), Owner);
0 commit comments