Skip to content

Commit 4b28c1f

Browse files
Merge 5849452 into 2ba1427
2 parents 2ba1427 + 5849452 commit 4b28c1f

File tree

15 files changed

+84
-104
lines changed

15 files changed

+84
-104
lines changed

ydb/core/tx/columnshard/counters/scan.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,11 @@ class TScanCounters: public TCommonCountersOwner {
4545
Success /* "Success" */ = 0,
4646
ConveyorInternalError /* "ConveyorInternalError" */,
4747
ExternalAbort /* "ExternalAbort" */,
48-
IteratorInternalError /* "IteratorInternalError" */,
48+
IteratorInternalErrorScan /* "IteratorInternalErrorScan" */,
49+
IteratorInternalErrorResult /* "IteratorInternalErrorResult" */,
4950
Deadline /* "Deadline" */,
5051
UndeliveredEvent /* "UndeliveredEvent" */,
52+
CannotAddInFlight /* "CannotAddInFlight" */,
5153

5254
COUNT
5355
};

ydb/core/tx/columnshard/engines/reader/abstract/abstract.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class TScanIteratorBase {
2323
virtual void PrepareResults() {
2424

2525
}
26-
virtual bool ReadNextInterval() { return false; }
26+
virtual TConclusion<bool> ReadNextInterval() { return false; }
2727
virtual TString DebugString(const bool verbose = false) const {
2828
Y_UNUSED(verbose);
2929
return "NO_DATA";

ydb/core/tx/columnshard/engines/reader/abstract/read_context.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ class IDataReader {
108108
virtual void DoAbort() = 0;
109109
virtual bool DoIsFinished() const = 0;
110110
virtual std::vector<TPartialReadResult> DoExtractReadyResults(const int64_t maxRowsInBatch) = 0;
111-
virtual bool DoReadNextInterval() = 0;
111+
virtual TConclusion<bool> DoReadNextInterval() = 0;
112112
public:
113113
IDataReader(const std::shared_ptr<TReadContext>& context);
114114
virtual ~IDataReader() = default;
@@ -156,7 +156,7 @@ class IDataReader {
156156
sb << DoDebugString(verbose);
157157
return sb;
158158
}
159-
bool ReadNextInterval() {
159+
[[nodiscard]] TConclusion<bool> ReadNextInterval() {
160160
return DoReadNextInterval();
161161
}
162162
};

ydb/core/tx/columnshard/engines/reader/actor/actor.cpp

+26-35
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ void TColumnShardScan::PassAway() {
4545

4646
TColumnShardScan::TColumnShardScan(const TActorId& columnShardActorId, const TActorId& scanComputeActorId, const std::shared_ptr<IStoragesManager>& storagesManager,
4747
const TComputeShardingPolicy& computeShardingPolicy, ui32 scanId, ui64 txId, ui32 scanGen, ui64 requestCookie,
48-
ui64 tabletId, TDuration timeout, std::vector<TReadMetadataBase::TConstPtr>&& readMetadataList,
48+
ui64 tabletId, TDuration timeout, const TReadMetadataBase::TConstPtr& readMetadataRange,
4949
NKikimrDataEvents::EDataFormat dataFormat, const NColumnShard::TScanCounters& scanCountersPool)
5050
: StoragesManager(storagesManager)
5151
, ColumnShardActorId(columnShardActorId)
@@ -57,15 +57,14 @@ TColumnShardScan::TColumnShardScan(const TActorId& columnShardActorId, const TAc
5757
, RequestCookie(requestCookie)
5858
, DataFormat(dataFormat)
5959
, TabletId(tabletId)
60-
, ReadMetadataRanges(std::move(readMetadataList))
61-
, ReadMetadataIndex(0)
60+
, ReadMetadataRange(readMetadataRange)
6261
, Deadline(TInstant::Now() + (timeout ? timeout + SCAN_HARD_TIMEOUT_GAP : SCAN_HARD_TIMEOUT))
6362
, ScanCountersPool(scanCountersPool)
6463
, Stats(NTracing::TTraceClient::GetLocalClient("SHARD", ::ToString(TabletId)/*, "SCAN_TXID:" + ::ToString(TxId)*/))
6564
, ComputeShardingPolicy(computeShardingPolicy)
6665
{
67-
AFL_VERIFY(ReadMetadataRanges.size() == 1);
68-
KeyYqlSchema = ReadMetadataRanges[ReadMetadataIndex]->GetKeyYqlSchema();
66+
AFL_VERIFY(ReadMetadataRange);
67+
KeyYqlSchema = ReadMetadataRange->GetKeyYqlSchema();
6968
}
7069

7170
void TColumnShardScan::Bootstrap(const TActorContext& ctx) {
@@ -81,8 +80,8 @@ void TColumnShardScan::Bootstrap(const TActorContext& ctx) {
8180
ReadCoordinatorActorId = ctx.Register(new NBlobOperations::NRead::TReadCoordinatorActor(TabletId, SelfId()));
8281

8382
std::shared_ptr<TReadContext> context = std::make_shared<TReadContext>(StoragesManager, ScanCountersPool,
84-
ReadMetadataRanges[ReadMetadataIndex], SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy);
85-
ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context);
83+
ReadMetadataRange, SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy);
84+
ScanIterator = ReadMetadataRange->StartScan(context);
8685

8786
// propagate self actor id // TODO: FlagSubscribeOnSession ?
8887
Send(ScanComputeActorId, new NKqp::TEvKqpCompute::TEvScanInitActor(ScanId, ctx.SelfID, ScanGen, TabletId), IEventHandle::FlagTrackDelivery);
@@ -92,12 +91,6 @@ void TColumnShardScan::Bootstrap(const TActorContext& ctx) {
9291
ContinueProcessing();
9392
}
9493

95-
bool TColumnShardScan::ReadNextBlob() {
96-
while (ScanIterator->ReadNextInterval()) {
97-
}
98-
return true;
99-
}
100-
10194
void TColumnShardScan::HandleScan(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev) {
10295
--InFlightReads;
10396
auto g = Stats->MakeGuard("task_result");
@@ -263,14 +256,27 @@ void TColumnShardScan::ContinueProcessing() {
263256
while (ScanIterator && ProduceResults()) {
264257
}
265258

266-
// Switch to the next range if the current one is finished
267-
if (ScanIterator && ScanIterator->Finished() && ChunksLimiter.HasMore()) {
268-
NextReadMetadata();
269-
}
270-
271259
if (ScanIterator) {
272-
// Make read-ahead requests for the subsequent blobs
273-
ReadNextBlob();
260+
// Switch to the next range if the current one is finished
261+
if (ScanIterator->Finished() && ChunksLimiter.HasMore()) {
262+
auto g = Stats->MakeGuard("Finish");
263+
MakeResult();
264+
SendResult(false, true);
265+
ScanIterator.reset();
266+
Finish(NColumnShard::TScanCounters::EStatusFinish::Success);
267+
} else {
268+
while (true) {
269+
TConclusion<bool> hasMoreData = ScanIterator->ReadNextInterval();
270+
if (hasMoreData.IsFail()) {
271+
ACFL_ERROR("event", "ContinueProcessing")("error", hasMoreData.GetErrorMessage());
272+
ScanIterator.reset();
273+
SendScanError("iterator_error:" + hasMoreData.GetErrorMessage());
274+
return Finish(NColumnShard::TScanCounters::EStatusFinish::IteratorInternalErrorScan);
275+
} else if (!*hasMoreData) {
276+
break;
277+
}
278+
}
279+
}
274280
}
275281
AFL_VERIFY(!ScanIterator || !ChunksLimiter.HasMore() || InFlightReads || ScanCountersPool.InWaiting())("scan_actor_id", ScanActorId)("tx_id", TxId)("scan_id", ScanId)("gen", ScanGen)("tablet", TabletId)
276282
("debug", ScanIterator->DebugString());
@@ -286,21 +292,6 @@ void TColumnShardScan::MakeResult(size_t reserveRows /*= 0*/) {
286292
}
287293
}
288294

289-
void TColumnShardScan::NextReadMetadata() {
290-
auto g = Stats->MakeGuard("NextReadMetadata");
291-
if (++ReadMetadataIndex == ReadMetadataRanges.size()) {
292-
// Send empty batch with "finished" flag
293-
MakeResult();
294-
SendResult(false, true);
295-
ScanIterator.reset();
296-
return Finish(NColumnShard::TScanCounters::EStatusFinish::Success);
297-
}
298-
299-
auto context = std::make_shared<TReadContext>(StoragesManager, ScanCountersPool, ReadMetadataRanges[ReadMetadataIndex], SelfId(),
300-
ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy);
301-
ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context);
302-
}
303-
304295
void TColumnShardScan::AddRow(const TConstArrayRef<TCell>& row) {
305296
Result->Rows.emplace_back(TOwnedCellVec::Make(row));
306297
++Rows;

ydb/core/tx/columnshard/engines/reader/actor/actor.h

+2-7
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>, NArrow::IRo
3333
TColumnShardScan(const TActorId& columnShardActorId, const TActorId& scanComputeActorId,
3434
const std::shared_ptr<IStoragesManager>& storagesManager, const TComputeShardingPolicy& computeShardingPolicy,
3535
ui32 scanId, ui64 txId, ui32 scanGen, ui64 requestCookie,
36-
ui64 tabletId, TDuration timeout, std::vector<TReadMetadataBase::TConstPtr>&& readMetadataList,
36+
ui64 tabletId, TDuration timeout, const TReadMetadataBase::TConstPtr& readMetadataRange,
3737
NKikimrDataEvents::EDataFormat dataFormat, const NColumnShard::TScanCounters& scanCountersPool);
3838

3939
void Bootstrap(const TActorContext& ctx);
@@ -55,8 +55,6 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>, NArrow::IRo
5555
}
5656
}
5757

58-
bool ReadNextBlob();
59-
6058
void HandleScan(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev);
6159

6260
void HandleScan(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr& ev);
@@ -75,8 +73,6 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>, NArrow::IRo
7573
private:
7674
void MakeResult(size_t reserveRows = 0);
7775

78-
void NextReadMetadata();
79-
8076
void AddRow(const TConstArrayRef<TCell>& row) override;
8177

8278
TOwnedCellVec ConvertLastKey(const std::shared_ptr<arrow::RecordBatch>& lastReadKey);
@@ -123,8 +119,7 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>, NArrow::IRo
123119
const NKikimrDataEvents::EDataFormat DataFormat;
124120
const ui64 TabletId;
125121

126-
std::vector<TReadMetadataBase::TConstPtr> ReadMetadataRanges;
127-
ui32 ReadMetadataIndex;
122+
TReadMetadataBase::TConstPtr ReadMetadataRange;
128123
std::unique_ptr<TScanIteratorBase> ScanIterator;
129124

130125
std::vector<std::pair<TString, NScheme::TTypeInfo>> KeyYqlSchema;

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ void TColumnShardScanIterator::PrepareResults() {
2424
FillReadyResults();
2525
}
2626

27-
bool TColumnShardScanIterator::ReadNextInterval() {
27+
TConclusion<bool> TColumnShardScanIterator::ReadNextInterval() {
2828
return IndexedData->ReadNextInterval();
2929
}
3030

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/iterator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ class TColumnShardScanIterator: public TScanIteratorBase {
8686
std::optional<TPartialReadResult> GetBatch() override;
8787
virtual void PrepareResults() override;
8888

89-
virtual bool ReadNextInterval() override;
89+
virtual TConclusion<bool> ReadNextInterval() override;
9090

9191
private:
9292
void FillReadyResults();

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ std::vector<TPartialReadResult> TPlainReadData::DoExtractReadyResults(const int6
6969
return result;
7070
}
7171

72-
bool TPlainReadData::DoReadNextInterval() {
72+
TConclusion<bool> TPlainReadData::DoReadNextInterval() {
7373
return Scanner->BuildNextInterval();
7474
}
7575

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class TPlainReadData: public IDataReader, TNonCopyable {
2828
}
2929

3030
virtual std::vector<TPartialReadResult> DoExtractReadyResults(const int64_t maxRowsInBatch) override;
31-
virtual bool DoReadNextInterval() override;
31+
virtual TConclusion<bool> DoReadNextInterval() override;
3232

3333
virtual void DoAbort() override {
3434
AbortedFlag = true;

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ TScanHead::TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, const s
7676
}
7777
}
7878

79-
bool TScanHead::BuildNextInterval() {
79+
TConclusion<bool> TScanHead::BuildNextInterval() {
8080
while (BorderPoints.size() && (FetchingIntervals.size() < InFlightLimit || BorderPoints.begin()->second.GetStartSources().empty())) {
8181
auto firstBorderPointInfo = std::move(BorderPoints.begin()->second);
8282
bool includeStart = firstBorderPointInfo.GetStartSources().size();

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class TScanHead {
6969

7070
TScanHead(std::deque<std::shared_ptr<IDataSource>>&& sources, const std::shared_ptr<TSpecialReadContext>& context);
7171

72-
bool BuildNextInterval();
72+
[[nodiscard]] TConclusion<bool> BuildNextInterval();
7373

7474
};
7575

ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp

+31-36
Original file line numberDiff line numberDiff line change
@@ -150,34 +150,31 @@ bool TTxScan::Execute(TTransactionContext& /*txc*/, const TActorContext& /*ctx*/
150150
if (!record.RangesSize()) {
151151
auto range = scannerConstructor->BuildReadMetadata(Self, read);
152152
if (range) {
153-
ReadMetadataRanges = {range.DetachResult()};
153+
ReadMetadataRange = range.DetachResult();
154154
} else {
155155
ErrorDescription = range.GetErrorMessage();
156156
}
157157
return true;
158158
}
159159

160-
ReadMetadataRanges.reserve(1);
161-
162160
auto ydbKey = scannerConstructor->GetPrimaryKeyScheme(Self);
163161
auto* indexInfo = (vIndex && isIndex) ? &vIndex->GetSchema(snapshot)->GetIndexInfo() : nullptr;
164162
for (auto& range : record.GetRanges()) {
165163
if (!FillPredicatesFromRange(read, range, ydbKey, Self->TabletID(), indexInfo, ErrorDescription)) {
166-
ReadMetadataRanges.clear();
164+
ReadMetadataRange = nullptr;
167165
return true;
168166
}
169167
}
170168
{
171169
auto newRange = scannerConstructor->BuildReadMetadata(Self, read);
172170
if (!newRange) {
173171
ErrorDescription = newRange.GetErrorMessage();
174-
ReadMetadataRanges.clear();
172+
ReadMetadataRange = nullptr;
175173
return true;
176174
}
177-
ReadMetadataRanges.emplace_back(newRange.DetachResult());
175+
ReadMetadataRange = newRange.DetachResult();
178176
}
179-
Y_ABORT_UNLESS(ReadMetadataRanges.size() == 1);
180-
177+
AFL_VERIFY(ReadMetadataRange);
181178
return true;
182179
}
183180

@@ -198,6 +195,7 @@ struct TContainerPrinter {
198195
};
199196

200197
void TTxScan::Complete(const TActorContext& ctx) {
198+
201199
auto& request = Ev->Get()->Record;
202200
auto scanComputeActor = Ev->Sender;
203201
const auto& snapshot = request.GetSnapshot();
@@ -210,38 +208,43 @@ void TTxScan::Complete(const TActorContext& ctx) {
210208
if (scanGen > 1) {
211209
Self->IncCounter(NColumnShard::COUNTER_SCAN_RESTARTED);
212210
}
211+
const NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build()
212+
("tx_id", txId)("scan_id", scanId)("gen", scanGen)("table", table)("snapshot", snapshot)("tablet", Self->TabletID())("timeout", timeout);
213213

214-
TStringStream detailedInfo;
215-
if (IS_LOG_PRIORITY_ENABLED(NActors::NLog::PRI_TRACE, NKikimrServices::TX_COLUMNSHARD)) {
216-
detailedInfo << " read metadata: (" << TContainerPrinter(ReadMetadataRanges) << ")" << " req: " << request;
217-
}
218-
if (ReadMetadataRanges.empty()) {
219-
LOG_S_DEBUG("TTxScan failed "
220-
<< " txId: " << txId
221-
<< " scanId: " << scanId
222-
<< " gen: " << scanGen
223-
<< " table: " << table
224-
<< " snapshot: " << snapshot
225-
<< " timeout: " << timeout
226-
<< detailedInfo.Str()
227-
<< " at tablet " << Self->TabletID());
214+
if (!ReadMetadataRange) {
215+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxScan failed")("reason", "no metadata");
228216

229217
auto ev = MakeHolder<NKqp::TEvKqpCompute::TEvScanError>(scanGen, Self->TabletID());
230-
231218
ev->Record.SetStatus(Ydb::StatusIds::BAD_REQUEST);
232219
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_BAD_REQUEST, TStringBuilder()
233-
<< "Table " << table << " (shard " << Self->TabletID() << ") scan failed, reason: " << ErrorDescription ? ErrorDescription : "unknown error");
220+
<< "Table " << table << " (shard " << Self->TabletID() << ") scan failed, reason: " << ErrorDescription ? ErrorDescription : "no metadata ranges");
234221
NYql::IssueToMessage(issue, ev->Record.MutableIssues()->Add());
235222

236223
ctx.Send(scanComputeActor, ev.Release());
237224
return;
238225
}
226+
TStringBuilder detailedInfo;
227+
if (IS_LOG_PRIORITY_ENABLED(NActors::NLog::PRI_TRACE, NKikimrServices::TX_COLUMNSHARD)) {
228+
detailedInfo << " read metadata: (" << *ReadMetadataRange << ")" << " req: " << request;
229+
}
239230

240231
const TVersionedIndex* index = nullptr;
241232
if (Self->HasIndex()) {
242233
index = &Self->GetIndexAs<TColumnEngineForLogs>().GetVersionedIndex();
243234
}
244-
ui64 requestCookie = Self->InFlightReadsTracker.AddInFlightRequest(ReadMetadataRanges, index);
235+
const TConclusion<ui64> requestCookie = Self->InFlightReadsTracker.AddInFlightRequest(ReadMetadataRange, index);
236+
if (!requestCookie) {
237+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxScan failed")("reason", requestCookie.GetErrorMessage())("trace_details", detailedInfo);
238+
auto ev = MakeHolder<NKqp::TEvKqpCompute::TEvScanError>(scanGen, Self->TabletID());
239+
240+
ev->Record.SetStatus(Ydb::StatusIds::INTERNAL_ERROR);
241+
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, TStringBuilder()
242+
<< "Table " << table << " (shard " << Self->TabletID() << ") scan failed, reason: " << requestCookie.GetErrorMessage());
243+
NYql::IssueToMessage(issue, ev->Record.MutableIssues()->Add());
244+
Self->ScanCounters.OnScanDuration(NColumnShard::TScanCounters::EStatusFinish::CannotAddInFlight, TDuration::Zero());
245+
ctx.Send(scanComputeActor, ev.Release());
246+
return;
247+
}
245248
auto statsDelta = Self->InFlightReadsTracker.GetSelectStatsDelta();
246249

247250
Self->IncCounter(NColumnShard::COUNTER_READ_INDEX_PORTIONS, statsDelta.Portions);
@@ -253,17 +256,9 @@ void TTxScan::Complete(const TActorContext& ctx) {
253256
AFL_VERIFY(shardingPolicy.DeserializeFromProto(request.GetComputeShardingPolicy()));
254257

255258
auto scanActor = ctx.Register(new TColumnShardScan(Self->SelfId(), scanComputeActor, Self->GetStoragesManager(),
256-
shardingPolicy, scanId, txId, scanGen, requestCookie, Self->TabletID(), timeout, std::move(ReadMetadataRanges), dataFormat, Self->ScanCounters));
257-
258-
LOG_S_DEBUG("TTxScan starting " << scanActor
259-
<< " txId: " << txId
260-
<< " scanId: " << scanId
261-
<< " gen: " << scanGen
262-
<< " table: " << table
263-
<< " snapshot: " << snapshot
264-
<< " timeout: " << timeout
265-
<< detailedInfo.Str()
266-
<< " at tablet " << Self->TabletID());
259+
shardingPolicy, scanId, txId, scanGen, *requestCookie, Self->TabletID(), timeout, ReadMetadataRange, dataFormat, Self->ScanCounters));
260+
261+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxScan started")("actor_id", scanActor)("trace_detailed", detailedInfo);
267262
}
268263

269264
}

ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ class TTxScan: public NTabletFlatExecutor::TTransactionBase<NColumnShard::TColum
2222
private:
2323
TString ErrorDescription;
2424
TEvColumnShard::TEvScan::TPtr Ev;
25-
std::vector<TReadMetadataPtr> ReadMetadataRanges;
25+
TReadMetadataPtr ReadMetadataRange;
2626
};
2727

2828
}

0 commit comments

Comments
 (0)