Skip to content

Commit 6f6c9dd

Browse files
Merge 6921e8b into 4fef0c2
2 parents 4fef0c2 + 6921e8b commit 6f6c9dd

File tree

9 files changed

+481
-0
lines changed

9 files changed

+481
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#include "compute_storage.h"
2+
3+
namespace NYql::NDq {
4+
5+
using namespace NActors;
6+
7+
// Creates the compute storage actor for the given transaction and registers it in the actor system.
//
// txId           - transaction id passed through to the storage actor.
// wakeUpCallback - callback handed over to the storage actor.
// actorSystem    - actor system used to register the actor (and, in the destructor, to poison it).
TDqComputeStorage::TDqComputeStorage(TTxId txId, std::function<void()> wakeUpCallback, TActorSystem* actorSystem)
    : ActorSystem_(actorSystem)
{
    // The spiller name embeds the address of this object, so each storage instance gets its own name.
    TStringStream spillerName;
    spillerName << "Spiller" << "_" << static_cast<const void*>(this);
    // wakeUpCallback is a by-value sink parameter: move it instead of copying the std::function once more.
    ComputeStorageActor_ = CreateDqComputeStorageActor(txId, spillerName.Str(), std::move(wakeUpCallback));
    ComputeStorageActorId_ = ActorSystem_->Register(ComputeStorageActor_->GetActor());
}
15+
16+
// Asks the registered compute storage actor to shut down by sending it a poison event.
TDqComputeStorage::~TDqComputeStorage() {
    auto* poison = new TEvents::TEvPoison;
    ActorSystem_->Send(ComputeStorageActorId_, poison);
}
19+
20+
// Hands the blob over to the compute storage actor and returns the future it produces for the blob key.
NThreading::TFuture<NKikimr::NMiniKQL::ISpiller::TKey> TDqComputeStorage::Put(TRope&& blob) {
    auto keyFuture = ComputeStorageActor_->Put(std::move(blob));
    return keyFuture;
}
23+
24+
// Forwards the read request for the given key to the compute storage actor and returns its result unchanged.
std::optional<NThreading::TFuture<TRope>> TDqComputeStorage::Get(TKey key) {
    auto blobFuture = ComputeStorageActor_->Get(key);
    return blobFuture;
}
27+
28+
// Forwards the delete request for the given key to the compute storage actor and returns its completion future.
NThreading::TFuture<void> TDqComputeStorage::Delete(TKey key) {
    auto deletionFuture = ComputeStorageActor_->Delete(key);
    return deletionFuture;
}
31+
32+
// Forwards the extract request for the given key to the compute storage actor and returns its result unchanged.
std::optional<NThreading::TFuture<TRope>> TDqComputeStorage::Extract(TKey key) {
    auto blobFuture = ComputeStorageActor_->Extract(key);
    return blobFuture;
}
35+
} // namespace NYql::NDq
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
#pragma once
2+
3+
#include "compute_storage_actor.h"
4+
5+
#include <ydb/library/yql/dq/common/dq_common.h>
6+
#include <ydb/library/yql/minikql/computation/mkql_spiller.h>
7+
#include <ydb/library/actors/core/actor.h>
8+
9+
// Forward declaration: this header only uses TActorSystem through a pointer.
namespace NActors {
class TActorSystem;
} // namespace NActors
12+
13+
namespace NYql::NDq {
14+
15+
// This class will be refactored to be non-actor spiller part
16+
class TDqComputeStorage : public NKikimr::NMiniKQL::ISpiller
17+
{
18+
public:
19+
20+
TDqComputeStorage(TTxId txId, std::function<void()> wakeUpCallback, NActors::TActorSystem* actorSystem);
21+
22+
~TDqComputeStorage();
23+
24+
NThreading::TFuture<TKey> Put(TRope&& blob);
25+
26+
std::optional<NThreading::TFuture<TRope>> Get(TKey key);
27+
28+
std::optional<NThreading::TFuture<TRope>> Extract(TKey key);
29+
30+
NThreading::TFuture<void> Delete(TKey key);
31+
32+
private:
33+
NActors::TActorSystem* ActorSystem_;
34+
IDqComputeStorageActor* ComputeStorageActor_;
35+
NActors::TActorId ComputeStorageActorId_;
36+
};
37+
38+
} // namespace NYql::NDq
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,306 @@
1+
#include "compute_storage_actor.h"
2+
3+
#include "spilling.h"
4+
#include "spilling_file.h"
5+
6+
#include <ydb/library/services/services.pb.h>
7+
8+
#include <ydb/library/actors/core/actor_bootstrapped.h>
9+
#include <ydb/library/actors/core/hfunc.h>
10+
#include <ydb/library/actors/core/log.h>
11+
12+
13+
namespace NYql::NDq {
14+
15+
using namespace NActors;
16+
17+
namespace {
18+
19+
// Logging helpers for TDqComputeStorageActor methods.
// Every macro prefixes the message with the actor's transaction id, so it must be expanded
// where the TxId_ member and TlsActivationContext are available.
#define LOG_D(s) \
    LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ". " << s)
#define LOG_I(s) \
    LOG_INFO_S(*TlsActivationContext,  NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ". " << s)
#define LOG_E(s) \
    LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ". " << s)
#define LOG_C(s) \
    LOG_CRIT_S(*TlsActivationContext,  NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ". " << s)
#define LOG_W(s) \
    LOG_WARN_S(*TlsActivationContext,  NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ". " << s)
#define LOG_T(s) \
    LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ". " << s)
31+
32+
// Actor implementing IDqComputeStorageActor on top of a local-file spilling actor.
//
// Put/Get/Extract/Delete are invoked directly by the caller (not through events). They send
// TEvDqSpilling requests to the inner spilling actor and hand out futures; the matching results
// arrive as events in WorkState and complete the corresponding promises, after which
// WakeupCallback_ is invoked. Mutex_ guards all bookkeeping shared between the direct calls and
// the event handlers.
class TDqComputeStorageActor : public NActors::TActorBootstrapped<TDqComputeStorageActor>,
                               public IDqComputeStorageActor
{
    using TBase = TActorBootstrapped<TDqComputeStorageActor>;
    // size + promise with key
    using TWritingBlobInfo = std::pair<ui64, NThreading::TPromise<IDqComputeStorageActor::TKey>>;
    // remove after read + promise with blob
    using TLoadingBlobInfo = std::pair<bool, NThreading::TPromise<TRope>>;
    // void promise that completes when block is removed
    using TDeletingBlobInfo = NThreading::TPromise<void>;
public:
    // txId           - transaction id, used for logging and passed to the spilling actor.
    // spillerName    - name passed to the spilling actor on creation.
    // wakeupCallback - invoked after a write, read or delete result has been processed.
    TDqComputeStorageActor(TTxId txId, const TString& spillerName, std::function<void()> wakeupCallback)
        : TxId_(txId),
        SpillerName_(spillerName),
        // Sink parameter: move instead of copying the std::function a second time.
        WakeupCallback_(std::move(wakeupCallback))
    {
    }

    void Bootstrap() {
        Become(&TDqComputeStorageActor::WorkState);
    }

    static constexpr char ActorName[] = "DQ_COMPUTE_STORAGE";

    IActor* GetActor() override {
        return this;
    }

    // Sends the blob to the spilling actor for writing.
    // The returned future is completed with the blob key once TEvWriteResult is handled.
    NThreading::TFuture<IDqComputeStorageActor::TKey> Put(TRope&& blob) override {
        // Use lock to prevent race when state is changed on event processing and on Put call.
        // Lazy initialization is done under the same lock so that concurrent callers cannot
        // create the inner spilling actor twice.
        std::lock_guard lock(Mutex_);

        InitializeIfNot();
        FailOnError();

        ui64 size = blob.size();

        Send(SpillingActorId_, new TEvDqSpilling::TEvWrite(NextBlobId, std::move(blob)));

        auto it = WritingBlobs_.emplace(NextBlobId, std::make_pair(size, NThreading::NewPromise<IDqComputeStorageActor::TKey>())).first;
        WritingBlobsSize_ += size;

        ++NextBlobId;

        auto& promise = it->second.second;

        return promise.GetFuture();
    }

    // Requests the blob contents and keeps the blob stored.
    // Returns std::nullopt when blobId is not among the stored blobs.
    std::optional<NThreading::TFuture<TRope>> Get(IDqComputeStorageActor::TKey blobId) override {
        return GetInternal(blobId, false);
    }

    // Requests the blob contents and removes the blob after it has been read.
    // Returns std::nullopt when blobId is not among the stored blobs.
    std::optional<NThreading::TFuture<TRope>> Extract(IDqComputeStorageActor::TKey blobId) override {
        return GetInternal(blobId, true);
    }

    // Requests removal of the blob.
    // The future is completed immediately when blobId is not stored; otherwise it is completed
    // when the read result for the removing read arrives.
    NThreading::TFuture<void> Delete(IDqComputeStorageActor::TKey blobId) override {
        // Use lock to prevent race when state is changed on event processing and on Delete call
        std::lock_guard lock(Mutex_);

        InitializeIfNot();
        FailOnError();

        auto promise = NThreading::NewPromise<void>();
        auto future = promise.GetFuture();

        if (!StoredBlobs_.contains(blobId)) {
            promise.SetValue();
            return future;
        }

        DeletingBlobs_.emplace(blobId, std::move(promise));

        // Deletion is issued as a read with the remove flag set; see HandleDelete.
        Send(SpillingActorId_, new TEvDqSpilling::TEvRead(blobId, true));

        return future;
    }

protected:
    // Shared implementation of Get and Extract.
    // removeAfterRead is remembered with the promise and forwarded to the spilling actor.
    std::optional<NThreading::TFuture<TRope>> GetInternal(IDqComputeStorageActor::TKey blobId, bool removeAfterRead) {
        // Use lock to prevent race when state is changed on event processing and on Get call
        std::lock_guard lock(Mutex_);

        InitializeIfNot();
        FailOnError();

        if (!StoredBlobs_.contains(blobId)) return std::nullopt;

        TLoadingBlobInfo loadingblobInfo = std::make_pair(removeAfterRead, NThreading::NewPromise<TRope>());
        auto it = LoadingBlobs_.emplace(blobId, std::move(loadingblobInfo)).first;

        // Forward the remove flag (the same TEvRead argument Delete sets to true). The read-result
        // handler drops the blob from the bookkeeping when removeAfterRead is set, so the spilling
        // actor must be asked to remove it as well; otherwise the spilled data would be leaked.
        Send(SpillingActorId_, new TEvDqSpilling::TEvRead(blobId, removeAfterRead));

        auto& promise = it->second.second;
        return promise.GetFuture();
    }

    // Poisons the inner spilling actor and then terminates this actor.
    void PassAway() override {
        {
            // Initialization touches state that Put/Get/Delete also touch under Mutex_.
            std::lock_guard lock(Mutex_);
            InitializeIfNot();
        }
        Send(SpillingActorId_, new TEvents::TEvPoison);
        TBase::PassAway();
    }

    // If an error has been recorded, logs it and poisons the spilling actor.
    // Callers are expected to hold Mutex_.
    // NOTE(review): despite the name this does not throw or otherwise interrupt the caller -
    // confirm whether the operation should actually be failed here.
    void FailOnError() {
        InitializeIfNot();
        if (Error_) {
            LOG_E("Error: " << *Error_);
            Send(SpillingActorId_, new TEvents::TEvPoison);
        }
    }

private:
    STATEFN(WorkState) {
        switch (ev->GetTypeRewrite()) {
            hFunc(TEvDqSpilling::TEvWriteResult, HandleWork);
            hFunc(TEvDqSpilling::TEvReadResult, HandleWork);
            hFunc(TEvDqSpilling::TEvError, HandleWork);
            cFunc(TEvents::TEvPoison::EventType, PassAway);
            default:
                Y_ABORT("TDqComputeStorageActor::WorkState unexpected event type: %" PRIx32 " event: %s",
                    ev->GetTypeRewrite(),
                    ev->ToString().data());
        }
    }

    // Completes the pending Put for the written blob and moves it from "writing" to "stored".
    void HandleWork(TEvDqSpilling::TEvWriteResult::TPtr& ev) {
        auto& msg = *ev->Get();
        LOG_T("[TEvWriteResult] blobId: " << msg.BlobId);

        // Use lock to prevent race when state is changed on event processing and on Put call
        std::lock_guard lock(Mutex_);

        auto it = WritingBlobs_.find(msg.BlobId);
        if (it == WritingBlobs_.end()) {
            LOG_E("Got unexpected TEvWriteResult, blobId: " << msg.BlobId);

            Error_ = "Internal error";

            Send(SpillingActorId_, new TEvents::TEvPoison);
            return;
        }

        auto& [size, promise] = it->second;

        WritingBlobsSize_ -= size;

        StoredBlobsCount_++;
        StoredBlobsSize_ += size;

        StoredBlobs_.insert(msg.BlobId);

        // complete future and wake up waiting compute node
        promise.SetValue(msg.BlobId);

        WritingBlobs_.erase(it);
        WakeupCallback_();
    }

    // Completes either a pending Delete or a pending Get/Extract for the blob that was read.
    void HandleWork(TEvDqSpilling::TEvReadResult::TPtr& ev) {
        auto& msg = *ev->Get();
        LOG_T("[TEvReadResult] blobId: " << msg.BlobId << ", size: " << msg.Blob.size());

        // Use lock to prevent race when state is changed on event processing and on Get/Extract/Delete calls
        std::lock_guard lock(Mutex_);

        // Deletion is read without fetching the results. So, after the deletion library sends TEvReadResult event
        // Check if the intention was to delete and complete correct future in this case.
        if (HandleDelete(msg.BlobId, msg.Blob.Size())) {
            WakeupCallback_();
            return;
        }

        auto it = LoadingBlobs_.find(msg.BlobId);
        if (it == LoadingBlobs_.end()) {
            LOG_E("Got unexpected TEvReadResult, blobId: " << msg.BlobId);

            Error_ = "Internal error";

            Send(SpillingActorId_, new TEvents::TEvPoison);
            return;
        }

        bool removedAfterRead = it->second.first;
        if (removedAfterRead) {
            UpdateStatsAfterBlobDeletion(msg.BlobId, msg.Blob.Size());
        }

        // Copy the received bytes into a rope that is handed to the waiting caller.
        TRope res(TString(reinterpret_cast<const char*>(msg.Blob.Data()), msg.Blob.Size()));

        auto& promise = it->second.second;
        promise.SetValue(std::move(res));

        LoadingBlobs_.erase(it);

        WakeupCallback_();
    }

    // Records the error reported by the spilling actor; it is examined by FailOnError.
    void HandleWork(TEvDqSpilling::TEvError::TPtr& ev) {
        auto& msg = *ev->Get();
        LOG_D("[TEvError] " << msg.Message);

        // Error_ is read under Mutex_ by Put/Get/Extract/Delete, so it must be written under it too.
        std::lock_guard lock(Mutex_);

        Error_.ConstructInPlace(msg.Message);
    }

    // Completes a pending Delete for blobId, if there is one.
    // Returns true when the read result belonged to a deletion, false otherwise.
    // Must be called with Mutex_ held.
    bool HandleDelete(IDqComputeStorageActor::TKey blobId, ui64 size) {
        auto it = DeletingBlobs_.find(blobId);
        if (it == DeletingBlobs_.end()) {
            return false;
        }

        UpdateStatsAfterBlobDeletion(blobId, size);

        auto& promise = it->second;
        promise.SetValue();
        DeletingBlobs_.erase(it);
        return true;
    }

    // Removes the blob from the stored-blobs bookkeeping (count, total size, key set).
    void UpdateStatsAfterBlobDeletion(IDqComputeStorageActor::TKey blobId, ui64 size) {
        StoredBlobsCount_--;
        StoredBlobsSize_ -= size;
        StoredBlobs_.erase(blobId);
    }

    // It's illegal to initialize an inner actor in the actor's ctor, because in this case ctx will not be initialized: it is initialized after the Bootstrap event.
    // It's also not possible to initialize the inner actor in the Bootstrap function, because Put/Get may be called before Bootstrap -> the inner worker would be uninitialized.
    // In the current implementation it's still possible to leave the inner actor uninitialized, which is why it's planned to split this class into an actor part + a non-actor part.
    // Callers are expected to hold Mutex_.
    void InitializeIfNot() {
        if (IsInitialized_) return;
        auto spillingActor = CreateDqLocalFileSpillingActor(TxId_, SpillerName_,
            SelfId(), false);
        SpillingActorId_ = Register(spillingActor);

        IsInitialized_ = true;
    }


protected:
    const TTxId TxId_;
    // Id of the inner local-file spilling actor; valid once IsInitialized_ is true.
    TActorId SpillingActorId_;

    // Blobs sent for writing whose TEvWriteResult has not arrived yet.
    TMap<IDqComputeStorageActor::TKey, TWritingBlobInfo> WritingBlobs_;
    // Keys of blobs that were written successfully and not removed since.
    TSet<ui64> StoredBlobs_;
    ui64 WritingBlobsSize_ = 0;

    ui32 StoredBlobsCount_ = 0;
    ui64 StoredBlobsSize_ = 0;

    // Pending Get/Extract requests.
    TMap<IDqComputeStorageActor::TKey, TLoadingBlobInfo> LoadingBlobs_;

    // Pending Delete requests.
    TMap<IDqComputeStorageActor::TKey, TDeletingBlobInfo> DeletingBlobs_;

    // Set when the spilling actor reports an error or an unexpected result is received.
    TMaybe<TString> Error_;

    // Key that will be assigned to the next blob passed to Put.
    IDqComputeStorageActor::TKey NextBlobId = 0;

    TString SpillerName_;

    bool IsInitialized_ = false;

    std::function<void()> WakeupCallback_;

private:
    // Guards the state shared between direct Put/Get/Extract/Delete calls and event handlers.
    std::mutex Mutex_;

};
299+
300+
} // anonymous namespace
301+
302+
// Factory for the compute storage actor.
// Returns a newly allocated TDqComputeStorageActor through its IDqComputeStorageActor interface;
// the caller decides how it is registered (see IDqComputeStorageActor::GetActor).
IDqComputeStorageActor* CreateDqComputeStorageActor(TTxId txId, const TString& spillerName, std::function<void()> wakeupCallback) {
    // wakeupCallback is a by-value sink parameter: move it instead of copying the std::function again.
    return new TDqComputeStorageActor(txId, spillerName, std::move(wakeupCallback));
}
305+
306+
} // namespace NYql::NDq

0 commit comments

Comments
 (0)