Skip to content

Commit 3966efa

Browse files
conclusions for unready storages and scanner methods (#3282)
1 parent eae5f51 commit 3966efa

File tree

17 files changed

+110
-131
lines changed

17 files changed

+110
-131
lines changed

ydb/core/tx/columnshard/counters/scan.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,11 @@ class TScanCounters: public TCommonCountersOwner {
4545
Success /* "Success" */ = 0,
4646
ConveyorInternalError /* "ConveyorInternalError" */,
4747
ExternalAbort /* "ExternalAbort" */,
48-
IteratorInternalError /* "IteratorInternalError" */,
48+
IteratorInternalErrorScan /* "IteratorInternalErrorScan" */,
49+
IteratorInternalErrorResult /* "IteratorInternalErrorResult" */,
4950
Deadline /* "Deadline" */,
5051
UndeliveredEvent /* "UndeliveredEvent" */,
52+
CannotAddInFlight /* "CannotAddInFlight" */,
5153

5254
COUNT
5355
};

ydb/core/tx/columnshard/engines/reader/abstract/abstract.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ class TScanIteratorBase {
1919
return {};
2020
}
2121
virtual bool Finished() const = 0;
22-
virtual std::optional<TPartialReadResult> GetBatch() = 0;
22+
virtual TConclusion<std::optional<TPartialReadResult>> GetBatch() = 0;
2323
virtual void PrepareResults() {
2424

2525
}
26-
virtual bool ReadNextInterval() { return false; }
26+
virtual TConclusion<bool> ReadNextInterval() { return false; }
2727
virtual TString DebugString(const bool verbose = false) const {
2828
Y_UNUSED(verbose);
2929
return "NO_DATA";

ydb/core/tx/columnshard/engines/reader/abstract/read_context.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ class IDataReader {
108108
virtual void DoAbort() = 0;
109109
virtual bool DoIsFinished() const = 0;
110110
virtual std::vector<TPartialReadResult> DoExtractReadyResults(const int64_t maxRowsInBatch) = 0;
111-
virtual bool DoReadNextInterval() = 0;
111+
virtual TConclusion<bool> DoReadNextInterval() = 0;
112112
public:
113113
IDataReader(const std::shared_ptr<TReadContext>& context);
114114
virtual ~IDataReader() = default;
@@ -156,7 +156,7 @@ class IDataReader {
156156
sb << DoDebugString(verbose);
157157
return sb;
158158
}
159-
bool ReadNextInterval() {
159+
[[nodiscard]] TConclusion<bool> ReadNextInterval() {
160160
return DoReadNextInterval();
161161
}
162162
};

ydb/core/tx/columnshard/engines/reader/actor/actor.cpp

+44-52
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ void TColumnShardScan::PassAway() {
4545

4646
TColumnShardScan::TColumnShardScan(const TActorId& columnShardActorId, const TActorId& scanComputeActorId, const std::shared_ptr<IStoragesManager>& storagesManager,
4747
const TComputeShardingPolicy& computeShardingPolicy, ui32 scanId, ui64 txId, ui32 scanGen, ui64 requestCookie,
48-
ui64 tabletId, TDuration timeout, std::vector<TReadMetadataBase::TConstPtr>&& readMetadataList,
48+
ui64 tabletId, TDuration timeout, const TReadMetadataBase::TConstPtr& readMetadataRange,
4949
NKikimrDataEvents::EDataFormat dataFormat, const NColumnShard::TScanCounters& scanCountersPool)
5050
: StoragesManager(storagesManager)
5151
, ColumnShardActorId(columnShardActorId)
@@ -57,15 +57,14 @@ TColumnShardScan::TColumnShardScan(const TActorId& columnShardActorId, const TAc
5757
, RequestCookie(requestCookie)
5858
, DataFormat(dataFormat)
5959
, TabletId(tabletId)
60-
, ReadMetadataRanges(std::move(readMetadataList))
61-
, ReadMetadataIndex(0)
60+
, ReadMetadataRange(readMetadataRange)
6261
, Deadline(TInstant::Now() + (timeout ? timeout + SCAN_HARD_TIMEOUT_GAP : SCAN_HARD_TIMEOUT))
6362
, ScanCountersPool(scanCountersPool)
6463
, Stats(NTracing::TTraceClient::GetLocalClient("SHARD", ::ToString(TabletId)/*, "SCAN_TXID:" + ::ToString(TxId)*/))
6564
, ComputeShardingPolicy(computeShardingPolicy)
6665
{
67-
AFL_VERIFY(ReadMetadataRanges.size() == 1);
68-
KeyYqlSchema = ReadMetadataRanges[ReadMetadataIndex]->GetKeyYqlSchema();
66+
AFL_VERIFY(ReadMetadataRange);
67+
KeyYqlSchema = ReadMetadataRange->GetKeyYqlSchema();
6968
}
7069

7170
void TColumnShardScan::Bootstrap(const TActorContext& ctx) {
@@ -81,8 +80,8 @@ void TColumnShardScan::Bootstrap(const TActorContext& ctx) {
8180
ReadCoordinatorActorId = ctx.Register(new NBlobOperations::NRead::TReadCoordinatorActor(TabletId, SelfId()));
8281

8382
std::shared_ptr<TReadContext> context = std::make_shared<TReadContext>(StoragesManager, ScanCountersPool,
84-
ReadMetadataRanges[ReadMetadataIndex], SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy);
85-
ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context);
83+
ReadMetadataRange, SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy);
84+
ScanIterator = ReadMetadataRange->StartScan(context);
8685

8786
// propagate self actor id // TODO: FlagSubscribeOnSession ?
8887
Send(ScanComputeActorId, new NKqp::TEvKqpCompute::TEvScanInitActor(ScanId, ctx.SelfID, ScanGen, TabletId), IEventHandle::FlagTrackDelivery);
@@ -92,12 +91,6 @@ void TColumnShardScan::Bootstrap(const TActorContext& ctx) {
9291
ContinueProcessing();
9392
}
9493

95-
bool TColumnShardScan::ReadNextBlob() {
96-
while (ScanIterator->ReadNextInterval()) {
97-
}
98-
return true;
99-
}
100-
10194
void TColumnShardScan::HandleScan(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev) {
10295
--InFlightReads;
10396
auto g = Stats->MakeGuard("task_result");
@@ -198,21 +191,24 @@ bool TColumnShardScan::ProduceResults() noexcept {
198191
return false;
199192
}
200193

201-
auto resultOpt = ScanIterator->GetBatch();
202-
if (!resultOpt) {
203-
ACFL_DEBUG("stage", "no data is ready yet")("iterator", ScanIterator->DebugString());
194+
auto resultConclusion = ScanIterator->GetBatch();
195+
if (resultConclusion.IsFail()) {
196+
ACFL_ERROR("stage", "got error")("iterator", ScanIterator->DebugString())("message", resultConclusion.GetErrorMessage());
197+
SendAbortExecution(resultConclusion.GetErrorMessage());
198+
199+
ScanIterator.reset();
200+
Finish(NColumnShard::TScanCounters::EStatusFinish::IteratorInternalErrorResult);
204201
return false;
205202
}
206-
auto& result = *resultOpt;
207-
if (!result.ErrorString.empty()) {
208-
ACFL_ERROR("stage", "got error")("iterator", ScanIterator->DebugString())("message", result.ErrorString);
209-
SendAbortExecution(TString(result.ErrorString.data(), result.ErrorString.size()));
210203

211-
ScanIterator.reset();
212-
Finish(NColumnShard::TScanCounters::EStatusFinish::IteratorInternalError);
204+
std::optional<TPartialReadResult> resultOpt = resultConclusion.DetachResult();
205+
if (!resultOpt) {
206+
ACFL_DEBUG("stage", "no data is ready yet")("iterator", ScanIterator->DebugString());
213207
return false;
214208
}
215209

210+
auto& result = *resultOpt;
211+
216212
if (!result.GetRecordsCount()) {
217213
ACFL_DEBUG("stage", "got empty batch")("iterator", ScanIterator->DebugString());
218214
return true;
@@ -263,14 +259,27 @@ void TColumnShardScan::ContinueProcessing() {
263259
while (ScanIterator && ProduceResults()) {
264260
}
265261

266-
// Switch to the next range if the current one is finished
267-
if (ScanIterator && ScanIterator->Finished() && ChunksLimiter.HasMore()) {
268-
NextReadMetadata();
269-
}
270-
271262
if (ScanIterator) {
272-
// Make read-ahead requests for the subsequent blobs
273-
ReadNextBlob();
263+
// Switch to the next range if the current one is finished
264+
if (ScanIterator->Finished() && ChunksLimiter.HasMore()) {
265+
auto g = Stats->MakeGuard("Finish");
266+
MakeResult();
267+
SendResult(false, true);
268+
ScanIterator.reset();
269+
Finish(NColumnShard::TScanCounters::EStatusFinish::Success);
270+
} else {
271+
while (true) {
272+
TConclusion<bool> hasMoreData = ScanIterator->ReadNextInterval();
273+
if (hasMoreData.IsFail()) {
274+
ACFL_ERROR("event", "ContinueProcessing")("error", hasMoreData.GetErrorMessage());
275+
ScanIterator.reset();
276+
SendScanError("iterator_error:" + hasMoreData.GetErrorMessage());
277+
return Finish(NColumnShard::TScanCounters::EStatusFinish::IteratorInternalErrorScan);
278+
} else if (!*hasMoreData) {
279+
break;
280+
}
281+
}
282+
}
274283
}
275284
AFL_VERIFY(!ScanIterator || !ChunksLimiter.HasMore() || InFlightReads || ScanCountersPool.InWaiting())("scan_actor_id", ScanActorId)("tx_id", TxId)("scan_id", ScanId)("gen", ScanGen)("tablet", TabletId)
276285
("debug", ScanIterator->DebugString());
@@ -286,21 +295,6 @@ void TColumnShardScan::MakeResult(size_t reserveRows /*= 0*/) {
286295
}
287296
}
288297

289-
void TColumnShardScan::NextReadMetadata() {
290-
auto g = Stats->MakeGuard("NextReadMetadata");
291-
if (++ReadMetadataIndex == ReadMetadataRanges.size()) {
292-
// Send empty batch with "finished" flag
293-
MakeResult();
294-
SendResult(false, true);
295-
ScanIterator.reset();
296-
return Finish(NColumnShard::TScanCounters::EStatusFinish::Success);
297-
}
298-
299-
auto context = std::make_shared<TReadContext>(StoragesManager, ScanCountersPool, ReadMetadataRanges[ReadMetadataIndex], SelfId(),
300-
ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy);
301-
ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context);
302-
}
303-
304298
void TColumnShardScan::AddRow(const TConstArrayRef<TCell>& row) {
305299
Result->Rows.emplace_back(TOwnedCellVec::Make(row));
306300
++Rows;
@@ -379,11 +373,10 @@ bool TColumnShardScan::SendResult(bool pageFault, bool lastBatch) {
379373
return true;
380374
}
381375

382-
void TColumnShardScan::SendScanError(TString reason /*= {}*/) {
376+
void TColumnShardScan::SendScanError(const TString& reason) {
377+
AFL_VERIFY(reason);
383378
TString msg = TStringBuilder() << "Scan failed at tablet " << TabletId;
384-
if (!reason.empty()) {
385-
msg += ", reason: " + reason;
386-
}
379+
msg += ", reason: " + reason;
387380

388381
auto ev = MakeHolder<NKqp::TEvKqpCompute::TEvScanError>(ScanGen, TabletId);
389382
ev->Record.SetStatus(Ydb::StatusIds::GENERIC_ERROR);
@@ -393,12 +386,11 @@ void TColumnShardScan::SendScanError(TString reason /*= {}*/) {
393386
Send(ScanComputeActorId, ev.Release());
394387
}
395388

396-
void TColumnShardScan::SendAbortExecution(TString reason /*= {}*/) {
389+
void TColumnShardScan::SendAbortExecution(const TString& reason) {
390+
AFL_VERIFY(reason);
397391
auto status = NYql::NDqProto::StatusIds::PRECONDITION_FAILED;
398392
TString msg = TStringBuilder() << "Scan failed at tablet " << TabletId;
399-
if (!reason.empty()) {
400-
msg += ", reason: " + reason;
401-
}
393+
msg += ", reason: " + reason;
402394

403395
Send(ScanComputeActorId, new NKqp::TEvKqp::TEvAbortExecution(status, msg));
404396
}

ydb/core/tx/columnshard/engines/reader/actor/actor.h

+4-9
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>, NArrow::IRo
3333
TColumnShardScan(const TActorId& columnShardActorId, const TActorId& scanComputeActorId,
3434
const std::shared_ptr<IStoragesManager>& storagesManager, const TComputeShardingPolicy& computeShardingPolicy,
3535
ui32 scanId, ui64 txId, ui32 scanGen, ui64 requestCookie,
36-
ui64 tabletId, TDuration timeout, std::vector<TReadMetadataBase::TConstPtr>&& readMetadataList,
36+
ui64 tabletId, TDuration timeout, const TReadMetadataBase::TConstPtr& readMetadataRange,
3737
NKikimrDataEvents::EDataFormat dataFormat, const NColumnShard::TScanCounters& scanCountersPool);
3838

3939
void Bootstrap(const TActorContext& ctx);
@@ -55,8 +55,6 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>, NArrow::IRo
5555
}
5656
}
5757

58-
bool ReadNextBlob();
59-
6058
void HandleScan(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev);
6159

6260
void HandleScan(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr& ev);
@@ -75,8 +73,6 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>, NArrow::IRo
7573
private:
7674
void MakeResult(size_t reserveRows = 0);
7775

78-
void NextReadMetadata();
79-
8076
void AddRow(const TConstArrayRef<TCell>& row) override;
8177

8278
TOwnedCellVec ConvertLastKey(const std::shared_ptr<arrow::RecordBatch>& lastReadKey);
@@ -101,9 +97,9 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>, NArrow::IRo
10197

10298
bool SendResult(bool pageFault, bool lastBatch);
10399

104-
void SendScanError(TString reason = {});
100+
void SendScanError(const TString& reason);
105101

106-
void SendAbortExecution(TString reason = {});
102+
void SendAbortExecution(const TString& reason);
107103

108104
void Finish(const NColumnShard::TScanCounters::EStatusFinish status);
109105

@@ -123,8 +119,7 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>, NArrow::IRo
123119
const NKikimrDataEvents::EDataFormat DataFormat;
124120
const ui64 TabletId;
125121

126-
std::vector<TReadMetadataBase::TConstPtr> ReadMetadataRanges;
127-
ui32 ReadMetadataIndex;
122+
TReadMetadataBase::TConstPtr ReadMetadataRange;
128123
std::unique_ptr<TScanIteratorBase> ScanIterator;
129124

130125
std::vector<std::pair<TString, NScheme::TTypeInfo>> KeyYqlSchema;

ydb/core/tx/columnshard/engines/reader/common/result.h

-2
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,6 @@ class TPartialReadResult {
5454
return LastReadKey;
5555
}
5656

57-
std::string ErrorString;
58-
5957
explicit TPartialReadResult(
6058
const std::vector<std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>>& resourcesGuards,
6159
const NArrow::TShardedRecordBatch& batch, std::shared_ptr<arrow::RecordBatch> lastKey)

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ TColumnShardScanIterator::TColumnShardScanIterator(const std::shared_ptr<TReadCo
1515
}
1616
}
1717

18-
std::optional<TPartialReadResult> TColumnShardScanIterator::GetBatch() {
18+
TConclusion<std::optional<TPartialReadResult>> TColumnShardScanIterator::GetBatch() {
1919
FillReadyResults();
2020
return ReadyResults.pop_front();
2121
}
@@ -24,7 +24,7 @@ void TColumnShardScanIterator::PrepareResults() {
2424
FillReadyResults();
2525
}
2626

27-
bool TColumnShardScanIterator::ReadNextInterval() {
27+
TConclusion<bool> TColumnShardScanIterator::ReadNextInterval() {
2828
return IndexedData->ReadNextInterval();
2929
}
3030

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,10 @@ class TColumnShardScanIterator: public TScanIteratorBase {
8383
return IndexedData->IsFinished() && ReadyResults.empty();
8484
}
8585

86-
std::optional<TPartialReadResult> GetBatch() override;
86+
TConclusion<std::optional<TPartialReadResult>> GetBatch() override;
8787
virtual void PrepareResults() override;
8888

89-
virtual bool ReadNextInterval() override;
89+
virtual TConclusion<bool> ReadNextInterval() override;
9090

9191
private:
9292
void FillReadyResults();

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ std::vector<TPartialReadResult> TPlainReadData::DoExtractReadyResults(const int6
6969
return result;
7070
}
7171

72-
bool TPlainReadData::DoReadNextInterval() {
72+
TConclusion<bool> TPlainReadData::DoReadNextInterval() {
7373
return Scanner->BuildNextInterval();
7474
}
7575

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class TPlainReadData: public IDataReader, TNonCopyable {
2828
}
2929

3030
virtual std::vector<TPartialReadResult> DoExtractReadyResults(const int64_t maxRowsInBatch) override;
31-
virtual bool DoReadNextInterval() override;
31+
virtual TConclusion<bool> DoReadNextInterval() override;
3232

3333
virtual void DoAbort() override {
3434
AbortedFlag = true;

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ TScanHead::TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, const s
7676
}
7777
}
7878

79-
bool TScanHead::BuildNextInterval() {
79+
TConclusion<bool> TScanHead::BuildNextInterval() {
8080
while (BorderPoints.size() && (FetchingIntervals.size() < InFlightLimit || BorderPoints.begin()->second.GetStartSources().empty())) {
8181
auto firstBorderPointInfo = std::move(BorderPoints.begin()->second);
8282
bool includeStart = firstBorderPointInfo.GetStartSources().size();

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class TScanHead {
6969

7070
TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, const std::shared_ptr<TSpecialReadContext>& context);
7171

72-
bool BuildNextInterval();
72+
[[nodiscard]] TConclusion<bool> BuildNextInterval();
7373

7474
};
7575

ydb/core/tx/columnshard/engines/reader/sys_view/abstract/abstract.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ class TStatsIterator : public TScanIteratorBase {
8989

9090
std::deque<std::shared_ptr<TPortionInfo>> IndexPortions;
9191

92-
virtual std::optional<TPartialReadResult> GetBatch() override {
92+
virtual TConclusion<std::optional<TPartialReadResult>> GetBatch() override {
9393
// Take next raw batch
9494
auto batch = FillStatsBatch();
9595

@@ -99,13 +99,13 @@ class TStatsIterator : public TScanIteratorBase {
9999

100100
ApplyRangePredicates(batch);
101101
if (!batch->num_rows()) {
102-
return {};
102+
return std::nullopt;
103103
}
104104
// Leave only requested columns
105105
auto resultBatch = NArrow::ExtractColumns(batch, ResultSchema);
106106
NArrow::TStatusValidator::Validate(ReadMetadata->GetProgram().ApplyProgram(resultBatch));
107107
if (!resultBatch->num_rows()) {
108-
return {};
108+
return std::nullopt;
109109
}
110110
TPartialReadResult out(resultBatch, lastKey);
111111

0 commit comments

Comments
 (0)