Skip to content

Commit da925c3

Browse files
better error handling in spilling actors (#6454)
1 parent d8c88f9 commit da925c3

File tree

2 files changed

+44
-43
lines changed

2 files changed

+44
-43
lines changed

ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp

+24-17
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,22 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
6363
return this;
6464
}
6565

66+
protected:
67+
void FailWithError(const TString& error) {
68+
LOG_E("Error: " << error);
69+
SendInternal(SpillingActorId_, new TEvents::TEvPoison);
70+
PassAway();
71+
72+
// Currently there is no better way to handle the error.
73+
// Since the message was not sent from the actor system, there is no one to send the error message to.
74+
Y_ABORT("Error: %s", error.c_str());
75+
}
76+
77+
void SendInternal(const TActorId& recipient, IEventBase* ev, TEventFlags flags = IEventHandle::FlagTrackDelivery) {
78+
bool isSent = Send(recipient, ev, flags);
79+
Y_ABORT_UNLESS(isSent, "Event was not sent");
80+
}
81+
6682
private:
6783

6884
STATEFN(WorkState) {
@@ -80,13 +96,15 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
8096
}
8197
}
8298

99+
100+
83101
void HandleWork(TEvDqChannelSpilling::TEvGet::TPtr& ev) {
84102
auto& msg = *ev->Get();
85103
LOG_T("[TEvGet] blobId: " << msg.BlobId_);
86104

87105
LoadingBlobs_.emplace(msg.BlobId_, std::move(msg.Promise_));
88106

89-
Send(SpillingActorId_, new TEvDqSpilling::TEvRead(msg.BlobId_));
107+
SendInternal(SpillingActorId_, new TEvDqSpilling::TEvRead(msg.BlobId_));
90108
}
91109

92110
void HandleWork(TEvDqChannelSpilling::TEvPut::TPtr& ev) {
@@ -95,7 +113,7 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
95113

96114
WritingBlobs_.emplace(msg.BlobId_, std::move(msg.Promise_));
97115

98-
Send(SpillingActorId_, new TEvDqSpilling::TEvWrite(msg.BlobId_, std::move(msg.Blob_)));
116+
SendInternal(SpillingActorId_, new TEvDqSpilling::TEvWrite(msg.BlobId_, std::move(msg.Blob_)));
99117
}
100118

101119
void HandleWork(TEvDqSpilling::TEvWriteResult::TPtr& ev) {
@@ -104,11 +122,7 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
104122

105123
const auto it = WritingBlobs_.find(msg.BlobId);
106124
if (it == WritingBlobs_.end()) {
107-
LOG_E("Got unexpected TEvWriteResult, blobId: " << msg.BlobId);
108-
109-
Error_ = "Internal error";
110-
111-
Send(SpillingActorId_, new TEvents::TEvPoison);
125+
FailWithError(TStringBuilder() << "[TEvWriteResult] Got unexpected TEvWriteResult, blobId: " << msg.BlobId);
112126
return;
113127
}
114128

@@ -125,11 +139,7 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
125139

126140
const auto it = LoadingBlobs_.find(msg.BlobId);
127141
if (it == LoadingBlobs_.end()) {
128-
LOG_E("Got unexpected TEvReadResult, blobId: " << msg.BlobId);
129-
130-
Error_ = "Internal error";
131-
132-
Send(SpillingActorId_, new TEvents::TEvPoison);
142+
FailWithError(TStringBuilder() << "[TEvReadResult] Got unexpected TEvReadResult, blobId: " << msg.BlobId);
133143
return;
134144
}
135145

@@ -141,13 +151,11 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
141151

142152
void HandleWork(TEvDqSpilling::TEvError::TPtr& ev) {
143153
auto& msg = *ev->Get();
144-
LOG_D("[TEvError] " << msg.Message);
145-
146-
Error_.ConstructInPlace(msg.Message);
154+
FailWithError(TStringBuilder() << "[TEvError] " << msg.Message);
147155
}
148156

149157
void PassAway() override {
150-
Send(SpillingActorId_, new TEvents::TEvPoison);
158+
SendInternal(SpillingActorId_, new TEvents::TEvPoison);
151159
TBase::PassAway();
152160
}
153161

@@ -163,7 +171,6 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
163171

164172
// BlobId -> promise with requested blob
165173
std::unordered_map<ui64, NThreading::TPromise<TBuffer>> LoadingBlobs_;
166-
TMaybe<TString> Error_;
167174

168175
TActorSystem* ActorSystem_;
169176
};

ydb/library/yql/dq/actors/spilling/compute_storage_actor.cpp

+20-26
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,21 @@ class TDqComputeStorageActor : public NActors::TActorBootstrapped<TDqComputeStor
6060
return this;
6161
}
6262

63-
6463
protected:
6564

66-
void FailOnError() {
67-
if (Error_) {
68-
LOG_E("Error: " << *Error_);
69-
Send(SpillingActorId_, new TEvents::TEvPoison);
70-
}
65+
void FailWithError(const TString& error) {
66+
LOG_E("Error: " << error);
67+
SendInternal(SpillingActorId_, new TEvents::TEvPoison);
68+
PassAway();
69+
70+
// Currently there is no better way to handle the error.
71+
// Since the message was not sent from the actor system, there is no one to send the error message to.
72+
Y_ABORT("Error: %s", error.c_str());
73+
}
74+
75+
void SendInternal(const TActorId& recipient, IEventBase* ev, TEventFlags flags = IEventHandle::FlagTrackDelivery) {
76+
bool isSent = Send(recipient, ev, flags);
77+
Y_ABORT_UNLESS(isSent, "Event was not sent");
7178
}
7279

7380
private:
@@ -88,15 +95,15 @@ class TDqComputeStorageActor : public NActors::TActorBootstrapped<TDqComputeStor
8895
}
8996

9097
void HandleWork(TEvents::TEvPoison::TPtr&) {
91-
Send(SpillingActorId_, new TEvents::TEvPoison);
98+
SendInternal(SpillingActorId_, new TEvents::TEvPoison);
9299
PassAway();
93100
}
94101

95102
void HandleWork(TEvPut::TPtr& ev) {
96103
auto& msg = *ev->Get();
97104
ui64 size = msg.Blob_.size();
98105

99-
Send(SpillingActorId_, new TEvDqSpilling::TEvWrite(NextBlobId, std::move(msg.Blob_)));
106+
SendInternal(SpillingActorId_, new TEvDqSpilling::TEvWrite(NextBlobId, std::move(msg.Blob_)));
100107

101108
WritingBlobs_.emplace(NextBlobId, std::make_pair(size, std::move(msg.Promise_)));
102109
WritingBlobsSize_ += size;
@@ -117,7 +124,7 @@ class TDqComputeStorageActor : public NActors::TActorBootstrapped<TDqComputeStor
117124
TLoadingBlobInfo loadingBlobInfo = std::make_pair(removeBlobAfterRead, std::move(msg.Promise_));
118125
LoadingBlobs_.emplace(msg.Key_, std::move(loadingBlobInfo));
119126

120-
Send(SpillingActorId_, new TEvDqSpilling::TEvRead(msg.Key_, removeBlobAfterRead));
127+
SendInternal(SpillingActorId_, new TEvDqSpilling::TEvRead(msg.Key_, removeBlobAfterRead));
121128
}
122129

123130
void HandleWork(TEvDelete::TPtr& ev) {
@@ -130,7 +137,7 @@ class TDqComputeStorageActor : public NActors::TActorBootstrapped<TDqComputeStor
130137

131138
DeletingBlobs_.emplace(msg.Key_, std::move(msg.Promise_));
132139

133-
Send(SpillingActorId_, new TEvDqSpilling::TEvRead(msg.Key_, true));
140+
SendInternal(SpillingActorId_, new TEvDqSpilling::TEvRead(msg.Key_, true));
134141
}
135142

136143
void HandleWork(TEvDqSpilling::TEvWriteResult::TPtr& ev) {
@@ -140,11 +147,7 @@ class TDqComputeStorageActor : public NActors::TActorBootstrapped<TDqComputeStor
140147

141148
auto it = WritingBlobs_.find(msg.BlobId);
142149
if (it == WritingBlobs_.end()) {
143-
LOG_E("Got unexpected TEvWriteResult, blobId: " << msg.BlobId);
144-
145-
Error_ = "Internal error";
146-
147-
Send(SpillingActorId_, new TEvents::TEvPoison);
150+
FailWithError(TStringBuilder() << "[TEvWriteResult] Got unexpected TEvWriteResult, blobId: " << msg.BlobId);
148151
return;
149152
}
150153

@@ -177,11 +180,7 @@ class TDqComputeStorageActor : public NActors::TActorBootstrapped<TDqComputeStor
177180

178181
auto it = LoadingBlobs_.find(msg.BlobId);
179182
if (it == LoadingBlobs_.end()) {
180-
LOG_E("Got unexpected TEvReadResult, blobId: " << msg.BlobId);
181-
182-
Error_ = "Internal error";
183-
184-
Send(SpillingActorId_, new TEvents::TEvPoison);
183+
FailWithError(TStringBuilder() << "[TEvReadResult] Got unexpected TEvReadResult, blobId: " << msg.BlobId);
185184
return;
186185
}
187186

@@ -202,9 +201,7 @@ class TDqComputeStorageActor : public NActors::TActorBootstrapped<TDqComputeStor
202201

203202
void HandleWork(TEvDqSpilling::TEvError::TPtr& ev) {
204203
auto& msg = *ev->Get();
205-
LOG_D("[TEvError] " << msg.Message);
206-
207-
Error_.ConstructInPlace(msg.Message);
204+
FailWithError(TStringBuilder() << "[TEvError] " << msg.Message);
208205
}
209206

210207
bool HandleDelete(TKey blobId, ui64 size) {
@@ -241,8 +238,6 @@ class TDqComputeStorageActor : public NActors::TActorBootstrapped<TDqComputeStor
241238

242239
TMap<TKey, TDeletingBlobInfo> DeletingBlobs_;
243240

244-
TMaybe<TString> Error_;
245-
246241
TKey NextBlobId = 0;
247242

248243
TString SpillerName_;
@@ -252,7 +247,6 @@ class TDqComputeStorageActor : public NActors::TActorBootstrapped<TDqComputeStor
252247
std::function<void()> WakeupCallback_;
253248

254249
TSet<TKey> StoredBlobs_;
255-
256250
};
257251

258252
} // anonymous namespace

0 commit comments

Comments
 (0)