Skip to content

Commit 60d02e2

Browse files
ivanmorozov333ivanmorozov333
and
ivanmorozov333
authored
cursor validations (#17223)
Co-authored-by: ivanmorozov333 <[email protected]>
1 parent b27f03f commit 60d02e2

File tree

16 files changed

+179
-153
lines changed

16 files changed

+179
-153
lines changed

ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp

Lines changed: 134 additions & 131 deletions
Large diffs are not rendered by default.

ydb/core/protos/kqp.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -681,6 +681,8 @@ message TEvScanPing {
681681
}
682682

683683
message TEvKqpScanCursor {
684+
optional uint64 TabletId = 1;
685+
684686
message TColumnShardScanPlain {
685687
}
686688
message TColumnShardScanSimple {

ydb/core/tx/columnshard/engines/portions/meta.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ class TPortionMetaBase {
6565
class TPortionMeta: public TPortionMetaBase {
6666
private:
6767
using TBase = TPortionMetaBase;
68-
NArrow::TFirstLastSpecialKeys ReplaceKeyEdges; // first and last PK rows
68+
NArrow::TFirstLastSpecialKeys ReplaceKeyEdges;
6969
YDB_READONLY_DEF(TString, TierName);
7070
YDB_READONLY(ui32, DeletionsCount, 0);
7171
YDB_READONLY(ui32, CompactionLevel, 0);

ydb/core/tx/columnshard/engines/predicate/filter.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,8 @@ class ICursorEntity {
120120

121121
class IScanCursor {
122122
private:
123+
YDB_ACCESSOR_DEF(std::optional<ui64>, TabletId);
124+
123125
virtual const std::shared_ptr<arrow::RecordBatch>& DoGetPKCursor() const = 0;
124126
virtual bool DoCheckEntityIsBorder(const ICursorEntity& entity, bool& usage) const = 0;
125127
virtual bool DoCheckSourceIntervalUsage(const ui64 sourceId, const ui32 indexStart, const ui32 recordsCount) const = 0;
@@ -146,11 +148,17 @@ class IScanCursor {
146148
}
147149

148150
TConclusionStatus DeserializeFromProto(const NKikimrKqp::TEvKqpScanCursor& proto) {
151+
if (proto.HasTabletId()) {
152+
TabletId = proto.GetTabletId();
153+
}
149154
return DoDeserializeFromProto(proto);
150155
}
151156

152157
NKikimrKqp::TEvKqpScanCursor SerializeToProto() const {
153158
NKikimrKqp::TEvKqpScanCursor result;
159+
if (TabletId) {
160+
result.SetTabletId(*TabletId);
161+
}
154162
DoSerializeToProto(result);
155163
return result;
156164
}

ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
#pragma once
2+
#include <ydb/core/tx/columnshard/common/path_id.h>
23
#include <ydb/core/tx/columnshard/engines/column_engine.h>
34
#include <ydb/core/tx/columnshard/engines/insert_table/insert_table.h>
45
#include <ydb/core/tx/columnshard/engines/reader/common/description.h>
56
#include <ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h>
6-
#include <ydb/core/tx/columnshard/common/path_id.h>
77

88
namespace NKikimr::NOlap {
99
class TPortionInfo;
@@ -46,6 +46,7 @@ class TReadMetadataBase {
4646
TSnapshot RequestSnapshot;
4747
std::optional<TGranuleShardingInfo> RequestShardingInfo;
4848
std::shared_ptr<IScanCursor> ScanCursor;
49+
const ui64 TabletId;
4950
virtual void DoOnReadFinished(NColumnShard::TColumnShard& /*owner*/) const {
5051
}
5152
virtual void DoOnBeforeStartReading(NColumnShard::TColumnShard& /*owner*/) const {
@@ -61,7 +62,11 @@ class TReadMetadataBase {
6162
public:
6263
using TConstPtr = std::shared_ptr<const TReadMetadataBase>;
6364

64-
void SetRequestedLimit(const ui64 value) {
65+
ui64 GetTabletId() const {
66+
return TabletId;
67+
}
68+
69+
void SetRequestedLimit(const ui64 value) {
6570
AFL_VERIFY(!RequestedLimit);
6671
if (value == 0 || value >= Max<i64>()) {
6772
return;
@@ -172,20 +177,23 @@ class TReadMetadataBase {
172177
}
173178

174179
TReadMetadataBase(const std::shared_ptr<TVersionedIndex> index, const ESorting sorting, const TProgramContainer& ssaProgram,
175-
const std::shared_ptr<ISnapshotSchema>& schema, const TSnapshot& requestSnapshot, const std::shared_ptr<IScanCursor>& scanCursor)
180+
const std::shared_ptr<ISnapshotSchema>& schema, const TSnapshot& requestSnapshot, const std::shared_ptr<IScanCursor>& scanCursor,
181+
const ui64 tabletId)
176182
: Sorting(sorting)
177183
, Program(ssaProgram)
178184
, IndexVersionsPointer(index)
179185
, RequestSnapshot(requestSnapshot)
180186
, ScanCursor(scanCursor)
181-
, ResultIndexSchema(schema)
182-
{
187+
, TabletId(tabletId)
188+
, ResultIndexSchema(schema) {
189+
AFL_VERIFY(!ScanCursor || !ScanCursor->GetTabletId() || (*ScanCursor->GetTabletId() == TabletId))("cursor", ScanCursor->GetTabletId())(
190+
"tablet_id", TabletId);
183191
}
184192
virtual ~TReadMetadataBase() = default;
185193

186194
virtual TString DebugString() const {
187195
return TStringBuilder() << " predicate{" << (PKRangesFilter ? PKRangesFilter->DebugString() : "no_initialized") << "}"
188-
<< " " << Sorting << " sorted";
196+
<< " " << Sorting << " sorted";
189197
}
190198

191199
std::set<ui32> GetProcessingColumnIds() const {

ydb/core/tx/columnshard/engines/reader/common/description.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ struct TReadDescription {
2121
std::shared_ptr<IScanCursor> ScanCursor;
2222
YDB_ACCESSOR_DEF(TString, ScanIdentifier);
2323
YDB_ACCESSOR(ERequestSorting, Sorting, ERequestSorting::NONE);
24+
YDB_READONLY(ui64, TabletId, 0);
2425

2526
public:
2627
// Table
@@ -47,9 +48,10 @@ struct TReadDescription {
4748
ScanCursor = cursor;
4849
}
4950

50-
TReadDescription(const TSnapshot& snapshot, const ERequestSorting sorting)
51+
TReadDescription(const ui64 tabletId, const TSnapshot& snapshot, const ERequestSorting sorting)
5152
: Snapshot(snapshot)
5253
, Sorting(sorting)
54+
, TabletId(tabletId)
5355
, PKRangesFilter(std::make_shared<NOlap::TPKRangesFilter>()) {
5456
}
5557

ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ TConclusionStatus TReadMetadata::Init(
4444

4545
TReadMetadata::TReadMetadata(const std::shared_ptr<TVersionedIndex>& schemaIndex, const TReadDescription& read)
4646
: TBase(schemaIndex, read.GetSorting(), read.GetProgram(), schemaIndex->GetSchemaVerified(read.GetSnapshot()), read.GetSnapshot(),
47-
read.GetScanCursorOptional())
47+
read.GetScanCursorOptional(), read.GetTabletId())
4848
, PathId(read.PathId)
4949
, ReadStats(std::make_shared<TReadStats>()) {
5050
}

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections/abstract.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,14 @@ class ISourcesCollection {
3232
return DoHasData();
3333
}
3434

35-
std::shared_ptr<IScanCursor> BuildCursor(const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const {
35+
std::shared_ptr<IScanCursor> BuildCursor(const std::shared_ptr<IDataSource>& source, const ui32 readyRecords, const ui64 tabletId) const {
3636
AFL_VERIFY(source);
3737
AFL_VERIFY(readyRecords <= source->GetRecordsCount())("count", source->GetRecordsCount())("ready", readyRecords);
38-
return DoBuildCursor(source, readyRecords);
38+
auto result = DoBuildCursor(source, readyRecords);
39+
AFL_VERIFY(result);
40+
result->SetTabletId(tabletId);
41+
AFL_VERIFY(tabletId);
42+
return result;
3943
}
4044

4145
TString DebugString() const {

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/result.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ ISyncPoint::ESourceAction TSyncPointResult::OnSourceReady(const std::shared_ptr<
1717
}
1818
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "has_result")("source_id", source->GetSourceId())(
1919
"source_idx", source->GetSourceIdx())("table", resultChunk->GetTable()->num_rows())("is_finished", isFinished);
20-
auto cursor = Collection->BuildCursor(source, resultChunk->GetStartIndex() + resultChunk->GetRecordsCount());
20+
auto cursor = Collection->BuildCursor(source, resultChunk->GetStartIndex() + resultChunk->GetRecordsCount(),
21+
Context->GetCommonContext()->GetReadMetadata()->GetTabletId());
2122
reader.OnIntervalResult(std::make_shared<TPartialReadResult>(source->GetResourceGuards(), source->GetGroupGuard(),
2223
resultChunk->GetTable(), cursor, Context->GetCommonContext(), partialSourceAddress));
2324
} else if (!isFinished) {

ydb/core/tx/columnshard/engines/reader/sys_view/abstract/metadata.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,13 @@ struct TReadStatsMetadata: public TReadMetadataBase {
1111
public:
1212
using TConstPtr = std::shared_ptr<const TReadStatsMetadata>;
1313

14-
const ui64 TabletId;
1514
std::vector<ui32> ReadColumnIds;
1615
std::vector<ui32> ResultColumnIds;
1716
std::deque<TGranuleMetaView> IndexGranules;
1817

1918
explicit TReadStatsMetadata(const std::shared_ptr<TVersionedIndex>& info, ui64 tabletId, const ESorting sorting,
2019
const TProgramContainer& ssaProgram, const std::shared_ptr<ISnapshotSchema>& schema, const TSnapshot& requestSnapshot)
21-
: TBase(info, sorting, ssaProgram, schema, requestSnapshot, nullptr)
22-
, TabletId(tabletId) {
20+
: TBase(info, sorting, ssaProgram, schema, requestSnapshot, nullptr, tabletId) {
2321
}
2422
};
2523

ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ void TStatsIterator::AppendStats(
3737
for (auto&& r : records) {
3838
NArrow::Append<arrow::UInt64Type>(*builders[0], portion.GetPathId().GetRawValue());
3939
NArrow::Append<arrow::StringType>(*builders[1], prodView);
40-
NArrow::Append<arrow::UInt64Type>(*builders[2], ReadMetadata->TabletId);
40+
NArrow::Append<arrow::UInt64Type>(*builders[2], ReadMetadata->GetTabletId());
4141
NArrow::Append<arrow::UInt64Type>(*builders[3], r->GetMeta().GetRecordsCount());
4242
NArrow::Append<arrow::UInt64Type>(*builders[4], r->GetMeta().GetRawBytes());
4343
NArrow::Append<arrow::UInt64Type>(*builders[5], portion.GetPortionId());
@@ -94,7 +94,7 @@ void TStatsIterator::AppendStats(
9494
for (auto&& r : indexes) {
9595
NArrow::Append<arrow::UInt64Type>(*builders[0], portion.GetPathId().GetRawValue());
9696
NArrow::Append<arrow::StringType>(*builders[1], prodView);
97-
NArrow::Append<arrow::UInt64Type>(*builders[2], ReadMetadata->TabletId);
97+
NArrow::Append<arrow::UInt64Type>(*builders[2], ReadMetadata->GetTabletId());
9898
NArrow::Append<arrow::UInt64Type>(*builders[3], r->GetRecordsCount());
9999
NArrow::Append<arrow::UInt64Type>(*builders[4], r->GetRawBytes());
100100
NArrow::Append<arrow::UInt64Type>(*builders[5], portion.GetPortionId());

ydb/core/tx/columnshard/engines/reader/sys_view/granules/granules.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ namespace NKikimr::NOlap::NReader::NSysView::NGranules {
1010

1111
bool TStatsIterator::AppendStats(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, NAbstract::TGranuleMetaView& granule) const {
1212
NArrow::Append<arrow::UInt64Type>(*builders[0], granule.GetPathId().GetRawValue());
13-
NArrow::Append<arrow::UInt64Type>(*builders[1], ReadMetadata->TabletId);
13+
NArrow::Append<arrow::UInt64Type>(*builders[1], ReadMetadata->GetTabletId());
1414
NArrow::Append<arrow::UInt64Type>(*builders[2], granule.GetPortions().size());
1515
NArrow::Append<arrow::StringType>(*builders[3], HostNameField);
1616
NArrow::Append<arrow::UInt64Type>(*builders[4], NActors::TActivationContext::AsActorContext().SelfID.NodeId());

ydb/core/tx/columnshard/engines/reader/sys_view/optimizer/optimizer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ namespace NKikimr::NOlap::NReader::NSysView::NOptimizer {
1111
bool TStatsIterator::AppendStats(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, NAbstract::TGranuleMetaView& granule) const {
1212
for (auto&& i : granule.GetOptimizerTasks()) {
1313
NArrow::Append<arrow::UInt64Type>(*builders[0], granule.GetPathId().GetRawValue());
14-
NArrow::Append<arrow::UInt64Type>(*builders[1], ReadMetadata->TabletId);
14+
NArrow::Append<arrow::UInt64Type>(*builders[1], ReadMetadata->GetTabletId());
1515
NArrow::Append<arrow::UInt64Type>(*builders[2], i.GetTaskId());
1616
NArrow::Append<arrow::StringType>(*builders[3], HostNameField);
1717
NArrow::Append<arrow::UInt64Type>(*builders[4], NActors::TActivationContext::AsActorContext().SelfID.NodeId());

ydb/core/tx/columnshard/engines/reader/sys_view/portions/portions.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ void TStatsIterator::AppendStats(const std::vector<std::unique_ptr<arrow::ArrayB
1010
NArrow::Append<arrow::UInt64Type>(*builders[0], portion.GetPathId().GetRawValue());
1111
const std::string prod = ::ToString(portion.GetMeta().Produced);
1212
NArrow::Append<arrow::StringType>(*builders[1], prod);
13-
NArrow::Append<arrow::UInt64Type>(*builders[2], ReadMetadata->TabletId);
13+
NArrow::Append<arrow::UInt64Type>(*builders[2], ReadMetadata->GetTabletId());
1414
NArrow::Append<arrow::UInt64Type>(*builders[3], portion.GetRecordsCount());
1515
NArrow::Append<arrow::UInt64Type>(*builders[4], portion.GetColumnRawBytes());
1616
NArrow::Append<arrow::UInt64Type>(*builders[5], portion.GetIndexRawBytes());

ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ void TTxInternalScan::Complete(const TActorContext& ctx) {
4444

4545
TScannerConstructorContext context(snapshot, 0, sorting);
4646
{
47-
TReadDescription read(snapshot, sorting);
47+
TReadDescription read(Self->TabletID(), snapshot, sorting);
4848
read.SetScanIdentifier(request.TaskIdentifier);
4949
read.PathId = request.GetPathId();
5050
read.LockId = LockId;

ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ void TTxScan::Complete(const TActorContext& ctx) {
6464
{
6565
LOG_S_DEBUG("TTxScan prepare txId: " << txId << " scanId: " << scanId << " at tablet " << Self->TabletID());
6666

67-
TReadDescription read(snapshot, sorting);
67+
TReadDescription read(Self->TabletID(), snapshot, sorting);
6868
read.TxId = txId;
6969
if (request.HasLockTxId()) {
7070
read.LockId = request.GetLockTxId();

0 commit comments

Comments
 (0)