Skip to content

Commit 97824d9

Browse files
authored
File based implementation of QStorage (#4029)
1 parent c62a698 commit 97824d9

13 files changed

+392
-142
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
UNITTEST_FOR(ydb/library/yql/core/qplayer/storage/file)
2+
3+
SRCS(
4+
yql_qstorage_file_ut.cpp
5+
)
6+
7+
PEERDIR(
8+
ydb/library/yql/core/qplayer/storage/ut_common
9+
)
10+
11+
END()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#include <ydb/library/yql/core/qplayer/storage/file/yql_qstorage_file.h>
2+
3+
#include <library/cpp/testing/unittest/registar.h>
4+
5+
#include <ydb/library/yql/core/qplayer/storage/ut_common/yql_qstorage_ut_common.h>
6+
7+
using namespace NYql;
8+
9+
Y_UNIT_TEST_SUITE(TQStorageFileTests) {
10+
GENERATE_TESTS(MakeFileQStorage)
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
yql_qstorage_file.cpp
5+
)
6+
7+
PEERDIR(
8+
ydb/library/yql/core/qplayer/storage/interface
9+
ydb/library/yql/core/qplayer/storage/memory
10+
)
11+
12+
END()
13+
14+
RECURSE_FOR_TESTS(
15+
ut
16+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
#include "yql_qstorage_file.h"
2+
3+
#include <ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.h>
4+
5+
#include <util/folder/tempdir.h>
6+
#include <util/generic/hash_set.h>
7+
#include <util/system/mutex.h>
8+
#include <util/stream/file.h>
9+
10+
namespace NYql {
11+
12+
namespace {
13+
14+
class TWriter : public IQWriter {
15+
public:
16+
TWriter(TFsPath& path)
17+
: Path_(path)
18+
, Storage_(MakeMemoryQStorage())
19+
, Writer_(Storage_->MakeWriter(""))
20+
{
21+
}
22+
23+
NThreading::TFuture<void> Put(const TQItemKey& key, const TString& value) final {
24+
return Writer_->Put(key, value);
25+
}
26+
27+
NThreading::TFuture<void> Commit() final {
28+
Writer_->Commit().GetValueSync();
29+
SaveFile(Storage_->MakeIterator("", {}));
30+
return NThreading::MakeFuture();
31+
}
32+
33+
private:
34+
void SaveFile(const IQIteratorPtr& iterator) {
35+
TFileOutput file(Path_.GetPath());
36+
for (;;) {
37+
auto res = iterator->Next().GetValueSync();
38+
if (!res) {
39+
break;
40+
}
41+
42+
SaveString(file, res->Key.Component);
43+
SaveString(file, res->Key.Label);
44+
SaveString(file, res->Value);
45+
}
46+
47+
file.Finish();
48+
}
49+
50+
void SaveString(TFileOutput& file, const TString& str) {
51+
ui32 length = str.Size();
52+
file.Write(&length, sizeof(length));
53+
file.Write(str.Data(), length);
54+
}
55+
56+
private:
57+
const TFsPath Path_;
58+
const IQStoragePtr Storage_;
59+
const IQWriterPtr Writer_;
60+
};
61+
62+
class TStorage : public IQStorage {
63+
public:
64+
TStorage(const TString& folder)
65+
: Folder_(folder)
66+
{
67+
if (!Folder_.IsDefined()) {
68+
TmpDir_.ConstructInPlace();
69+
Folder_ = TmpDir_->Path();
70+
}
71+
}
72+
73+
IQWriterPtr MakeWriter(const TString& operationId) const final {
74+
auto opPath = Folder_ / operationId;
75+
Y_ENSURE(!opPath.Exists());
76+
return std::make_shared<TWriter>(opPath);
77+
}
78+
79+
IQReaderPtr MakeReader(const TString& operationId) const final {
80+
auto memory = MakeMemoryQStorage();
81+
auto opPath = Folder_ / operationId;
82+
if (opPath.Exists()) {
83+
LoadFile(opPath, memory);
84+
}
85+
86+
return memory->MakeReader("");
87+
}
88+
89+
IQIteratorPtr MakeIterator(const TString& operationId, const TQIteratorSettings& settings) const {
90+
auto memory = MakeMemoryQStorage();
91+
auto opPath = Folder_ / operationId;
92+
if (opPath.Exists()) {
93+
LoadFile(opPath, memory);
94+
}
95+
96+
return memory->MakeIterator("", settings);
97+
}
98+
99+
private:
100+
void LoadFile(const TFsPath& path, const IQStoragePtr& memory) const {
101+
auto writer = memory->MakeWriter("");
102+
TFileInput file(path.GetPath());
103+
for (;;) {
104+
TQItemKey key;
105+
if (!LoadString(file, key.Component)) {
106+
break;
107+
}
108+
109+
Y_ENSURE(LoadString(file, key.Label));
110+
TString value;
111+
Y_ENSURE(LoadString(file, value));
112+
writer->Put(key, value).GetValueSync();
113+
}
114+
115+
writer->Commit().GetValueSync();
116+
}
117+
118+
bool LoadString(TFileInput& file, TString& str) const {
119+
ui32 length;
120+
auto loaded = file.Load(&length, sizeof(length));
121+
if (!loaded) {
122+
return false;
123+
}
124+
125+
Y_ENSURE(loaded == sizeof(length));
126+
while (length > 0) {
127+
char buffer[1024];
128+
auto toRead = Min<ui32>(sizeof(buffer), length);
129+
file.LoadOrFail(buffer, toRead);
130+
length -= toRead;
131+
str.append(buffer, toRead);
132+
}
133+
134+
return true;
135+
}
136+
137+
private:
138+
TMaybe<TTempDir> TmpDir_;
139+
TFsPath Folder_;
140+
};
141+
142+
}
143+
144+
IQStoragePtr MakeFileQStorage(const TString& folder) {
145+
return std::make_shared<TStorage>(folder);
146+
}
147+
148+
};
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
#pragma once
2+
#include <ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.h>
3+
4+
namespace NYql {
5+
6+
IQStoragePtr MakeFileQStorage(const TString& folder = {});
7+
8+
};

ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.h

+2
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ class IQWriter {
5050
virtual ~IQWriter() = default;
5151

5252
virtual NThreading::TFuture<void> Put(const TQItemKey& key, const TString& value) = 0;
53+
// Commmit should be called at most once, no more Put are allowed after it
5354
virtual NThreading::TFuture<void> Commit() = 0;
5455
};
5556

@@ -75,6 +76,7 @@ class IQStorage {
7576
public:
7677
virtual ~IQStorage() = default;
7778

79+
// it's an UB to open writer twice for the same operationId, implementations may check it
7880
virtual IQWriterPtr MakeWriter(const TString& operationId) const = 0;
7981
// readers & iterators may not see results of writer until commit
8082
virtual IQReaderPtr MakeReader(const TString& operationId) const = 0;

ydb/library/yql/core/qplayer/storage/memory/ut/ya.make

+4
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,9 @@ SRCS(
44
yql_qstorage_memory_ut.cpp
55
)
66

7+
PEERDIR(
8+
ydb/library/yql/core/qplayer/storage/ut_common
9+
)
10+
711
END()
812

ydb/library/yql/core/qplayer/storage/memory/ut/yql_qstorage_memory_ut.cpp

+3-120
Original file line numberDiff line numberDiff line change
@@ -2,127 +2,10 @@
22

33
#include <library/cpp/testing/unittest/registar.h>
44

5-
using namespace NYql;
6-
7-
TVector<TQItem> DrainIterator(IQIterator& iterator) {
8-
TVector<TQItem> res;
9-
for (;;) {
10-
auto value = iterator.Next().GetValueSync();
11-
if (!value) {
12-
break;
13-
}
14-
15-
res.emplace_back(*value);
16-
}
5+
#include <ydb/library/yql/core/qplayer/storage/ut_common/yql_qstorage_ut_common.h>
176

18-
return res;
19-
}
7+
using namespace NYql;
208

219
Y_UNIT_TEST_SUITE(TQStorageMemoryTests) {
22-
Y_UNIT_TEST(Empty) {
23-
auto storage = MakeMemoryQStorage();
24-
auto reader = storage->MakeReader("foo");
25-
UNIT_ASSERT(!reader->Get({"comp", "label"}).GetValueSync().Defined());
26-
auto iterator = storage->MakeIterator("foo", {});
27-
UNIT_ASSERT(!iterator->Next().GetValueSync().Defined());
28-
}
29-
30-
Y_UNIT_TEST(One) {
31-
auto storage = MakeMemoryQStorage();
32-
auto writer = storage->MakeWriter("foo");
33-
writer->Put({"comp", "label"}, "value").GetValueSync();
34-
writer->Commit().GetValueSync();
35-
auto reader = storage->MakeReader("foo");
36-
auto value = reader->Get({"comp", "label"}).GetValueSync();
37-
UNIT_ASSERT(value.Defined());
38-
UNIT_ASSERT_VALUES_EQUAL(value->Key.Component, "comp");
39-
UNIT_ASSERT_VALUES_EQUAL(value->Key.Label, "label");
40-
UNIT_ASSERT_VALUES_EQUAL(value->Value, "value");
41-
auto iterator = storage->MakeIterator("foo", {});
42-
value = iterator->Next().GetValueSync();
43-
UNIT_ASSERT(value.Defined());
44-
UNIT_ASSERT_VALUES_EQUAL(value->Key.Component, "comp");
45-
UNIT_ASSERT_VALUES_EQUAL(value->Key.Label, "label");
46-
UNIT_ASSERT_VALUES_EQUAL(value->Value, "value");
47-
value = iterator->Next().GetValueSync();
48-
UNIT_ASSERT(!value.Defined());
49-
}
50-
51-
Y_UNIT_TEST(IterateWithoutValue) {
52-
auto storage = MakeMemoryQStorage();
53-
auto writer = storage->MakeWriter("foo");
54-
writer->Put({"comp", "label"}, "value").GetValueSync();
55-
writer->Commit().GetValueSync();
56-
auto reader = storage->MakeReader("foo");
57-
auto settings = TQIteratorSettings{};
58-
settings.DoNotLoadValue = true;
59-
auto iterator = storage->MakeIterator("foo", settings);
60-
auto value = iterator->Next().GetValueSync();
61-
UNIT_ASSERT(value.Defined());
62-
UNIT_ASSERT_VALUES_EQUAL(value->Key.Component, "comp");
63-
UNIT_ASSERT_VALUES_EQUAL(value->Key.Label, "label");
64-
UNIT_ASSERT_VALUES_EQUAL(value->Value, "");
65-
value = iterator->Next().GetValueSync();
66-
UNIT_ASSERT(!value.Defined());
67-
}
68-
69-
Y_UNIT_TEST(ManyKeys) {
70-
const size_t N = 10;
71-
auto storage = MakeMemoryQStorage();
72-
auto writer = storage->MakeWriter("foo");
73-
for (size_t i = 0; i < N; ++i) {
74-
writer->Put({"comp", "label" + ToString(i)}, "value" + ToString(i)).GetValueSync();
75-
}
76-
77-
writer->Commit().GetValueSync();
78-
auto reader = storage->MakeReader("foo");
79-
for (size_t i = 0; i < N; ++i) {
80-
auto value = reader->Get({"comp", "label" + ToString(i)}).GetValueSync();
81-
UNIT_ASSERT(value.Defined());
82-
UNIT_ASSERT_VALUES_EQUAL(value->Key.Component, "comp");
83-
UNIT_ASSERT_VALUES_EQUAL(value->Key.Label, "label" + ToString(i));
84-
UNIT_ASSERT_VALUES_EQUAL(value->Value, "value" + ToString(i));
85-
}
86-
87-
auto iterator = storage->MakeIterator("foo", {});
88-
TVector<TQItem> res = DrainIterator(*iterator);
89-
UNIT_ASSERT_VALUES_EQUAL(res.size(), N);
90-
Sort(res);
91-
for (size_t i = 0; i < N; ++i) {
92-
UNIT_ASSERT_VALUES_EQUAL(res[i].Key.Component, "comp");
93-
UNIT_ASSERT_VALUES_EQUAL(res[i].Key.Label, "label" + ToString(i));
94-
UNIT_ASSERT_VALUES_EQUAL(res[i].Value, "value" + ToString(i));
95-
}
96-
}
97-
98-
Y_UNIT_TEST(InterleaveReadWrite) {
99-
auto storage = MakeMemoryQStorage();
100-
auto reader = storage->MakeReader("foo");
101-
auto value = reader->Get({"comp", "label"}).GetValueSync();
102-
UNIT_ASSERT(!value.Defined());
103-
auto iterator1 = storage->MakeIterator("foo", {});
104-
value = iterator1->Next().GetValueSync();
105-
UNIT_ASSERT(!value.Defined());
106-
auto writer = storage->MakeWriter("foo");
107-
writer->Put({"comp", "label"}, "value").GetValueSync();
108-
value = reader->Get({"comp", "label"}).GetValueSync();
109-
UNIT_ASSERT(!value.Defined());
110-
auto iterator2 = storage->MakeIterator("foo", {});
111-
value = iterator2->Next().GetValueSync();
112-
UNIT_ASSERT(!value.Defined());
113-
writer->Commit().GetValueSync();
114-
value = reader->Get({"comp", "label"}).GetValueSync();
115-
UNIT_ASSERT(value.Defined());
116-
UNIT_ASSERT_VALUES_EQUAL(value->Key.Component, "comp");
117-
UNIT_ASSERT_VALUES_EQUAL(value->Key.Label, "label");
118-
UNIT_ASSERT_VALUES_EQUAL(value->Value, "value");
119-
auto iterator3 = storage->MakeIterator("foo", {});
120-
value = iterator3->Next().GetValueSync();
121-
UNIT_ASSERT(value.Defined());
122-
UNIT_ASSERT_VALUES_EQUAL(value->Key.Component, "comp");
123-
UNIT_ASSERT_VALUES_EQUAL(value->Key.Label, "label");
124-
UNIT_ASSERT_VALUES_EQUAL(value->Value, "value");
125-
value = iterator2->Next().GetValueSync();
126-
UNIT_ASSERT(!value.Defined());
127-
}
10+
GENERATE_TESTS(MakeMemoryQStorage)
12811
}

0 commit comments

Comments
 (0)