Skip to content

Commit 009d3c9

Browse files
scan error processing on restore data (#7813)
1 parent 38a1881 commit 009d3c9

File tree

4 files changed

+27
-7
lines changed

4 files changed

+27
-7
lines changed

ydb/core/tx/columnshard/data_reader/actor.cpp

+4-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ void TActor::HandleExecute(NKqp::TEvKqpCompute::TEvScanInitActor::TPtr& ev) {
3535
}
3636

3737
void TActor::HandleExecute(NKqp::TEvKqpCompute::TEvScanError::TPtr& ev) {
38-
AFL_VERIFY(false)("error", NYql::IssuesFromMessageAsString(ev->Get()->Record.GetIssues()));
38+
SwitchStage(EStage::WaitData, EStage::Finished);
39+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "problem_on_restore_data")(
40+
"reason", NYql::IssuesFromMessageAsString(ev->Get()->Record.GetIssues()));
41+
RestoreTask->OnError(NYql::IssuesFromMessageAsString(ev->Get()->Record.GetIssues()));
3942
}
4043

4144
void TActor::Bootstrap(const TActorContext& /*ctx*/) {

ydb/core/tx/columnshard/data_reader/actor.h

+5
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ class IRestoreTask {
1212
YDB_READONLY_DEF(NActors::TActorId, TabletActorId);
1313
virtual TConclusionStatus DoOnDataChunk(const std::shared_ptr<arrow::Table>& data) = 0;
1414
virtual TConclusionStatus DoOnFinished() = 0;
15+
virtual void DoOnError(const TString& errorMessage) = 0;
1516
virtual std::unique_ptr<TEvColumnShard::TEvInternalScan> DoBuildRequestInitiator() const = 0;
1617

1718
public:
@@ -24,6 +25,10 @@ class IRestoreTask {
2425
return DoOnFinished();
2526
}
2627

28+
void OnError(const TString& errorMessage) {
29+
DoOnError(errorMessage);
30+
}
31+
2732
std::unique_ptr<TEvColumnShard::TEvInternalScan> BuildRequestInitiator() const {
2833
return DoBuildRequestInitiator();
2934
}

ydb/core/tx/columnshard/operations/batch_builder/restore.cpp

+15-6
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,19 @@ std::unique_ptr<NKikimr::TEvColumnShard::TEvInternalScan> TModificationRestoreTa
2525
NKikimr::TConclusionStatus TModificationRestoreTask::DoOnDataChunk(const std::shared_ptr<arrow::Table>& data) {
2626
auto result = Merger->AddExistsDataOrdered(data);
2727
if (result.IsFail()) {
28-
auto writeDataPtr = std::make_shared<NEvWrite::TWriteData>(std::move(WriteData));
29-
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "restore_data_problems")
28+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "merge_data_problems")
3029
("write_id", WriteData.GetWriteMeta().GetWriteId())("tablet_id", TabletId)("message", result.GetErrorMessage());
31-
TWritingBuffer buffer(writeDataPtr->GetBlobsAction(), { std::make_shared<TWriteAggregation>(*writeDataPtr) });
32-
auto evResult = NColumnShard::TEvPrivate::TEvWriteBlobsResult::Error(NKikimrProto::EReplyStatus::CORRUPTED,
33-
std::move(buffer), result.GetErrorMessage());
34-
TActorContext::AsActorContext().Send(ParentActorId, evResult.release());
30+
SendErrorMessage(result.GetErrorMessage());
3531
}
3632
return result;
3733
}
3834

35+
void TModificationRestoreTask::DoOnError(const TString& errorMessage) {
36+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "restore_data_problems")("write_id", WriteData.GetWriteMeta().GetWriteId())(
37+
"tablet_id", TabletId)("message", errorMessage);
38+
SendErrorMessage(errorMessage);
39+
}
40+
3941
NKikimr::TConclusionStatus TModificationRestoreTask::DoOnFinished() {
4042
{
4143
auto result = Merger->Finish();
@@ -65,4 +67,11 @@ TModificationRestoreTask::TModificationRestoreTask(const ui64 tabletId, const NA
6567

6668
}
6769

70+
void TModificationRestoreTask::SendErrorMessage(const TString& errorMessage) {
71+
auto writeDataPtr = std::make_shared<NEvWrite::TWriteData>(std::move(WriteData));
72+
TWritingBuffer buffer(writeDataPtr->GetBlobsAction(), { std::make_shared<TWriteAggregation>(*writeDataPtr) });
73+
auto evResult = NColumnShard::TEvPrivate::TEvWriteBlobsResult::Error(NKikimrProto::EReplyStatus::CORRUPTED, std::move(buffer), errorMessage);
74+
TActorContext::AsActorContext().Send(ParentActorId, evResult.release());
75+
}
76+
6877
}

ydb/core/tx/columnshard/operations/batch_builder/restore.h

+3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ class TModificationRestoreTask: public NDataReader::IRestoreTask {
2222

2323
virtual TConclusionStatus DoOnDataChunk(const std::shared_ptr<arrow::Table>& data) override;
2424
virtual TConclusionStatus DoOnFinished() override;
25+
virtual void DoOnError(const TString& errorMessage) override;
26+
void SendErrorMessage(const TString& errorMessage);
27+
2528
public:
2629
TModificationRestoreTask(const ui64 tabletId, const NActors::TActorId parentActorId,
2730
const NActors::TActorId bufferActorId, NEvWrite::TWriteData&& writeData, const std::shared_ptr<IMerger>& merger,

0 commit comments

Comments
 (0)