Skip to content

Commit eee3fb5

Browse files
authored
[yql] Refresh file storage state at fork (YQL-17461) (#878)
* [yql] Refresh file storage state at fork (YQL-17461) * Add `const` to NeedToCleanup()
1 parent 5232e81 commit eee3fb5

File tree

1 file changed

+28
-19
lines changed

1 file changed

+28
-19
lines changed

ydb/library/yql/core/file_storage/storage.cpp

+28-19
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <util/system/thread.h>
2222

2323
#include <functional>
24+
#include <atomic>
2425

2526
#if defined(_unix_)
2627
#include <pthread.h>
@@ -135,7 +136,7 @@ class TStorage::TImpl: public TIntrusiveListItem<TImpl> {
135136
private:
136137
void Reinit() {
137138
for (auto& v : Registered) {
138-
v.ResetRandom();
139+
v.ResetAtFork();
139140
}
140141
}
141142

@@ -153,6 +154,9 @@ class TStorage::TImpl: public TIntrusiveListItem<TImpl> {
153154
, IsTemp(storagePath.empty())
154155
, MaxFiles(maxFiles)
155156
, MaxSize(maxSize)
157+
, CurrentFiles(0)
158+
, CurrentSize(0)
159+
, Dirty(false)
156160
{
157161
// TFsPath is not thread safe. It can initialize internal Split at any time. Force do it right now
158162
StorageDir.PathSplit();
@@ -172,8 +176,8 @@ class TStorage::TImpl: public TIntrusiveListItem<TImpl> {
172176
TAtforkReinit::Get().Register(this);
173177
YQL_LOG(INFO) << "FileStorage initialized in " << StorageDir.GetPath().Quote()
174178
<< ", temporary dir: " << ProcessTempDir.GetPath().Quote()
175-
<< ", files: " << CurrentFiles
176-
<< ", total size: " << CurrentSize;
179+
<< ", files: " << CurrentFiles.load()
180+
<< ", total size: " << CurrentSize.load();
177181
}
178182

179183
~TImpl() {
@@ -219,8 +223,8 @@ class TStorage::TImpl: public TIntrusiveListItem<TImpl> {
219223
SetCacheFilePermissionsNoThrow(hardlinkFile);
220224

221225
if (NFs::HardLink(hardlinkFile, storageFile)) {
222-
AtomicIncrement(CurrentFiles);
223-
AtomicAdd(CurrentSize, fileSize);
226+
++CurrentFiles;
227+
CurrentSize += fileSize;
224228
}
225229
// Ignore HardLink fail. Another process managed to download before us
226230
TouchFile(storageFile.c_str());
@@ -281,10 +285,10 @@ class TStorage::TImpl: public TIntrusiveListItem<TImpl> {
281285
const i64 newFileSize = Max<i64>(0, GetFileLength(dstStorageFile.c_str()));
282286

283287
if (!prevFileExisted) {
284-
AtomicIncrement(CurrentFiles);
288+
++CurrentFiles;
285289
}
286290

287-
AtomicAdd(CurrentSize, newFileSize - prevFileSize);
291+
CurrentSize += newFileSize - prevFileSize;
288292
}
289293

290294
bool RemoveFromStorage(const TString& existingStorageFileName) {
@@ -300,19 +304,19 @@ class TStorage::TImpl: public TIntrusiveListItem<TImpl> {
300304
const bool result = NFs::Remove(storageFile);
301305

302306
if (result || !storageFile.Exists()) {
303-
AtomicDecrement(CurrentFiles);
304-
AtomicAdd(CurrentSize, -prevFileSize);
307+
++CurrentFiles;
308+
CurrentSize -= prevFileSize;
305309
}
306310

307311
return result;
308312
}
309313

310314
ui64 GetOccupiedSize() const {
311-
return AtomicGet(CurrentSize);
315+
return CurrentSize.load();
312316
}
313317

314318
size_t GetCount() const {
315-
return AtomicGet(CurrentFiles);
319+
return CurrentFiles.load();
316320
}
317321

318322
TString GetTempName() {
@@ -365,15 +369,17 @@ class TStorage::TImpl: public TIntrusiveListItem<TImpl> {
365369
CurrentSize = actualSize;
366370
}
367371

368-
bool NeedToCleanup() {
369-
return static_cast<ui64>(AtomicGet(CurrentFiles)) > MaxFiles ||
370-
static_cast<ui64>(AtomicGet(CurrentSize)) > MaxSize;
372+
bool NeedToCleanup() const {
373+
return Dirty.load()
374+
|| static_cast<ui64>(CurrentFiles.load()) > MaxFiles
375+
|| static_cast<ui64>(CurrentSize.load()) > MaxSize;
371376
}
372377

373378
void Cleanup() {
374379
if (!NeedToCleanup()) {
375380
return;
376381
}
382+
Dirty.store(false);
377383

378384
with_lock (CleanupLock) {
379385
TVector<TString> names;
@@ -422,15 +428,17 @@ class TStorage::TImpl: public TIntrusiveListItem<TImpl> {
422428
}
423429
}
424430

425-
AtomicSet(CurrentFiles, actualFiles);
426-
AtomicSet(CurrentSize, actualSize);
431+
CurrentFiles.store(actualFiles);
432+
CurrentSize.store(actualSize);
427433
}
428434
}
429435

430-
void ResetRandom() {
436+
void ResetAtFork() {
431437
with_lock(RndLock) {
432438
Rnd.ResetSeed();
433439
}
440+
// Force cleanup on next file add, because other processes may change the state
441+
Dirty.store(true);
434442
}
435443

436444
private:
@@ -441,8 +449,9 @@ class TStorage::TImpl: public TIntrusiveListItem<TImpl> {
441449
const bool IsTemp;
442450
const ui64 MaxFiles;
443451
const ui64 MaxSize;
444-
TAtomic CurrentFiles = 0;
445-
TAtomic CurrentSize = 0;
452+
std::atomic<i64> CurrentFiles = 0;
453+
std::atomic<i64> CurrentSize = 0;
454+
std::atomic_bool Dirty;
446455
TMutex RndLock;
447456
TRandGuid Rnd;
448457
};

0 commit comments

Comments
 (0)