Skip to content

conclusions for unready storages and scanner methods #3282

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/counters/scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@ class TScanCounters: public TCommonCountersOwner {
Success /* "Success" */ = 0,
ConveyorInternalError /* "ConveyorInternalError" */,
ExternalAbort /* "ExternalAbort" */,
IteratorInternalError /* "IteratorInternalError" */,
IteratorInternalErrorScan /* "IteratorInternalErrorScan" */,
IteratorInternalErrorResult /* "IteratorInternalErrorResult" */,
Deadline /* "Deadline" */,
UndeliveredEvent /* "UndeliveredEvent" */,
CannotAddInFlight /* "CannotAddInFlight" */,

COUNT
};
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/reader/abstract/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ class TScanIteratorBase {
return {};
}
virtual bool Finished() const = 0;
virtual std::optional<TPartialReadResult> GetBatch() = 0;
virtual TConclusion<std::optional<TPartialReadResult>> GetBatch() = 0;
virtual void PrepareResults() {

}
virtual bool ReadNextInterval() { return false; }
virtual TConclusion<bool> ReadNextInterval() { return false; }
virtual TString DebugString(const bool verbose = false) const {
Y_UNUSED(verbose);
return "NO_DATA";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class IDataReader {
virtual void DoAbort() = 0;
virtual bool DoIsFinished() const = 0;
virtual std::vector<TPartialReadResult> DoExtractReadyResults(const int64_t maxRowsInBatch) = 0;
virtual bool DoReadNextInterval() = 0;
virtual TConclusion<bool> DoReadNextInterval() = 0;
public:
IDataReader(const std::shared_ptr<TReadContext>& context);
virtual ~IDataReader() = default;
Expand Down Expand Up @@ -156,7 +156,7 @@ class IDataReader {
sb << DoDebugString(verbose);
return sb;
}
bool ReadNextInterval() {
[[nodiscard]] TConclusion<bool> ReadNextInterval() {
return DoReadNextInterval();
}
};
Expand Down
96 changes: 44 additions & 52 deletions ydb/core/tx/columnshard/engines/reader/actor/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ void TColumnShardScan::PassAway() {

TColumnShardScan::TColumnShardScan(const TActorId& columnShardActorId, const TActorId& scanComputeActorId, const std::shared_ptr<IStoragesManager>& storagesManager,
const TComputeShardingPolicy& computeShardingPolicy, ui32 scanId, ui64 txId, ui32 scanGen, ui64 requestCookie,
ui64 tabletId, TDuration timeout, std::vector<TReadMetadataBase::TConstPtr>&& readMetadataList,
ui64 tabletId, TDuration timeout, const TReadMetadataBase::TConstPtr& readMetadataRange,
NKikimrDataEvents::EDataFormat dataFormat, const NColumnShard::TScanCounters& scanCountersPool)
: StoragesManager(storagesManager)
, ColumnShardActorId(columnShardActorId)
Expand All @@ -57,15 +57,14 @@ TColumnShardScan::TColumnShardScan(const TActorId& columnShardActorId, const TAc
, RequestCookie(requestCookie)
, DataFormat(dataFormat)
, TabletId(tabletId)
, ReadMetadataRanges(std::move(readMetadataList))
, ReadMetadataIndex(0)
, ReadMetadataRange(readMetadataRange)
, Deadline(TInstant::Now() + (timeout ? timeout + SCAN_HARD_TIMEOUT_GAP : SCAN_HARD_TIMEOUT))
, ScanCountersPool(scanCountersPool)
, Stats(NTracing::TTraceClient::GetLocalClient("SHARD", ::ToString(TabletId)/*, "SCAN_TXID:" + ::ToString(TxId)*/))
, ComputeShardingPolicy(computeShardingPolicy)
{
AFL_VERIFY(ReadMetadataRanges.size() == 1);
KeyYqlSchema = ReadMetadataRanges[ReadMetadataIndex]->GetKeyYqlSchema();
AFL_VERIFY(ReadMetadataRange);
KeyYqlSchema = ReadMetadataRange->GetKeyYqlSchema();
}

void TColumnShardScan::Bootstrap(const TActorContext& ctx) {
Expand All @@ -81,8 +80,8 @@ void TColumnShardScan::Bootstrap(const TActorContext& ctx) {
ReadCoordinatorActorId = ctx.Register(new NBlobOperations::NRead::TReadCoordinatorActor(TabletId, SelfId()));

std::shared_ptr<TReadContext> context = std::make_shared<TReadContext>(StoragesManager, ScanCountersPool,
ReadMetadataRanges[ReadMetadataIndex], SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy);
ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context);
ReadMetadataRange, SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy);
ScanIterator = ReadMetadataRange->StartScan(context);

// propagate self actor id // TODO: FlagSubscribeOnSession ?
Send(ScanComputeActorId, new NKqp::TEvKqpCompute::TEvScanInitActor(ScanId, ctx.SelfID, ScanGen, TabletId), IEventHandle::FlagTrackDelivery);
Expand All @@ -92,12 +91,6 @@ void TColumnShardScan::Bootstrap(const TActorContext& ctx) {
ContinueProcessing();
}

bool TColumnShardScan::ReadNextBlob() {
while (ScanIterator->ReadNextInterval()) {
}
return true;
}

void TColumnShardScan::HandleScan(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev) {
--InFlightReads;
auto g = Stats->MakeGuard("task_result");
Expand Down Expand Up @@ -198,21 +191,24 @@ bool TColumnShardScan::ProduceResults() noexcept {
return false;
}

auto resultOpt = ScanIterator->GetBatch();
if (!resultOpt) {
ACFL_DEBUG("stage", "no data is ready yet")("iterator", ScanIterator->DebugString());
auto resultConclusion = ScanIterator->GetBatch();
if (resultConclusion.IsFail()) {
ACFL_ERROR("stage", "got error")("iterator", ScanIterator->DebugString())("message", resultConclusion.GetErrorMessage());
SendAbortExecution(resultConclusion.GetErrorMessage());

ScanIterator.reset();
Finish(NColumnShard::TScanCounters::EStatusFinish::IteratorInternalErrorResult);
return false;
}
auto& result = *resultOpt;
if (!result.ErrorString.empty()) {
ACFL_ERROR("stage", "got error")("iterator", ScanIterator->DebugString())("message", result.ErrorString);
SendAbortExecution(TString(result.ErrorString.data(), result.ErrorString.size()));

ScanIterator.reset();
Finish(NColumnShard::TScanCounters::EStatusFinish::IteratorInternalError);
std::optional<TPartialReadResult> resultOpt = resultConclusion.DetachResult();
if (!resultOpt) {
ACFL_DEBUG("stage", "no data is ready yet")("iterator", ScanIterator->DebugString());
return false;
}

auto& result = *resultOpt;

if (!result.GetRecordsCount()) {
ACFL_DEBUG("stage", "got empty batch")("iterator", ScanIterator->DebugString());
return true;
Expand Down Expand Up @@ -263,14 +259,27 @@ void TColumnShardScan::ContinueProcessing() {
while (ScanIterator && ProduceResults()) {
}

// Switch to the next range if the current one is finished
if (ScanIterator && ScanIterator->Finished() && ChunksLimiter.HasMore()) {
NextReadMetadata();
}

if (ScanIterator) {
// Make read-ahead requests for the subsequent blobs
ReadNextBlob();
// Switch to the next range if the current one is finished
if (ScanIterator->Finished() && ChunksLimiter.HasMore()) {
auto g = Stats->MakeGuard("Finish");
MakeResult();
SendResult(false, true);
ScanIterator.reset();
Finish(NColumnShard::TScanCounters::EStatusFinish::Success);
} else {
while (true) {
TConclusion<bool> hasMoreData = ScanIterator->ReadNextInterval();
if (hasMoreData.IsFail()) {
ACFL_ERROR("event", "ContinueProcessing")("error", hasMoreData.GetErrorMessage());
ScanIterator.reset();
SendScanError("iterator_error:" + hasMoreData.GetErrorMessage());
return Finish(NColumnShard::TScanCounters::EStatusFinish::IteratorInternalErrorScan);
} else if (!*hasMoreData) {
break;
}
}
}
}
AFL_VERIFY(!ScanIterator || !ChunksLimiter.HasMore() || InFlightReads || ScanCountersPool.InWaiting())("scan_actor_id", ScanActorId)("tx_id", TxId)("scan_id", ScanId)("gen", ScanGen)("tablet", TabletId)
("debug", ScanIterator->DebugString());
Expand All @@ -286,21 +295,6 @@ void TColumnShardScan::MakeResult(size_t reserveRows /*= 0*/) {
}
}

void TColumnShardScan::NextReadMetadata() {
auto g = Stats->MakeGuard("NextReadMetadata");
if (++ReadMetadataIndex == ReadMetadataRanges.size()) {
// Send empty batch with "finished" flag
MakeResult();
SendResult(false, true);
ScanIterator.reset();
return Finish(NColumnShard::TScanCounters::EStatusFinish::Success);
}

auto context = std::make_shared<TReadContext>(StoragesManager, ScanCountersPool, ReadMetadataRanges[ReadMetadataIndex], SelfId(),
ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy);
ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context);
}

void TColumnShardScan::AddRow(const TConstArrayRef<TCell>& row) {
Result->Rows.emplace_back(TOwnedCellVec::Make(row));
++Rows;
Expand Down Expand Up @@ -379,11 +373,10 @@ bool TColumnShardScan::SendResult(bool pageFault, bool lastBatch) {
return true;
}

void TColumnShardScan::SendScanError(TString reason /*= {}*/) {
void TColumnShardScan::SendScanError(const TString& reason) {
AFL_VERIFY(reason);
TString msg = TStringBuilder() << "Scan failed at tablet " << TabletId;
if (!reason.empty()) {
msg += ", reason: " + reason;
}
msg += ", reason: " + reason;

auto ev = MakeHolder<NKqp::TEvKqpCompute::TEvScanError>(ScanGen, TabletId);
ev->Record.SetStatus(Ydb::StatusIds::GENERIC_ERROR);
Expand All @@ -393,12 +386,11 @@ void TColumnShardScan::SendScanError(TString reason /*= {}*/) {
Send(ScanComputeActorId, ev.Release());
}

void TColumnShardScan::SendAbortExecution(TString reason /*= {}*/) {
void TColumnShardScan::SendAbortExecution(const TString& reason) {
AFL_VERIFY(reason);
auto status = NYql::NDqProto::StatusIds::PRECONDITION_FAILED;
TString msg = TStringBuilder() << "Scan failed at tablet " << TabletId;
if (!reason.empty()) {
msg += ", reason: " + reason;
}
msg += ", reason: " + reason;

Send(ScanComputeActorId, new NKqp::TEvKqp::TEvAbortExecution(status, msg));
}
Expand Down
13 changes: 4 additions & 9 deletions ydb/core/tx/columnshard/engines/reader/actor/actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>, NArrow::IRo
TColumnShardScan(const TActorId& columnShardActorId, const TActorId& scanComputeActorId,
const std::shared_ptr<IStoragesManager>& storagesManager, const TComputeShardingPolicy& computeShardingPolicy,
ui32 scanId, ui64 txId, ui32 scanGen, ui64 requestCookie,
ui64 tabletId, TDuration timeout, std::vector<TReadMetadataBase::TConstPtr>&& readMetadataList,
ui64 tabletId, TDuration timeout, const TReadMetadataBase::TConstPtr& readMetadataRange,
NKikimrDataEvents::EDataFormat dataFormat, const NColumnShard::TScanCounters& scanCountersPool);

void Bootstrap(const TActorContext& ctx);
Expand All @@ -55,8 +55,6 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>, NArrow::IRo
}
}

bool ReadNextBlob();

void HandleScan(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev);

void HandleScan(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr& ev);
Expand All @@ -75,8 +73,6 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>, NArrow::IRo
private:
void MakeResult(size_t reserveRows = 0);

void NextReadMetadata();

void AddRow(const TConstArrayRef<TCell>& row) override;

TOwnedCellVec ConvertLastKey(const std::shared_ptr<arrow::RecordBatch>& lastReadKey);
Expand All @@ -101,9 +97,9 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>, NArrow::IRo

bool SendResult(bool pageFault, bool lastBatch);

void SendScanError(TString reason = {});
void SendScanError(const TString& reason);

void SendAbortExecution(TString reason = {});
void SendAbortExecution(const TString& reason);

void Finish(const NColumnShard::TScanCounters::EStatusFinish status);

Expand All @@ -123,8 +119,7 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>, NArrow::IRo
const NKikimrDataEvents::EDataFormat DataFormat;
const ui64 TabletId;

std::vector<TReadMetadataBase::TConstPtr> ReadMetadataRanges;
ui32 ReadMetadataIndex;
TReadMetadataBase::TConstPtr ReadMetadataRange;
std::unique_ptr<TScanIteratorBase> ScanIterator;

std::vector<std::pair<TString, NScheme::TTypeInfo>> KeyYqlSchema;
Expand Down
2 changes: 0 additions & 2 deletions ydb/core/tx/columnshard/engines/reader/common/result.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ class TPartialReadResult {
return LastReadKey;
}

std::string ErrorString;

explicit TPartialReadResult(
const std::vector<std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>>& resourcesGuards,
const NArrow::TShardedRecordBatch& batch, std::shared_ptr<arrow::RecordBatch> lastKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ TColumnShardScanIterator::TColumnShardScanIterator(const std::shared_ptr<TReadCo
}
}

std::optional<TPartialReadResult> TColumnShardScanIterator::GetBatch() {
TConclusion<std::optional<TPartialReadResult>> TColumnShardScanIterator::GetBatch() {
FillReadyResults();
return ReadyResults.pop_front();
}
Expand All @@ -24,7 +24,7 @@ void TColumnShardScanIterator::PrepareResults() {
FillReadyResults();
}

bool TColumnShardScanIterator::ReadNextInterval() {
TConclusion<bool> TColumnShardScanIterator::ReadNextInterval() {
return IndexedData->ReadNextInterval();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ class TColumnShardScanIterator: public TScanIteratorBase {
return IndexedData->IsFinished() && ReadyResults.empty();
}

std::optional<TPartialReadResult> GetBatch() override;
TConclusion<std::optional<TPartialReadResult>> GetBatch() override;
virtual void PrepareResults() override;

virtual bool ReadNextInterval() override;
virtual TConclusion<bool> ReadNextInterval() override;

private:
void FillReadyResults();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ std::vector<TPartialReadResult> TPlainReadData::DoExtractReadyResults(const int6
return result;
}

bool TPlainReadData::DoReadNextInterval() {
TConclusion<bool> TPlainReadData::DoReadNextInterval() {
return Scanner->BuildNextInterval();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class TPlainReadData: public IDataReader, TNonCopyable {
}

virtual std::vector<TPartialReadResult> DoExtractReadyResults(const int64_t maxRowsInBatch) override;
virtual bool DoReadNextInterval() override;
virtual TConclusion<bool> DoReadNextInterval() override;

virtual void DoAbort() override {
AbortedFlag = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ TScanHead::TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, const s
}
}

bool TScanHead::BuildNextInterval() {
TConclusion<bool> TScanHead::BuildNextInterval() {
while (BorderPoints.size() && (FetchingIntervals.size() < InFlightLimit || BorderPoints.begin()->second.GetStartSources().empty())) {
auto firstBorderPointInfo = std::move(BorderPoints.begin()->second);
bool includeStart = firstBorderPointInfo.GetStartSources().size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class TScanHead {

TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, const std::shared_ptr<TSpecialReadContext>& context);

bool BuildNextInterval();
[[nodiscard]] TConclusion<bool> BuildNextInterval();

};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class TStatsIterator : public TScanIteratorBase {

std::deque<std::shared_ptr<TPortionInfo>> IndexPortions;

virtual std::optional<TPartialReadResult> GetBatch() override {
virtual TConclusion<std::optional<TPartialReadResult>> GetBatch() override {
// Take next raw batch
auto batch = FillStatsBatch();

Expand All @@ -99,13 +99,13 @@ class TStatsIterator : public TScanIteratorBase {

ApplyRangePredicates(batch);
if (!batch->num_rows()) {
return {};
return std::nullopt;
}
// Leave only requested columns
auto resultBatch = NArrow::ExtractColumns(batch, ResultSchema);
NArrow::TStatusValidator::Validate(ReadMetadata->GetProgram().ApplyProgram(resultBatch));
if (!resultBatch->num_rows()) {
return {};
return std::nullopt;
}
TPartialReadResult out(resultBatch, lastKey);

Expand Down
Loading