Skip to content

cursor validations #17223

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
265 changes: 134 additions & 131 deletions ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions ydb/core/protos/kqp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,8 @@ message TEvScanPing {
}

message TEvKqpScanCursor {
optional uint64 TabletId = 1;

message TColumnShardScanPlain {
}
message TColumnShardScanSimple {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/portions/meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class TPortionMetaBase {
class TPortionMeta: public TPortionMetaBase {
private:
using TBase = TPortionMetaBase;
NArrow::TFirstLastSpecialKeys ReplaceKeyEdges; // first and last PK rows
NArrow::TFirstLastSpecialKeys ReplaceKeyEdges;
YDB_READONLY_DEF(TString, TierName);
YDB_READONLY(ui32, DeletionsCount, 0);
YDB_READONLY(ui32, CompactionLevel, 0);
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tx/columnshard/engines/predicate/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ class ICursorEntity {

class IScanCursor {
private:
YDB_ACCESSOR_DEF(std::optional<ui64>, TabletId);

virtual const std::shared_ptr<arrow::RecordBatch>& DoGetPKCursor() const = 0;
virtual bool DoCheckEntityIsBorder(const ICursorEntity& entity, bool& usage) const = 0;
virtual bool DoCheckSourceIntervalUsage(const ui64 sourceId, const ui32 indexStart, const ui32 recordsCount) const = 0;
Expand All @@ -146,11 +148,17 @@ class IScanCursor {
}

TConclusionStatus DeserializeFromProto(const NKikimrKqp::TEvKqpScanCursor& proto) {
if (proto.HasTabletId()) {
TabletId = proto.GetTabletId();
}
return DoDeserializeFromProto(proto);
}

NKikimrKqp::TEvKqpScanCursor SerializeToProto() const {
NKikimrKqp::TEvKqpScanCursor result;
if (TabletId) {
result.SetTabletId(*TabletId);
}
DoSerializeToProto(result);
return result;
}
Expand Down
20 changes: 14 additions & 6 deletions ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#pragma once
#include <ydb/core/tx/columnshard/common/path_id.h>
#include <ydb/core/tx/columnshard/engines/column_engine.h>
#include <ydb/core/tx/columnshard/engines/insert_table/insert_table.h>
#include <ydb/core/tx/columnshard/engines/reader/common/description.h>
#include <ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h>
#include <ydb/core/tx/columnshard/common/path_id.h>

namespace NKikimr::NOlap {
class TPortionInfo;
Expand Down Expand Up @@ -46,6 +46,7 @@ class TReadMetadataBase {
TSnapshot RequestSnapshot;
std::optional<TGranuleShardingInfo> RequestShardingInfo;
std::shared_ptr<IScanCursor> ScanCursor;
const ui64 TabletId;
virtual void DoOnReadFinished(NColumnShard::TColumnShard& /*owner*/) const {
}
virtual void DoOnBeforeStartReading(NColumnShard::TColumnShard& /*owner*/) const {
Expand All @@ -61,7 +62,11 @@ class TReadMetadataBase {
public:
using TConstPtr = std::shared_ptr<const TReadMetadataBase>;

void SetRequestedLimit(const ui64 value) {
ui64 GetTabletId() const {
return TabletId;
}

void SetRequestedLimit(const ui64 value) {
AFL_VERIFY(!RequestedLimit);
if (value == 0 || value >= Max<i64>()) {
return;
Expand Down Expand Up @@ -172,20 +177,23 @@ class TReadMetadataBase {
}

TReadMetadataBase(const std::shared_ptr<TVersionedIndex> index, const ESorting sorting, const TProgramContainer& ssaProgram,
const std::shared_ptr<ISnapshotSchema>& schema, const TSnapshot& requestSnapshot, const std::shared_ptr<IScanCursor>& scanCursor)
const std::shared_ptr<ISnapshotSchema>& schema, const TSnapshot& requestSnapshot, const std::shared_ptr<IScanCursor>& scanCursor,
const ui64 tabletId)
: Sorting(sorting)
, Program(ssaProgram)
, IndexVersionsPointer(index)
, RequestSnapshot(requestSnapshot)
, ScanCursor(scanCursor)
, ResultIndexSchema(schema)
{
, TabletId(tabletId)
, ResultIndexSchema(schema) {
AFL_VERIFY(!ScanCursor || !ScanCursor->GetTabletId() || (*ScanCursor->GetTabletId() == TabletId))("cursor", ScanCursor->GetTabletId())(
"tablet_id", TabletId);
}
virtual ~TReadMetadataBase() = default;

virtual TString DebugString() const {
return TStringBuilder() << " predicate{" << (PKRangesFilter ? PKRangesFilter->DebugString() : "no_initialized") << "}"
<< " " << Sorting << " sorted";
<< " " << Sorting << " sorted";
}

std::set<ui32> GetProcessingColumnIds() const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ struct TReadDescription {
std::shared_ptr<IScanCursor> ScanCursor;
YDB_ACCESSOR_DEF(TString, ScanIdentifier);
YDB_ACCESSOR(ERequestSorting, Sorting, ERequestSorting::NONE);
YDB_READONLY(ui64, TabletId, 0);

public:
// Table
Expand All @@ -47,9 +48,10 @@ struct TReadDescription {
ScanCursor = cursor;
}

TReadDescription(const TSnapshot& snapshot, const ERequestSorting sorting)
TReadDescription(const ui64 tabletId, const TSnapshot& snapshot, const ERequestSorting sorting)
: Snapshot(snapshot)
, Sorting(sorting)
, TabletId(tabletId)
, PKRangesFilter(std::make_shared<NOlap::TPKRangesFilter>()) {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ TConclusionStatus TReadMetadata::Init(

TReadMetadata::TReadMetadata(const std::shared_ptr<TVersionedIndex>& schemaIndex, const TReadDescription& read)
: TBase(schemaIndex, read.GetSorting(), read.GetProgram(), schemaIndex->GetSchemaVerified(read.GetSnapshot()), read.GetSnapshot(),
read.GetScanCursorOptional())
read.GetScanCursorOptional(), read.GetTabletId())
, PathId(read.PathId)
, ReadStats(std::make_shared<TReadStats>()) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,14 @@ class ISourcesCollection {
return DoHasData();
}

std::shared_ptr<IScanCursor> BuildCursor(const std::shared_ptr<IDataSource>& source, const ui32 readyRecords) const {
std::shared_ptr<IScanCursor> BuildCursor(const std::shared_ptr<IDataSource>& source, const ui32 readyRecords, const ui64 tabletId) const {
AFL_VERIFY(source);
AFL_VERIFY(readyRecords <= source->GetRecordsCount())("count", source->GetRecordsCount())("ready", readyRecords);
return DoBuildCursor(source, readyRecords);
auto result = DoBuildCursor(source, readyRecords);
AFL_VERIFY(result);
result->SetTabletId(tabletId);
AFL_VERIFY(tabletId);
return result;
}

TString DebugString() const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ ISyncPoint::ESourceAction TSyncPointResult::OnSourceReady(const std::shared_ptr<
}
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "has_result")("source_id", source->GetSourceId())(
"source_idx", source->GetSourceIdx())("table", resultChunk->GetTable()->num_rows())("is_finished", isFinished);
auto cursor = Collection->BuildCursor(source, resultChunk->GetStartIndex() + resultChunk->GetRecordsCount());
auto cursor = Collection->BuildCursor(source, resultChunk->GetStartIndex() + resultChunk->GetRecordsCount(),
Context->GetCommonContext()->GetReadMetadata()->GetTabletId());
reader.OnIntervalResult(std::make_shared<TPartialReadResult>(source->GetResourceGuards(), source->GetGroupGuard(),
resultChunk->GetTable(), cursor, Context->GetCommonContext(), partialSourceAddress));
} else if (!isFinished) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@ struct TReadStatsMetadata: public TReadMetadataBase {
public:
using TConstPtr = std::shared_ptr<const TReadStatsMetadata>;

const ui64 TabletId;
std::vector<ui32> ReadColumnIds;
std::vector<ui32> ResultColumnIds;
std::deque<TGranuleMetaView> IndexGranules;

explicit TReadStatsMetadata(const std::shared_ptr<TVersionedIndex>& info, ui64 tabletId, const ESorting sorting,
const TProgramContainer& ssaProgram, const std::shared_ptr<ISnapshotSchema>& schema, const TSnapshot& requestSnapshot)
: TBase(info, sorting, ssaProgram, schema, requestSnapshot, nullptr)
, TabletId(tabletId) {
: TBase(info, sorting, ssaProgram, schema, requestSnapshot, nullptr, tabletId) {
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void TStatsIterator::AppendStats(
for (auto&& r : records) {
NArrow::Append<arrow::UInt64Type>(*builders[0], portion.GetPathId().GetRawValue());
NArrow::Append<arrow::StringType>(*builders[1], prodView);
NArrow::Append<arrow::UInt64Type>(*builders[2], ReadMetadata->TabletId);
NArrow::Append<arrow::UInt64Type>(*builders[2], ReadMetadata->GetTabletId());
NArrow::Append<arrow::UInt64Type>(*builders[3], r->GetMeta().GetRecordsCount());
NArrow::Append<arrow::UInt64Type>(*builders[4], r->GetMeta().GetRawBytes());
NArrow::Append<arrow::UInt64Type>(*builders[5], portion.GetPortionId());
Expand Down Expand Up @@ -94,7 +94,7 @@ void TStatsIterator::AppendStats(
for (auto&& r : indexes) {
NArrow::Append<arrow::UInt64Type>(*builders[0], portion.GetPathId().GetRawValue());
NArrow::Append<arrow::StringType>(*builders[1], prodView);
NArrow::Append<arrow::UInt64Type>(*builders[2], ReadMetadata->TabletId);
NArrow::Append<arrow::UInt64Type>(*builders[2], ReadMetadata->GetTabletId());
NArrow::Append<arrow::UInt64Type>(*builders[3], r->GetRecordsCount());
NArrow::Append<arrow::UInt64Type>(*builders[4], r->GetRawBytes());
NArrow::Append<arrow::UInt64Type>(*builders[5], portion.GetPortionId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace NKikimr::NOlap::NReader::NSysView::NGranules {

bool TStatsIterator::AppendStats(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, NAbstract::TGranuleMetaView& granule) const {
NArrow::Append<arrow::UInt64Type>(*builders[0], granule.GetPathId().GetRawValue());
NArrow::Append<arrow::UInt64Type>(*builders[1], ReadMetadata->TabletId);
NArrow::Append<arrow::UInt64Type>(*builders[1], ReadMetadata->GetTabletId());
NArrow::Append<arrow::UInt64Type>(*builders[2], granule.GetPortions().size());
NArrow::Append<arrow::StringType>(*builders[3], HostNameField);
NArrow::Append<arrow::UInt64Type>(*builders[4], NActors::TActivationContext::AsActorContext().SelfID.NodeId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace NKikimr::NOlap::NReader::NSysView::NOptimizer {
bool TStatsIterator::AppendStats(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, NAbstract::TGranuleMetaView& granule) const {
for (auto&& i : granule.GetOptimizerTasks()) {
NArrow::Append<arrow::UInt64Type>(*builders[0], granule.GetPathId().GetRawValue());
NArrow::Append<arrow::UInt64Type>(*builders[1], ReadMetadata->TabletId);
NArrow::Append<arrow::UInt64Type>(*builders[1], ReadMetadata->GetTabletId());
NArrow::Append<arrow::UInt64Type>(*builders[2], i.GetTaskId());
NArrow::Append<arrow::StringType>(*builders[3], HostNameField);
NArrow::Append<arrow::UInt64Type>(*builders[4], NActors::TActivationContext::AsActorContext().SelfID.NodeId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ void TStatsIterator::AppendStats(const std::vector<std::unique_ptr<arrow::ArrayB
NArrow::Append<arrow::UInt64Type>(*builders[0], portion.GetPathId().GetRawValue());
const std::string prod = ::ToString(portion.GetMeta().Produced);
NArrow::Append<arrow::StringType>(*builders[1], prod);
NArrow::Append<arrow::UInt64Type>(*builders[2], ReadMetadata->TabletId);
NArrow::Append<arrow::UInt64Type>(*builders[2], ReadMetadata->GetTabletId());
NArrow::Append<arrow::UInt64Type>(*builders[3], portion.GetRecordsCount());
NArrow::Append<arrow::UInt64Type>(*builders[4], portion.GetColumnRawBytes());
NArrow::Append<arrow::UInt64Type>(*builders[5], portion.GetIndexRawBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ void TTxInternalScan::Complete(const TActorContext& ctx) {

TScannerConstructorContext context(snapshot, 0, sorting);
{
TReadDescription read(snapshot, sorting);
TReadDescription read(Self->TabletID(), snapshot, sorting);
read.SetScanIdentifier(request.TaskIdentifier);
read.PathId = request.GetPathId();
read.LockId = LockId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void TTxScan::Complete(const TActorContext& ctx) {
{
LOG_S_DEBUG("TTxScan prepare txId: " << txId << " scanId: " << scanId << " at tablet " << Self->TabletID());

TReadDescription read(snapshot, sorting);
TReadDescription read(Self->TabletID(), snapshot, sorting);
read.TxId = txId;
if (request.HasLockTxId()) {
read.LockId = request.GetLockTxId();
Expand Down
Loading