Skip to content

Commit 1041280

Browse files
correct long requests abortion (#14888)
1 parent ef68464 commit 1041280

File tree

4 files changed

+3
-8
lines changed

4 files changed

+3
-8
lines changed

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,10 +125,7 @@ TScanHead::TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, const s
125125
}
126126

127127
TConclusion<bool> TScanHead::BuildNextInterval() {
128-
if (Context->IsAborted()) {
129-
return false;
130-
}
131-
while (BorderPoints.size()) {
128+
while (BorderPoints.size() && !Context->IsAborted()) {
132129
if (BorderPoints.begin()->second.GetStartSources().size()) {
133130
if (FetchingIntervals.size() >= InFlightLimit) {
134131
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_next_interval")("reason", "too many intervals in flight")(

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ void IDataSource::InitFetchingPlan(const std::shared_ptr<TFetchingScript>& fetch
2323

2424
void IDataSource::RegisterInterval(TFetchingInterval& interval, const std::shared_ptr<IDataSource>& sourcePtr) {
2525
AFL_VERIFY(FetchingPlan);
26-
AFL_VERIFY(!GetContext()->IsAborted());
2726
if (!IsReadyFlag) {
2827
AFL_VERIFY(Intervals.emplace(interval.GetIntervalIdx(), &interval).second);
2928
}

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ TConclusion<bool> TScanHead::BuildNextInterval() {
126126
}
127127
bool changed = false;
128128
if (!Context->GetCommonContext()->GetReadMetadata()->HasLimit()) {
129-
while (SortedSources.size() && FetchingSources.size() < InFlightLimit) {
129+
while (SortedSources.size() && FetchingSources.size() < InFlightLimit && Context->IsActive()) {
130130
SortedSources.front()->StartProcessing(SortedSources.front());
131131
FetchingSources.emplace_back(SortedSources.front());
132132
AFL_VERIFY(FetchingSourcesByIdx.emplace(SortedSources.front()->GetSourceIdx(), SortedSources.front()).second);
@@ -139,7 +139,7 @@ TConclusion<bool> TScanHead::BuildNextInterval() {
139139
}
140140
ui32 inFlightCountLocal = GetInFlightIntervalsCount();
141141
AFL_VERIFY(IntervalsInFlightCount == inFlightCountLocal)("count_global", IntervalsInFlightCount)("count_local", inFlightCountLocal);
142-
while (SortedSources.size() && inFlightCountLocal < InFlightLimit) {
142+
while (SortedSources.size() && inFlightCountLocal < InFlightLimit && Context->IsActive()) {
143143
SortedSources.front()->StartProcessing(SortedSources.front());
144144
FetchingSources.emplace_back(SortedSources.front());
145145
AFL_VERIFY(FetchingSourcesByIdx.emplace(SortedSources.front()->GetSourceIdx(), SortedSources.front()).second);

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ void IDataSource::InitFetchingPlan(const std::shared_ptr<TFetchingScript>& fetch
2323
void IDataSource::StartProcessing(const std::shared_ptr<IDataSource>& sourcePtr) {
2424
AFL_VERIFY(!ProcessingStarted);
2525
AFL_VERIFY(FetchingPlan);
26-
AFL_VERIFY(!GetContext()->IsAborted());
2726
ProcessingStarted = true;
2827
SourceGroupGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard(
2928
GetContext()->GetProcessMemoryControlId(), GetContext()->GetCommonContext()->GetScanId());

0 commit comments

Comments
 (0)