Skip to content

Commit ad5da4d

Browse files
authored
YQL-17087: Add channel spilling to dq pipe communication (#612)
* Add channel spilling to dq pipe communication * Add EvPoison handler to DqChannelStorageActor
1 parent 653229b commit ad5da4d

File tree

9 files changed

+362
-96
lines changed

9 files changed

+362
-96
lines changed

ydb/core/tx/datashard/datashard_kqp.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1111,7 +1111,7 @@ class TKqpTaskRunnerExecutionContext : public NDq::IDqTaskRunnerExecutionContext
11111111
return {};
11121112
}
11131113

1114-
NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */, TActorSystem* /* actorSystem */) const override {
1114+
NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */, TActorSystem* /* actorSystem */, bool /*isConcurrent*/) const override {
11151115
return {};
11161116
}
11171117
};

ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@ TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, bool wi
1414
}
1515

1616
IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId) const {
17-
return CreateChannelStorage(channelId, nullptr);
17+
return CreateChannelStorage(channelId, nullptr, false);
1818
}
1919

20-
IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem) const {
20+
IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem, bool isConcurrent) const {
2121
if (WithSpilling_) {
22-
return CreateDqChannelStorage(TxId_, channelId, WakeUp_, actorSystem);
22+
return CreateDqChannelStorage(TxId_, channelId, WakeUp_, actorSystem, isConcurrent);
2323
} else {
2424
return nullptr;
2525
}

ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ class TDqTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContextBase {
1212
TDqTaskRunnerExecutionContext(TTxId txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp);
1313

1414
IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const override;
15-
IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem) const override;
15+
IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem, bool isConcurrent) const override;
1616

1717
private:
1818
const TTxId TxId_;

ydb/library/yql/dq/actors/spilling/channel_storage.cpp

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,17 @@ namespace {
2222

2323
class TDqChannelStorage : public IDqChannelStorage {
2424
public:
25-
TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp, TActorSystem* actorSystem) {
26-
SelfActor_ = CreateDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem);
27-
TlsActivationContext->AsActorContext().RegisterWithSameMailbox(SelfActor_->GetActor());
25+
TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp, TActorSystem* actorSystem, bool isConcurrent) {
26+
if (isConcurrent) {
27+
SelfActor_ = CreateConcurrentDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem);
28+
} else {
29+
SelfActor_ = CreateDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem);
30+
}
31+
SelfActorId_ = TlsActivationContext->AsActorContext().RegisterWithSameMailbox(SelfActor_->GetActor());
2832
}
2933

3034
~TDqChannelStorage() {
31-
SelfActor_->Terminate();
35+
TlsActivationContext->AsActorContext().Send(SelfActorId_, new TEvents::TEvPoison);
3236
}
3337

3438
bool IsEmpty() const override {
@@ -49,13 +53,14 @@ class TDqChannelStorage : public IDqChannelStorage {
4953

5054
private:
5155
IDqChannelStorageActor* SelfActor_;
56+
TActorId SelfActorId_;
5257
};
5358

5459
} // anonymous namespace
5560

56-
IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp, TActorSystem* actorSystem)
61+
IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp, TActorSystem* actorSystem, bool isConcurrent)
5762
{
58-
return new TDqChannelStorage(txId, channelId, std::move(wakeUp), actorSystem);
63+
return new TDqChannelStorage(txId, channelId, std::move(wakeUp), actorSystem, isConcurrent);
5964
}
6065

6166
} // namespace NYql::NDq

ydb/library/yql/dq/actors/spilling/channel_storage.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ namespace NActors {
1010

1111
namespace NYql::NDq {
1212

13-
NYql::NDq::IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId,
14-
NYql::NDq::IDqChannelStorage::TWakeUpCallback wakeUpCb, NActors::TActorSystem* actorSystem);
13+
IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId,
14+
IDqChannelStorage::TWakeUpCallback wakeUpCb, NActors::TActorSystem* actorSystem, bool isConcurrent);
1515

1616
} // namespace NYql::NDq

0 commit comments

Comments
 (0)