Skip to content

Commit 2280352

Browse files
authored
Refactor DQ Channel Storage to accept external actor system (#547)
1 parent 66b69b6 commit 2280352

File tree

8 files changed

+64
-21
lines changed

8 files changed

+64
-21
lines changed

ydb/core/tx/datashard/datashard_kqp.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -1110,6 +1110,10 @@ class TKqpTaskRunnerExecutionContext : public NDq::IDqTaskRunnerExecutionContext
11101110
NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */) const override {
11111111
return {};
11121112
}
1113+
1114+
NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */, TActorSystem* /* actorSystem */) const override {
1115+
return {};
1116+
}
11131117
};
11141118

11151119
} // anonymous namespace

ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp

+5-1
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,12 @@ TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, bool wi
1414
}
1515

1616
IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId) const {
17+
return CreateChannelStorage(channelId, nullptr);
18+
}
19+
20+
IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem) const {
1721
if (WithSpilling_) {
18-
return CreateDqChannelStorage(TxId_, channelId, WakeUp_);
22+
return CreateDqChannelStorage(TxId_, channelId, WakeUp_, actorSystem);
1923
} else {
2024
return nullptr;
2125
}

ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
#include <ydb/library/yql/dq/common/dq_common.h>
55
#include <ydb/library/actors/core/actor.h>
66

7-
87
namespace NYql {
98
namespace NDq {
109

@@ -13,6 +12,7 @@ class TDqTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContextBase {
1312
TDqTaskRunnerExecutionContext(TTxId txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp);
1413

1514
IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const override;
15+
IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem) const override;
1616

1717
private:
1818
const TTxId TxId_;

ydb/library/yql/dq/actors/spilling/channel_storage.cpp

+35-14
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,12 @@ class TDqChannelStorageActor : public TActorBootstrapped<TDqChannelStorageActor>
4141
using TBase = TActorBootstrapped<TDqChannelStorageActor>;
4242

4343
public:
44-
TDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp)
44+
TDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, TActorSystem* actorSystem)
4545
: TxId_(txId)
4646
, ChannelId_(channelId)
47-
, WakeUp_(std::move(wakeUp)) {}
47+
, WakeUp_(std::move(wakeUp))
48+
, ActorSystem_(actorSystem)
49+
{}
4850

4951
void Bootstrap() {
5052
auto spillingActor = CreateDqLocalFileSpillingActor(TxId_, TStringBuilder() << "ChannelId: " << ChannelId_,
@@ -135,21 +137,21 @@ class TDqChannelStorageActor : public TActorBootstrapped<TDqChannelStorageActor>
135137
return WritingBlobs_.size() > MAX_INFLIGHT_BLOBS_COUNT || WritingBlobsSize_ > MAX_INFLIGHT_BLOBS_SIZE;
136138
}
137139

138-
void Put(ui64 blobId, TRope&& blob) {
140+
void Put(ui64 blobId, TRope&& blob, ui64 cookie) {
139141
FailOnError();
140142

141143
// TODO: timeout
142144
// TODO: limit inflight events
143145

144146
ui64 size = blob.size();
145147

146-
Send(SpillingActorId_, new TEvDqSpilling::TEvWrite(blobId, std::move(blob)));
148+
SendEvent(new TEvDqSpilling::TEvWrite(blobId, std::move(blob)), cookie);
147149

148150
WritingBlobs_.emplace(blobId, size);
149151
WritingBlobsSize_ += size;
150152
}
151153

152-
bool Get(ui64 blobId, TBuffer& blob) {
154+
bool Get(ui64 blobId, TBuffer& blob, ui64 cookie) {
153155
FailOnError();
154156

155157
auto loadedIt = LoadedBlobs_.find(blobId);
@@ -162,7 +164,7 @@ class TDqChannelStorageActor : public TActorBootstrapped<TDqChannelStorageActor>
162164

163165
auto result = LoadingBlobs_.emplace(blobId);
164166
if (result.second) {
165-
Send(SpillingActorId_, new TEvDqSpilling::TEvRead(blobId, true));
167+
SendEvent(new TEvDqSpilling::TEvRead(blobId, true), cookie);
166168
}
167169

168170
return false;
@@ -181,6 +183,23 @@ class TDqChannelStorageActor : public TActorBootstrapped<TDqChannelStorageActor>
181183
}
182184
}
183185

186+
template<typename T>
187+
void SendEvent(T* event, ui64 cookie) {
188+
if (ActorSystem_) {
189+
ActorSystem_->Send(
190+
new IEventHandle(
191+
SpillingActorId_,
192+
SelfId(),
193+
event,
194+
/*flags=*/0,
195+
cookie
196+
)
197+
);
198+
} else {
199+
Send(SpillingActorId_, event);
200+
}
201+
}
202+
184203
private:
185204
const TTxId TxId_;
186205
const ui64 ChannelId_;
@@ -197,13 +216,15 @@ class TDqChannelStorageActor : public TActorBootstrapped<TDqChannelStorageActor>
197216
TMap<ui64, TBuffer> LoadedBlobs_;
198217

199218
TMaybe<TString> Error_;
219+
220+
TActorSystem* ActorSystem_;
200221
};
201222

202223

203224
class TDqChannelStorage : public IDqChannelStorage {
204225
public:
205-
TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp) {
206-
SelfActor_ = new TDqChannelStorageActor(txId, channelId, std::move(wakeUp));
226+
TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp, TActorSystem* actorSystem) {
227+
SelfActor_ = new TDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem);
207228
TlsActivationContext->AsActorContext().RegisterWithSameMailbox(SelfActor_);
208229
}
209230

@@ -219,12 +240,12 @@ class TDqChannelStorage : public IDqChannelStorage {
219240
return SelfActor_->IsFull();
220241
}
221242

222-
void Put(ui64 blobId, TRope&& blob) override {
223-
SelfActor_->Put(blobId, std::move(blob));
243+
void Put(ui64 blobId, TRope&& blob, ui64 cookie = 0) override {
244+
SelfActor_->Put(blobId, std::move(blob), cookie);
224245
}
225246

226-
bool Get(ui64 blobId, TBuffer& blob) override {
227-
return SelfActor_->Get(blobId, blob);
247+
bool Get(ui64 blobId, TBuffer& blob, ui64 cookie = 0) override {
248+
return SelfActor_->Get(blobId, blob, cookie);
228249
}
229250

230251
private:
@@ -233,9 +254,9 @@ class TDqChannelStorage : public IDqChannelStorage {
233254

234255
} // anonymous namespace
235256

236-
IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp)
257+
IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp, TActorSystem* actorSystem)
237258
{
238-
return new TDqChannelStorage(txId, channelId, std::move(wakeUp));
259+
return new TDqChannelStorage(txId, channelId, std::move(wakeUp), actorSystem);
239260
}
240261

241262
} // namespace NYql::NDq

ydb/library/yql/dq/actors/spilling/channel_storage.h

+5-1
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,13 @@
44
#include <ydb/library/yql/dq/runtime/dq_channel_storage.h>
55
#include <ydb/library/actors/core/actor.h>
66

7+
namespace NActors {
8+
class TActorSystem;
9+
};
10+
711
namespace NYql::NDq {
812

913
NYql::NDq::IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId,
10-
NYql::NDq::IDqChannelStorage::TWakeUpCallback wakeUpCb);
14+
NYql::NDq::IDqChannelStorage::TWakeUpCallback wakeUpCb, NActors::TActorSystem* actorSystem);
1115

1216
} // namespace NYql::NDq

ydb/library/yql/dq/runtime/dq_channel_storage.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@ class IDqChannelStorage : public TSimpleRefCount<IDqChannelStorage> {
2626
// methods Put/Get can throw `TDqChannelStorageException`
2727

2828
// Data should be owned by `blob` argument since the Put() call is actually asynchronous
29-
virtual void Put(ui64 blobId, TRope&& blob) = 0;
29+
virtual void Put(ui64 blobId, TRope&& blob, ui64 cookie = 0) = 0;
3030

3131
// TODO: there is no way for client to delete blob.
3232
// It is better to replace Get() with Pull() which will delete blob after read
3333
// (current clients read each blob exactly once)
3434
// Get() will return false if data is not ready yet. Client should repeat Get() in this case
35-
virtual bool Get(ui64 blobId, TBuffer& data) = 0;
35+
virtual bool Get(ui64 blobId, TBuffer& data, ui64 cookie = 0) = 0;
3636
};
3737

3838
} // namespace NYql::NDq

ydb/library/yql/dq/runtime/dq_tasks_runner.h

+10
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222
#include <util/generic/size_literals.h>
2323
#include <util/system/types.h>
2424

25+
namespace NActors {
26+
class TActorSystem;
27+
};
28+
2529
namespace NYql::NDq {
2630

2731
enum class ERunStatus : ui32 {
@@ -133,6 +137,7 @@ class IDqTaskRunnerExecutionContext {
133137
TVector<IDqOutput::TPtr>&& outputs) const = 0;
134138

135139
virtual IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const = 0;
140+
virtual IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem) const = 0;
136141
};
137142

138143
class TDqTaskRunnerExecutionContextBase : public IDqTaskRunnerExecutionContext {
@@ -149,6 +154,11 @@ class TDqTaskRunnerExecutionContextDefault : public TDqTaskRunnerExecutionContex
149154
IDqChannelStorage::TPtr CreateChannelStorage(ui64 /*channelId*/) const override {
150155
return {};
151156
};
157+
158+
IDqChannelStorage::TPtr CreateChannelStorage(ui64 /*channelId*/, NActors::TActorSystem* /*actorSystem*/) const override {
159+
return {};
160+
};
161+
152162
};
153163

154164
struct TDqTaskRunnerSettings {

ydb/library/yql/dq/runtime/ut/ut_helper.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ class TMockChannelStorage : public IDqChannelStorage {
2020
return Capacity <= UsedSpace;
2121
}
2222

23-
void Put(ui64 blobId, TRope&& blob) override {
23+
void Put(ui64 blobId, TRope&& blob, ui64 /* cookie = 0 */) override {
2424
if (UsedSpace + blob.size() > Capacity) {
2525
ythrow yexception() << "Space limit exceeded";
2626
}
@@ -30,7 +30,7 @@ class TMockChannelStorage : public IDqChannelStorage {
3030
UsedSpace += result.first->second.size();
3131
}
3232

33-
bool Get(ui64 blobId, TBuffer& data) override {
33+
bool Get(ui64 blobId, TBuffer& data, ui64 /* cookie = 0 */) override {
3434
if (!Blobs.contains(blobId)) {
3535
ythrow yexception() << "Not found";
3636
}

0 commit comments

Comments
 (0)