Skip to content

Commit 49e62b4

Browse files
Handle spilling errors correctly (#7435)
1 parent ee6b772 commit 49e62b4

25 files changed

+145
-83
lines changed

ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ using namespace NYql::NDq;
1414

1515
class TKqpTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContext {
1616
public:
17-
TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp)
18-
: TDqTaskRunnerExecutionContext(txId, std::move(wakeUp))
17+
TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback)
18+
: TDqTaskRunnerExecutionContext(txId, std::move(wakeUpCallback), std::move(errorCallback))
1919
, WithSpilling_(withSpilling)
2020
{
2121
}

ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp

+3-2
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,10 @@ void TKqpComputeActor::DoBootstrap() {
7272
auto taskRunner = MakeDqTaskRunner(TBase::GetAllocatorPtr(), execCtx, settings, logger);
7373
SetTaskRunner(taskRunner);
7474

75-
auto wakeup = [this]{ ContinueExecute(); };
75+
auto wakeupCallback = [this]{ ContinueExecute(); };
76+
auto errorCallback = [this](const TString& error){ SendError(error); };
7677
try {
77-
PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup)));
78+
PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeupCallback), std::move(errorCallback)));
7879
} catch (const NMiniKQL::TKqpEnsureFail& e) {
7980
InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage());
8081
return;

ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,8 @@ void TKqpScanComputeActor::DoBootstrap() {
220220
TBase::SetTaskRunner(taskRunner);
221221

222222
auto wakeup = [this] { ContinueExecute(); };
223-
TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup)));
223+
auto errorCallback = [this](const TString& error){ SendError(error); };
224+
TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup), std::move(errorCallback)));
224225

225226
ComputeCtx.AddTableScan(0, Meta, GetStatsMode());
226227
ScanData = &ComputeCtx.GetTableScan(0);

ydb/core/kqp/ut/spilling/kqp_scan_spilling_ut.cpp

+43-21
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ NKikimrConfig::TAppConfig AppCfg() {
3232
return appCfg;
3333
}
3434

35-
NKikimrConfig::TAppConfig AppCfgLowComputeLimits(double reasonableTreshold) {
35+
NKikimrConfig::TAppConfig AppCfgLowComputeLimits(double reasonableTreshold, bool enableSpilling=true) {
3636
NKikimrConfig::TAppConfig appCfg;
3737

3838
auto* rm = appCfg.MutableTableServiceConfig()->MutableResourceManager();
@@ -43,12 +43,32 @@ NKikimrConfig::TAppConfig AppCfgLowComputeLimits(double reasonableTreshold) {
4343

4444
auto* spilling = appCfg.MutableTableServiceConfig()->MutableSpillingServiceConfig()->MutableLocalFileConfig();
4545

46-
spilling->SetEnable(true);
46+
spilling->SetEnable(enableSpilling);
4747
spilling->SetRoot("./spilling/");
4848

4949
return appCfg;
5050
}
5151

52+
void FillTableWithData(NQuery::TQueryClient& db, ui64 numRows=300) {
53+
for (ui32 i = 0; i < numRows; ++i) {
54+
auto result = db.ExecuteQuery(Sprintf(R"(
55+
--!syntax_v1
56+
REPLACE INTO `/Root/KeyValue` (Key, Value) VALUES (%d, "%s")
57+
)", i, TString(200000 + i, 'a' + (i % 26)).c_str()), NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
58+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
59+
}
60+
}
61+
62+
constexpr auto SimpleGraceJoinWithSpillingQuery = R"(
63+
--!syntax_v1
64+
PRAGMA ydb.EnableSpillingNodes="GraceJoin";
65+
PRAGMA ydb.CostBasedOptimizationLevel='0';
66+
PRAGMA ydb.HashJoinMode='graceandself';
67+
select t1.Key, t1.Value, t2.Key, t2.Value
68+
from `/Root/KeyValue` as t1 full join `/Root/KeyValue` as t2 on t1.Value = t2.Value
69+
order by t1.Value
70+
)";
71+
5272

5373
} // anonymous namespace
5474

@@ -79,31 +99,15 @@ Y_UNIT_TEST_TWIN(SpillingInRuntimeNodes, EnabledSpilling) {
7999

80100
auto db = kikimr.GetQueryClient();
81101

82-
for (ui32 i = 0; i < 300; ++i) {
83-
auto result = db.ExecuteQuery(Sprintf(R"(
84-
--!syntax_v1
85-
REPLACE INTO `/Root/KeyValue` (Key, Value) VALUES (%d, "%s")
86-
)", i, TString(200000 + i, 'a' + (i % 26)).c_str()), NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
87-
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
88-
}
89-
90-
auto query = R"(
91-
--!syntax_v1
92-
PRAGMA ydb.EnableSpillingNodes="GraceJoin";
93-
PRAGMA ydb.CostBasedOptimizationLevel='0';
94-
PRAGMA ydb.HashJoinMode='graceandself';
95-
select t1.Key, t1.Value, t2.Key, t2.Value
96-
from `/Root/KeyValue` as t1 full join `/Root/KeyValue` as t2 on t1.Value = t2.Value
97-
order by t1.Value
98-
)";
102+
FillTableWithData(db);
99103

100104
auto explainMode = NYdb::NQuery::TExecuteQuerySettings().ExecMode(NYdb::NQuery::EExecMode::Explain);
101-
auto planres = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), explainMode).ExtractValueSync();
105+
auto planres = db.ExecuteQuery(SimpleGraceJoinWithSpillingQuery, NYdb::NQuery::TTxControl::NoTx(), explainMode).ExtractValueSync();
102106
UNIT_ASSERT_VALUES_EQUAL_C(planres.GetStatus(), EStatus::SUCCESS, planres.GetIssues().ToString());
103107

104108
Cerr << planres.GetStats()->GetAst() << Endl;
105109

106-
auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), NYdb::NQuery::TExecuteQuerySettings()).ExtractValueSync();
110+
auto result = db.ExecuteQuery(SimpleGraceJoinWithSpillingQuery, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), NYdb::NQuery::TExecuteQuerySettings()).ExtractValueSync();
107111
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
108112

109113
TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
@@ -116,6 +120,24 @@ Y_UNIT_TEST_TWIN(SpillingInRuntimeNodes, EnabledSpilling) {
116120
}
117121
}
118122

123+
Y_UNIT_TEST(HandleErrorsCorrectly) {
124+
Cerr << "cwd: " << NFs::CurrentWorkingDirectory() << Endl;
125+
TKikimrRunner kikimr(AppCfgLowComputeLimits(0.01, false));
126+
127+
auto db = kikimr.GetQueryClient();
128+
129+
FillTableWithData(db);
130+
131+
auto explainMode = NYdb::NQuery::TExecuteQuerySettings().ExecMode(NYdb::NQuery::EExecMode::Explain);
132+
auto planres = db.ExecuteQuery(SimpleGraceJoinWithSpillingQuery, NYdb::NQuery::TTxControl::NoTx(), explainMode).ExtractValueSync();
133+
UNIT_ASSERT_VALUES_EQUAL_C(planres.GetStatus(), EStatus::SUCCESS, planres.GetIssues().ToString());
134+
135+
Cerr << planres.GetStats()->GetAst() << Endl;
136+
137+
auto result = db.ExecuteQuery(SimpleGraceJoinWithSpillingQuery, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), NYdb::NQuery::TExecuteQuerySettings()).ExtractValueSync();
138+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::INTERNAL_ERROR, result.GetIssues().ToString());
139+
}
140+
119141
Y_UNIT_TEST(SelfJoinQueryService) {
120142
Cerr << "cwd: " << NFs::CurrentWorkingDirectory() << Endl;
121143

ydb/core/tx/datashard/datashard_kqp.cpp

+5-1
Original file line numberDiff line numberDiff line change
@@ -1012,7 +1012,11 @@ class TKqpTaskRunnerExecutionContext: public NDq::IDqTaskRunnerExecutionContext
10121012
return {};
10131013
}
10141014

1015-
std::function<void()> GetWakeupCallback() const override {
1015+
NDq::TWakeUpCallback GetWakeupCallback() const override {
1016+
return {};
1017+
}
1018+
1019+
NDq::TErrorCallback GetErrorCallback() const override {
10161020
return {};
10171021
}
10181022

ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp

+3-2
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,9 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
127127

128128
Become(&TDqAsyncComputeActor::StateFuncWrapper<&TDqAsyncComputeActor::StateFuncBody>);
129129

130-
auto wakeup = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); };
131-
std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::make_shared<TDqTaskRunnerExecutionContext>(TxId, std::move(wakeup));
130+
auto wakeupCallback = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); };
131+
auto errorCallback = [this](const TString& error){ SendError(error); };
132+
std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::make_shared<TDqTaskRunnerExecutionContext>(TxId, std::move(wakeupCallback), std::move(errorCallback));
132133

133134
Send(TaskRunnerActorId,
134135
new NTaskRunnerActor::TEvTaskRunnerCreate(

ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp

+3-2
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,9 @@ class TDqComputeActor : public TDqSyncComputeActorBase<TDqComputeActor> {
5858

5959
auto taskRunner = TaskRunnerFactory(GetAllocatorPtr(), Task, RuntimeSettings.StatsMode, logger);
6060
SetTaskRunner(taskRunner);
61-
auto wakeup = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); };
62-
TDqTaskRunnerExecutionContext execCtx(TxId, std::move(wakeup));
61+
auto wakeupCallback = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); };
62+
auto errorCallback = [this](const TString& error){ SendError(error); };
63+
TDqTaskRunnerExecutionContext execCtx(TxId, std::move(wakeupCallback), std::move(errorCallback));
6364
PrepareTaskRunner(execCtx);
6465

6566
ContinueExecute(EResumeSource::CABootstrap);

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

+4
Original file line numberDiff line numberDiff line change
@@ -629,6 +629,10 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
629629
}
630630
}
631631

632+
void SendError(const TString& error) {
633+
this->Send(this->SelfId(), TEvDq::TEvAbortExecution::InternalError(error));
634+
}
635+
632636
protected: //TDqComputeActorChannels::ICallbacks
633637
//i64 GetInputChannelFreeSpace(ui64 channelId) is pure and must be overridded in derived class
634638

ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
213213
TaskRunner->Prepare(this->Task, limits, execCtx);
214214

215215
if (this->Task.GetEnableSpilling()) {
216-
TaskRunner->SetSpillerFactory(std::make_shared<TDqSpillerFactory>(execCtx.GetTxId(), NActors::TActivationContext::ActorSystem(), execCtx.GetWakeupCallback()));
216+
TaskRunner->SetSpillerFactory(std::make_shared<TDqSpillerFactory>(execCtx.GetTxId(), NActors::TActivationContext::ActorSystem(), execCtx.GetWakeupCallback(), execCtx.GetErrorCallback()));
217217
}
218218

219219
for (auto& [channelId, channel] : this->InputChannelsMap) {

ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp

+10-5
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@
66
namespace NYql {
77
namespace NDq {
88

9-
TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, IDqChannelStorage::TWakeUpCallback&& wakeUp)
9+
TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback)
1010
: TxId_(txId)
11-
, WakeUp_(std::move(wakeUp))
11+
, WakeUpCallback_(std::move(wakeUpCallback))
12+
, ErrorCallback_(std::move(errorCallback))
1213
{
1314
}
1415

@@ -18,14 +19,18 @@ IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64
1819

1920
IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, bool withSpilling, NActors::TActorSystem* actorSystem) const {
2021
if (withSpilling) {
21-
return CreateDqChannelStorage(TxId_, channelId, WakeUp_, actorSystem);
22+
return CreateDqChannelStorage(TxId_, channelId, WakeUpCallback_, ErrorCallback_, actorSystem);
2223
} else {
2324
return nullptr;
2425
}
2526
}
2627

27-
std::function<void()> TDqTaskRunnerExecutionContext::GetWakeupCallback() const {
28-
return WakeUp_;
28+
TWakeUpCallback TDqTaskRunnerExecutionContext::GetWakeupCallback() const {
29+
return WakeUpCallback_;
30+
}
31+
32+
TErrorCallback TDqTaskRunnerExecutionContext::GetErrorCallback() const {
33+
return ErrorCallback_;
2934
}
3035

3136
TTxId TDqTaskRunnerExecutionContext::GetTxId() const {

ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h

+5-3
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,19 @@ namespace NDq {
99

1010
class TDqTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContextBase {
1111
public:
12-
TDqTaskRunnerExecutionContext(TTxId txId, IDqChannelStorage::TWakeUpCallback&& wakeUp);
12+
TDqTaskRunnerExecutionContext(TTxId txId, TWakeUpCallback&& WakeUpCallback_, TErrorCallback&& ErrorCallback_);
1313

1414
IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling) const override;
1515
IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling, NActors::TActorSystem* actorSystem) const override;
1616

17-
std::function<void()> GetWakeupCallback() const override;
17+
TWakeUpCallback GetWakeupCallback() const override;
18+
TErrorCallback GetErrorCallback() const override;
1819
TTxId GetTxId() const override;
1920

2021
private:
2122
const TTxId TxId_;
22-
const IDqChannelStorage::TWakeUpCallback WakeUp_;
23+
const TWakeUpCallback WakeUpCallback_;
24+
const TErrorCallback ErrorCallback_;
2325
};
2426

2527
} // namespace NDq

ydb/library/yql/dq/actors/spilling/channel_storage.cpp

+7-4
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@ class TDqChannelStorage : public IDqChannelStorage {
3030
NThreading::TFuture<void> IsBlobWrittenFuture_;
3131
};
3232
public:
33-
TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp, TActorSystem* actorSystem)
33+
TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback, TActorSystem* actorSystem)
3434
: ActorSystem_(actorSystem)
3535
{
36-
ChannelStorageActor_ = CreateDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem);
36+
ChannelStorageActor_ = CreateDqChannelStorageActor(txId, channelId, std::move(wakeUpCallback), std::move(errorCallback), actorSystem);
3737
ChannelStorageActorId_ = ActorSystem_->Register(ChannelStorageActor_->GetActor());
3838
}
3939

@@ -119,9 +119,12 @@ class TDqChannelStorage : public IDqChannelStorage {
119119

120120
} // anonymous namespace
121121

122-
IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp, TActorSystem* actorSystem)
122+
IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId,
123+
TWakeUpCallback wakeUpCallback,
124+
TErrorCallback errorCallback,
125+
TActorSystem* actorSystem)
123126
{
124-
return new TDqChannelStorage(txId, channelId, std::move(wakeUp), actorSystem);
127+
return new TDqChannelStorage(txId, channelId, std::move(wakeUpCallback), std::move(errorCallback), actorSystem);
125128
}
126129

127130
} // namespace NYql::NDq

ydb/library/yql/dq/actors/spilling/channel_storage.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ namespace NActors {
1111
namespace NYql::NDq {
1212

1313
IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId,
14-
IDqChannelStorage::TWakeUpCallback wakeUpCb, NActors::TActorSystem* actorSystem);
14+
TWakeUpCallback wakeUpCallback,
15+
TErrorCallback errorCallback,
16+
NActors::TActorSystem* actorSystem);
1517

1618
} // namespace NYql::NDq

ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp

+16-11
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,11 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
4343
using TBase = TActorBootstrapped<TDqChannelStorageActor>;
4444
public:
4545

46-
TDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, TActorSystem* actorSystem)
46+
TDqChannelStorageActor(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback, TActorSystem* actorSystem)
4747
: TxId_(txId)
4848
, ChannelId_(channelId)
49-
, WakeUp_(std::move(wakeUp))
49+
, WakeUpCallback_(std::move(wakeUpCallback))
50+
, ErrorCallback_(std::move(errorCallback))
5051
, ActorSystem_(actorSystem)
5152
{}
5253

@@ -65,13 +66,12 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
6566

6667
protected:
6768
void FailWithError(const TString& error) {
69+
if (!ErrorCallback_) Y_ABORT("Error: %s", error.c_str());
70+
6871
LOG_E("Error: " << error);
72+
ErrorCallback_(error);
6973
SendInternal(SpillingActorId_, new TEvents::TEvPoison);
7074
PassAway();
71-
72-
// Currently there is no better way to handle the error.
73-
// Since the message was not sent from the actor system, there is no one to send the error message to.
74-
Y_ABORT("Error: %s", error.c_str());
7575
}
7676

7777
void SendInternal(const TActorId& recipient, IEventBase* ev, TEventFlags flags = IEventHandle::FlagTrackDelivery) {
@@ -130,7 +130,7 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
130130
it->second.SetValue();
131131
WritingBlobs_.erase(it);
132132

133-
WakeUp_();
133+
WakeUpCallback_();
134134
}
135135

136136
void HandleWork(TEvDqSpilling::TEvReadResult::TPtr& ev) {
@@ -146,7 +146,7 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
146146
it->second.SetValue(std::move(msg.Blob));
147147
LoadingBlobs_.erase(it);
148148

149-
WakeUp_();
149+
WakeUpCallback_();
150150
}
151151

152152
void HandleWork(TEvDqSpilling::TEvError::TPtr& ev) {
@@ -163,7 +163,8 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
163163
private:
164164
const TTxId TxId_;
165165
const ui64 ChannelId_;
166-
IDqChannelStorage::TWakeUpCallback WakeUp_;
166+
TWakeUpCallback WakeUpCallback_;
167+
TErrorCallback ErrorCallback_;
167168
TActorId SpillingActorId_;
168169

169170
// BlobId -> promise that blob is saved
@@ -177,8 +178,12 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
177178

178179
} // anonymous namespace
179180

180-
IDqChannelStorageActor* CreateDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, NActors::TActorSystem* actorSystem) {
181-
return new TDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem);
181+
IDqChannelStorageActor* CreateDqChannelStorageActor(TTxId txId, ui64 channelId,
182+
TWakeUpCallback&& wakeUpCallback,
183+
TErrorCallback&& errorCallback,
184+
NActors::TActorSystem* actorSystem)
185+
{
186+
return new TDqChannelStorageActor(txId, channelId, std::move(wakeUpCallback), std::move(errorCallback), actorSystem);
182187
}
183188

184189
} // namespace NYql::NDq

ydb/library/yql/dq/actors/spilling/channel_storage_actor.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,6 @@ class IDqChannelStorageActor
4949
virtual NActors::IActor* GetActor() = 0;
5050
};
5151

52-
IDqChannelStorageActor* CreateDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, NActors::TActorSystem* actorSystem);
52+
IDqChannelStorageActor* CreateDqChannelStorageActor(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback, NActors::TActorSystem* actorSystem);
5353

5454
} // namespace NYql::NDq

ydb/library/yql/dq/actors/spilling/compute_storage.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ namespace NYql::NDq {
66

77
using namespace NActors;
88

9-
TDqComputeStorage::TDqComputeStorage(TTxId txId, std::function<void()> wakeUpCallback, TActorSystem* actorSystem) : ActorSystem_(actorSystem) {
9+
TDqComputeStorage::TDqComputeStorage(TTxId txId, TWakeUpCallback wakeUpCallback, TErrorCallback errorCallback, TActorSystem* actorSystem) : ActorSystem_(actorSystem) {
1010
TStringStream spillerName;
1111
spillerName << "Spiller" << "_" << CreateGuidAsString();
12-
ComputeStorageActor_ = CreateDqComputeStorageActor(txId, spillerName.Str(), wakeUpCallback);
12+
ComputeStorageActor_ = CreateDqComputeStorageActor(txId, spillerName.Str(), wakeUpCallback, errorCallback);
1313
ComputeStorageActorId_ = ActorSystem_->Register(ComputeStorageActor_->GetActor());
1414
}
1515

ydb/library/yql/dq/actors/spilling/compute_storage.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ class TDqComputeStorage : public NKikimr::NMiniKQL::ISpiller
1717
{
1818
public:
1919

20-
TDqComputeStorage(TTxId txId, std::function<void()> wakeUpCallback, NActors::TActorSystem* actorSystem);
20+
TDqComputeStorage(TTxId txId, TWakeUpCallback wakeUpCallback, TErrorCallback errorCallback, NActors::TActorSystem* actorSystem);
2121

2222
~TDqComputeStorage();
2323

0 commit comments

Comments
 (0)