13
13
#include < util/folder/path.h>
14
14
#include < util/stream/file.h>
15
15
#include < util/thread/pool.h>
16
+ #include < util/generic/guid.h>
17
+ #include < util/folder/iterator.h>
18
+ #include < util/generic/vector.h>
19
+ #include < util/folder/dirut.h>
20
+ #include < util/system/user.h>
16
21
17
22
namespace NYql ::NDq {
18
23
@@ -159,6 +164,7 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
159
164
EvCloseFileResponse = TEvDqSpillingLocalFile::EEv::LastEvent + 1 ,
160
165
EvWriteFileResponse,
161
166
EvReadFileResponse,
167
+ EvRemoveOldTmp,
162
168
163
169
LastEvent
164
170
};
@@ -189,6 +195,15 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
189
195
bool Removed = false ;
190
196
TMaybe<TString> Error;
191
197
};
198
+
199
+ struct TEvRemoveOldTmp : public TEventLocal <TEvRemoveOldTmp, EvRemoveOldTmp> {
200
+ TFsPath TmpRoot;
201
+ ui32 NodeId;
202
+ TString SpillingSessionId;
203
+
204
+ TEvRemoveOldTmp (TFsPath tmpRoot, ui32 nodeId, TString spillingSessionId)
205
+ : TmpRoot(std::move(tmpRoot)), NodeId(nodeId), SpillingSessionId(std::move(spillingSessionId)) {}
206
+ };
192
207
};
193
208
194
209
struct TFileDesc ;
@@ -206,8 +221,11 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
206
221
207
222
void Bootstrap () {
208
223
Root_ = Config_.Root ;
209
- Root_ /= (TStringBuilder () << " node_" << SelfId ().NodeId ());
224
+ const auto rootToRemoveOldTmp = Root_;
225
+ const auto sessionId = Config_.SpillingSessionId ;
226
+ const auto nodeId = SelfId ().NodeId ();
210
227
228
+ Root_ /= (TStringBuilder () << NodePrefix_ << " _" << nodeId << " _" << sessionId);
211
229
LOG_I (" Init DQ local file spilling service at " << Root_ << " , actor: " << SelfId ());
212
230
213
231
try {
@@ -221,6 +239,8 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
221
239
Become (&TDqLocalFileSpillingService::BrokenState);
222
240
return ;
223
241
}
242
+
243
+ Send (SelfId (), MakeHolder<TEvPrivate::TEvRemoveOldTmp>(rootToRemoveOldTmp, nodeId, sessionId));
224
244
225
245
Become (&TDqLocalFileSpillingService::WorkState);
226
246
}
@@ -271,6 +291,7 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
271
291
hFunc(TEvPrivate::TEvWriteFileResponse, HandleWork)
272
292
hFunc(TEvDqSpilling::TEvRead, HandleWork)
273
293
hFunc(TEvPrivate::TEvReadFileResponse, HandleWork)
294
+ hFunc(TEvPrivate::TEvRemoveOldTmp, HandleWork)
274
295
hFunc(NMon::TEvHttpInfo, HandleWork)
275
296
cFunc(TEvents::TEvPoison::EventType, PassAway)
276
297
);
@@ -712,6 +733,50 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
712
733
Send (ev->Sender , new NMon::TEvHttpInfoRes (s.Str ()));
713
734
}
714
735
736
+ void HandleWork (TEvPrivate::TEvRemoveOldTmp::TPtr& ev) {
737
+ const auto & msg = *ev->Get ();
738
+ const auto & root = msg.TmpRoot ;
739
+ const auto nodeIdString = ToString (msg.NodeId );
740
+ const auto & sessionId = msg.SpillingSessionId ;
741
+ const auto & nodePrefix = this ->NodePrefix_ ;
742
+
743
+ LOG_I (" [RemoveOldTmp] removing at root: " << root);
744
+
745
+ const auto isDirOldTmp = [&nodePrefix, &nodeIdString, &sessionId](const TString& dirName) -> bool {
746
+ // dirName: node_<nodeId>_<sessionId>
747
+ TVector<TString> parts;
748
+ StringSplitter (dirName).Split (' _' ).Limit (3 ).Collect (&parts);
749
+
750
+ if (parts.size () < 3 ) {
751
+ return false ;
752
+ }
753
+ return parts[0 ] == nodePrefix && parts[1 ] == nodeIdString && parts[2 ] != sessionId;
754
+ };
755
+
756
+ try {
757
+ TDirIterator iter (root, TDirIterator::TOptions ().SetMaxLevel (1 ));
758
+
759
+ TVector<TString> oldTmps;
760
+ for (const auto & dirEntry : iter) {
761
+ if (dirEntry.fts_info == FTS_DP) {
762
+ continue ;
763
+ }
764
+
765
+ const auto dirName = dirEntry.fts_name ;
766
+ if (isDirOldTmp (dirName)) {
767
+ LOG_D (" [RemoveOldTmp] found old temporary at " << (root / dirName));
768
+ oldTmps.emplace_back (std::move (dirName));
769
+ }
770
+ }
771
+
772
+ for (const auto & dirName : oldTmps) {
773
+ (root / dirName).ForceDelete ();
774
+ }
775
+ } catch (const yexception& e) {
776
+ LOG_E (" [RemoveOldTmp] removing failed due to: " << e.what ());
777
+ }
778
+ }
779
+
715
780
private:
716
781
void RunOp (TStringBuf opName, THolder<IObjectInQueue> op, TFileDesc& fd) {
717
782
if (fd.HasActiveOp ) {
@@ -941,6 +1006,7 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
941
1006
942
1007
private:
943
1008
const TFileSpillingServiceConfig Config_;
1009
+ const TString NodePrefix_ = " node" ;
944
1010
TFsPath Root_;
945
1011
TIntrusivePtr<TSpillingCounters> Counters_;
946
1012
@@ -952,6 +1018,12 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
952
1018
953
1019
} // anonymous namespace
954
1020
1021
+ TFsPath GetTmpSpillingRootForCurrentUser () {
1022
+ auto root = TFsPath{GetSystemTempDir ()};
1023
+ root /= " spilling-tmp-" + GetUsername ();
1024
+ return root;
1025
+ }
1026
+
955
1027
IActor* CreateDqLocalFileSpillingActor (TTxId txId, const TString& details, const TActorId& client,
956
1028
bool removeBlobsAfterRead)
957
1029
{
0 commit comments