@@ -20,62 +20,108 @@ using namespace NActors;
20
20
21
21
namespace {
22
22
23
+
24
+ constexpr ui32 MAX_INFLIGHT_BLOBS_COUNT = 10 ;
25
+ constexpr ui64 MAX_INFLIGHT_BLOBS_SIZE = 50_MB;
26
+
23
27
class TDqChannelStorage : public IDqChannelStorage {
28
+ struct TWritingBlobInfo {
29
+ ui64 BlobSize_;
30
+ NThreading::TFuture<void > IsBlobWrittenFuture_;
31
+ };
24
32
public:
25
- TDqChannelStorage (TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp, TActorSystem* actorSystem, bool isConcurrent )
33
+ TDqChannelStorage (TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp, TActorSystem* actorSystem)
26
34
: ActorSystem_(actorSystem)
27
35
{
28
- if (isConcurrent) {
29
- SelfActor_ = CreateConcurrentDqChannelStorageActor (txId, channelId, std::move (wakeUp), actorSystem);
30
- } else {
31
- SelfActor_ = CreateDqChannelStorageActor (txId, channelId, std::move (wakeUp), actorSystem);
32
- }
33
- SelfActorId_ = TlsActivationContext->AsActorContext ().RegisterWithSameMailbox (SelfActor_->GetActor ());
34
- SelfId_ = TlsActivationContext->AsActorContext ().SelfID ;
36
+ ChannelStorageActor_ = CreateDqChannelStorageActor (txId, channelId, std::move (wakeUp), actorSystem);
37
+ ChannelStorageActorId_ = ActorSystem_->Register (ChannelStorageActor_->GetActor ());
35
38
}
36
39
37
40
~TDqChannelStorage () {
38
- if (ActorSystem_) {
39
- ActorSystem_->Send (
40
- new IEventHandle (
41
- SelfActorId_,
42
- SelfId_,
43
- new TEvents::TEvPoison,
44
- /* flags=*/ 0 ,
45
- /* cookie=*/ 0 ));
46
- } else {
47
- TlsActivationContext->AsActorContext ().Send (SelfActorId_, new TEvents::TEvPoison);
48
- }
41
+ ActorSystem_->Send (ChannelStorageActorId_, new TEvents::TEvPoison);
49
42
}
50
43
51
- bool IsEmpty () const override {
52
- return SelfActor_->IsEmpty ();
44
+ bool IsEmpty () override {
45
+ UpdateWriteStatus ();
46
+
47
+ return WritingBlobs_.empty () && StoredBlobsCount_ == 0 && LoadingBlobs_.empty ();
53
48
}
54
49
55
- bool IsFull () const override {
56
- return SelfActor_->IsFull ();
50
+ bool IsFull () override {
51
+ UpdateWriteStatus ();
52
+
53
+ return WritingBlobs_.size () > MAX_INFLIGHT_BLOBS_COUNT || WritingBlobsTotalSize_ > MAX_INFLIGHT_BLOBS_SIZE;
57
54
}
58
55
59
56
void Put (ui64 blobId, TRope&& blob, ui64 cookie = 0 ) override {
60
- SelfActor_->Put (blobId, std::move (blob), cookie);
57
+ UpdateWriteStatus ();
58
+
59
+ auto promise = NThreading::NewPromise<void >();
60
+ auto future = promise.GetFuture ();
61
+
62
+ ui64 blobSize = blob.size ();
63
+
64
+ ActorSystem_->Send (ChannelStorageActorId_, new TEvDqChannelSpilling::TEvPut (blobId, std::move (blob), std::move (promise)), /* flags*/ 0 , cookie);
65
+
66
+ WritingBlobs_.emplace (blobId, TWritingBlobInfo (blobSize, std::move (future)));
67
+ WritingBlobsTotalSize_ += blobSize;
61
68
}
62
69
63
70
bool Get (ui64 blobId, TBuffer& blob, ui64 cookie = 0 ) override {
64
- return SelfActor_->Get (blobId, blob, cookie);
71
+ UpdateWriteStatus ();
72
+
73
+ const auto it = LoadingBlobs_.find (blobId);
74
+ // If we didn't request loading blob from spilling -> request it
75
+ if (it == LoadingBlobs_.end ()) {
76
+ auto promise = NThreading::NewPromise<TBuffer>();
77
+ auto future = promise.GetFuture ();
78
+ ActorSystem_->Send (ChannelStorageActorId_, new TEvDqChannelSpilling::TEvGet (blobId, std::move (promise)), /* flags*/ 0 , cookie);
79
+
80
+ LoadingBlobs_.emplace (blobId, std::move (future));
81
+ return false ;
82
+ }
83
+ // If we requested loading blob, but it's not loaded -> wait
84
+ if (!it->second .HasValue ()) return false ;
85
+
86
+ blob = std::move (it->second .ExtractValue ());
87
+ LoadingBlobs_.erase (it);
88
+ --StoredBlobsCount_;
89
+
90
+ return true ;
91
+ }
92
+
93
+ private:
94
+ void UpdateWriteStatus () {
95
+ for (auto it = WritingBlobs_.begin (); it != WritingBlobs_.end ();) {
96
+ if (it->second .IsBlobWrittenFuture_ .HasValue ()) {
97
+ WritingBlobsTotalSize_ -= it->second .BlobSize_ ;
98
+ ++StoredBlobsCount_;
99
+ it = WritingBlobs_.erase (it);
100
+ } else {
101
+ ++it;
102
+ }
103
+ }
65
104
}
66
105
67
106
private:
68
- IDqChannelStorageActor* SelfActor_;
69
- TActorId SelfActorId_;
70
- TActorId SelfId_;
107
+ IDqChannelStorageActor* ChannelStorageActor_;
108
+ TActorId ChannelStorageActorId_;
71
109
TActorSystem *ActorSystem_;
110
+
111
+ // BlobId -> future with requested blob
112
+ std::unordered_map<ui64, NThreading::TFuture<TBuffer>> LoadingBlobs_;
113
+ // BlobId -> future with some additional info
114
+ std::unordered_map<ui64, TWritingBlobInfo> WritingBlobs_;
115
+ ui64 WritingBlobsTotalSize_ = 0 ;
116
+
117
+ ui64 StoredBlobsCount_ = 0 ;
72
118
};
73
119
74
120
} // anonymous namespace
75
121
76
- IDqChannelStorage::TPtr CreateDqChannelStorage (TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp, TActorSystem* actorSystem, bool isConcurrent )
122
+ IDqChannelStorage::TPtr CreateDqChannelStorage (TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp, TActorSystem* actorSystem)
77
123
{
78
- return new TDqChannelStorage (txId, channelId, std::move (wakeUp), actorSystem, isConcurrent );
124
+ return new TDqChannelStorage (txId, channelId, std::move (wakeUp), actorSystem);
79
125
}
80
126
81
127
} // namespace NYql::NDq
0 commit comments