Skip to content

Commit 52827d2

Browse files
Merge eb285a6 into 11954b0
2 parents 11954b0 + eb285a6 commit 52827d2

File tree

9 files changed

+503
-0
lines changed

9 files changed

+503
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
#include "compute_storage.h"
2+
3+
#include "compute_storage_actor.h"
4+
5+
#include <ydb/library/yql/utils/yql_panic.h>
6+
#include <ydb/library/services/services.pb.h>
7+
8+
#include <ydb/library/actors/core/actor_bootstrapped.h>
9+
#include <ydb/library/actors/core/hfunc.h>
10+
#include <ydb/library/actors/core/log.h>
11+
12+
#include <util/generic/buffer.h>
13+
#include <util/generic/map.h>
14+
#include <util/generic/set.h>
15+
16+
17+
namespace NYql::NDq {
18+
19+
using namespace NActors;
20+
21+
class TDqComputeStorage : public IDqComputeStorage {
22+
public:
23+
TDqComputeStorage(TTxId txId, const TString& spillerName, std::function<void()>&& wakeUpCallback) {
24+
25+
ComputeStorageActor_ = CreateDqComputeStorageActor(txId, spillerName, std::move(wakeUpCallback));
26+
ComputeStorageActorId_ = TlsActivationContext->AsActorContext().Register(ComputeStorageActor_->GetActor());
27+
}
28+
29+
~TDqComputeStorage() {
30+
TlsActivationContext->AsActorContext().Send(ComputeStorageActorId_, new TEvents::TEvPoison);
31+
}
32+
33+
NThreading::TFuture<TKey> Put(TRope&& blob) override {
34+
return ComputeStorageActor_->Put(std::move(blob));
35+
}
36+
37+
std::optional<NThreading::TFuture<TRope>> Get(TKey key) override {
38+
return ComputeStorageActor_->Get(key);
39+
}
40+
41+
NThreading::TFuture<void> Delete(TKey key) override {
42+
return ComputeStorageActor_->Delete(key);
43+
}
44+
45+
std::optional<NThreading::TFuture<TRope>> Extract(TKey key) override {
46+
return ComputeStorageActor_->Extract(key);
47+
}
48+
49+
private:
50+
IDqComputeStorageActor* ComputeStorageActor_;
51+
TActorId ComputeStorageActorId_;
52+
};
53+
54+
IDqComputeStorage::TPtr MakeComputeStorage(const TString& spillerName, std::function<void()>&& wakeUpCallback) {
55+
return std::make_shared<TDqComputeStorage>(TTxId(), spillerName, std::move(wakeUpCallback));
56+
}
57+
58+
} // namespace NYql::NDq
59+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
#pragma once
2+
3+
#include <ydb/library/yql/dq/common/dq_common.h>
4+
#include <ydb/library/yql/minikql/computation/mkql_spiller.h>
5+
#include <ydb/library/actors/core/actor.h>
6+
7+
namespace NActors {
8+
class TActorSystem;
9+
};
10+
11+
namespace NYql::NDq {
12+
13+
// This class will be refactored to be non-actor spiller part
14+
class IDqComputeStorage
15+
{
16+
public:
17+
using TPtr = std::shared_ptr<IDqComputeStorage>;
18+
using TKey = ui64;
19+
20+
virtual ~IDqComputeStorage() = default;
21+
22+
virtual NThreading::TFuture<TKey> Put(TRope&& blob) = 0;
23+
24+
virtual std::optional<NThreading::TFuture<TRope>> Get(TKey key) = 0;
25+
26+
virtual std::optional<NThreading::TFuture<TRope>> Extract(TKey key) = 0;
27+
28+
virtual NThreading::TFuture<void> Delete(TKey key) = 0;
29+
};
30+
31+
IDqComputeStorage::TPtr MakeComputeStorage(const TString& spillerName, std::function<void()>&& wakeUpCallback);
32+
33+
} // namespace NYql::NDq

0 commit comments

Comments
 (0)