Skip to content

reorganize fetching step construction #14867

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/counters/portion_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ void TPortionIndexStats::RemovePortion(const NOlap::TPortionInfo& portion) {

{
auto findClass = TotalStats.find(portionClass);
AFL_VERIFY(!findClass.IsEnd())("path_id", portion.GetPathId());
AFL_VERIFY(findClass != TotalStats.end())("path_id", portion.GetPathId());
findClass->second.RemovePortion(portion);
if (findClass->second.IsEmpty()) {
TotalStats.erase(findClass);
Expand All @@ -34,9 +34,9 @@ void TPortionIndexStats::RemovePortion(const NOlap::TPortionInfo& portion) {

{
auto findPathId = StatsByPathId.find(portion.GetPathId());
AFL_VERIFY(!findPathId.IsEnd())("path_id", portion.GetPathId());
AFL_VERIFY(findPathId != StatsByPathId.end())("path_id", portion.GetPathId());
auto findClass = findPathId->second.find(portionClass);
AFL_VERIFY(!findClass.IsEnd())("path_id", portion.GetPathId());
AFL_VERIFY(findClass != findPathId->second.end())("path_id", portion.GetPathId());
findClass->second.RemovePortion(portion);
if (findClass->second.IsEmpty()) {
findPathId->second.erase(findClass);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ struct TReadDescription {
// List of columns
std::vector<ui32> ColumnIds;

const std::shared_ptr<IScanCursor>& GetScanCursor() const {
AFL_VERIFY(ScanCursor);
const std::shared_ptr<IScanCursor>& GetScanCursorOptional() const {
return ScanCursor;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ TConclusionStatus TReadMetadata::Init(
return TConclusionStatus::Success();
}

// Builds read metadata directly from a read description.
// - Sorting is DESC when read.PKRangesFilter->IsReverse() is true, ASC otherwise.
// - The result schema is looked up in schemaIndex for the snapshot of the read.
// - The program, snapshot and scan cursor (GetScanCursorOptional()) are forwarded to the base as-is.
// PathId is copied from the description; ReadStats starts as a fresh TReadStats.
// NOTE(review): read.PKRangesFilter is dereferenced here without a check — confirm callers always set it.
TReadMetadata::TReadMetadata(const std::shared_ptr<TVersionedIndex>& schemaIndex, const TReadDescription& read)
: TBase(schemaIndex, read.PKRangesFilter->IsReverse() ? TReadMetadataBase::ESorting::DESC : TReadMetadataBase::ESorting::ASC,
read.GetProgram(), schemaIndex->GetSchemaVerified(read.GetSnapshot()), read.GetSnapshot(), read.GetScanCursorOptional())
, PathId(read.PathId)
, ReadStats(std::make_shared<TReadStats>()) {
}

std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const {
auto& indexInfo = ResultIndexSchema->GetIndexInfo();
const auto& ids = GetProgram().GetEarlyFilterColumns();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,7 @@ class TReadMetadata: public TReadMetadataBase {
NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE;
std::shared_ptr<TReadStats> ReadStats;

// Constructs read metadata for the table identified by pathId.
// The result schema handed to the base is info->GetSchemaVerified(snapshot);
// sorting, program, snapshot and scan cursor are forwarded to the base unchanged.
// ReadStats starts as a fresh TReadStats.
// NOTE(review): `info` is a shared_ptr taken by value, so every call copies the pointer — confirm this is intended.
TReadMetadata(const ui64 pathId, const std::shared_ptr<TVersionedIndex> info, const TSnapshot& snapshot, const ESorting sorting,
const TProgramContainer& ssaProgram, const std::shared_ptr<IScanCursor>& scanCursor)
: TBase(info, sorting, ssaProgram, info->GetSchemaVerified(snapshot), snapshot, scanCursor)
, PathId(pathId)
, ReadStats(std::make_shared<TReadStats>()) {
}
TReadMetadata(const std::shared_ptr<TVersionedIndex>& schemaIndex, const TReadDescription& read);

virtual std::vector<TNameTypeInfo> GetKeyYqlSchema() const override {
return GetResultSchema()->GetIndexInfo().GetPrimaryKeyColumns();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ class TColumnsSetIds {
}
return result;
}

// Compound form of operator+: replaces this set with `*this + external`
// and returns a reference to the updated object.
TColumnsSetIds& operator+=(const TColumnsSetIds& external) {
    *this = *this + external;
    return *this;
}

// Compound form of operator-: replaces this set with `*this - external`
// and returns a reference to the updated object.
TColumnsSetIds& operator-=(const TColumnsSetIds& external) {
    *this = *this - external;
    return *this;
}

// Returns true when ColumnIds holds no entries.
bool IsEmpty() const {
return ColumnIds.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,13 @@ class TAllocateMemoryStep: public IFetchingStep {
virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
virtual ui64 GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const override;
virtual TString DoDebugString() const override {
return TStringBuilder() << "stage=" << StageIndex << ";";
std::vector<TString> columns;
for (const auto& pack : Packs) {
for (const ui32 columnId : pack.GetColumns().GetColumnIds()) {
columns.emplace_back(TStringBuilder() << pack.GetMemType() << ':' << columnId);
}
}
return TStringBuilder() << "stage=" << StageIndex << ";column_ids=[" << JoinSeq(',', columns) << "];";
}

public:
Expand Down Expand Up @@ -126,7 +132,7 @@ class TOptionalAssemblerStep: public IFetchingStep {
class TColumnBlobsFetchingStep: public IFetchingStep {
private:
using TBase = IFetchingStep;
TColumnsSetIds Columns;
YDB_READONLY_DEF(TColumnsSetIds, Columns);

protected:
virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,24 @@ TConclusion<bool> TFetchingScriptCursor::Execute(const std::shared_ptr<IDataSour
}

// Renders the whole script as "{branch:<BranchName>;steps:[{<step>};...]}".
// Every step is included, each rendered via its own DebugString().
// Yields an empty string when nothing was rendered (i.e. the script has no steps).
TString TFetchingScript::DebugString() const {
    TStringBuilder stepsDump;
    for (const auto& step : Steps) {
        stepsDump << "{" << step->DebugString() << "};";
    }
    if (!stepsDump) {
        return "";
    }
    TStringBuilder result;
    result << "{branch:" << BranchName << ";steps:[" << stepsDump << "]}";
    return result;
}

TString TFetchingScript::ProfileDebugString() const {
TStringBuilder sb;
TStringBuilder sbBranch;
for (auto&& i : Steps) {
if (i->GetSumDuration() > TDuration::MilliSeconds(10)) {
sbBranch << "{" << i->DebugString() << "};";
sbBranch << "{" << i->DebugString(true) << "};";
}
}
if (!sbBranch) {
Expand All @@ -92,12 +105,9 @@ TString TFetchingScript::DebugString() const {
return sb;
}

// Constructor that performs no work of its own: the read context argument is
// unnamed (unused) and the body is empty.
TFetchingScript::TFetchingScript(const TSpecialReadContext& /*context*/) {
}

void TFetchingScript::Allocation(const std::set<ui32>& entityIds, const EStageFeaturesIndexes stage, const EMemType mType) {
void TFetchingScriptBuilder::AddAllocation(const std::set<ui32>& entityIds, const EStageFeaturesIndexes stage, const EMemType mType) {
if (Steps.size() == 0) {
AddStep<TAllocateMemoryStep>(entityIds, mType, stage);
AddStep(std::make_shared<TAllocateMemoryStep>(entityIds, mType, stage));
} else {
std::optional<ui32> addIndex;
for (i32 i = Steps.size() - 1; i >= 0; --i) {
Expand All @@ -123,49 +133,62 @@ void TFetchingScript::Allocation(const std::set<ui32>& entityIds, const EStageFe
}
}

TString IFetchingStep::DebugString() const {
TString IFetchingStep::DebugString(const bool stats) const {
TStringBuilder sb;
sb << "name=" << Name << ";duration=" << GetSumDuration() << ";"
<< "size=" << 1e-9 * GetSumSize() << ";details={" << DoDebugString() << "};";
sb << "name=" << Name;
if (stats) {
sb << ";duration=" << GetSumDuration() << ";"
<< "size=" << 1e-9 * GetSumSize();
}
sb << ";details={" << DoDebugString() << "};";
return sb;
}

bool TColumnsAccumulator::AddFetchingStep(TFetchingScript& script, const TColumnsSetIds& columns, const EStageFeaturesIndexes stage) {
auto actualColumns = GetNotFetchedAlready(columns);
FetchingReadyColumns = FetchingReadyColumns + (TColumnsSetIds)columns;
if (!actualColumns.IsEmpty()) {
script.Allocation(columns.GetColumnIds(), stage, EMemType::Blob);
script.AddStep(std::make_shared<TColumnBlobsFetchingStep>(actualColumns));
return true;
TFetchingScriptBuilder::TFetchingScriptBuilder(const TSpecialReadContext& context)
: TFetchingScriptBuilder(context.GetReadMetadata()->GetResultSchema(), context.GetMergeColumns()) {
}

void TFetchingScriptBuilder::AddFetchingStep(const TColumnsSetIds& columns, const EStageFeaturesIndexes stage) {
auto actualColumns = columns - AddedFetchingColumns;
AddedFetchingColumns += columns;
if (actualColumns.IsEmpty()) {
return;
}
if (Steps.size() && std::dynamic_pointer_cast<TColumnBlobsFetchingStep>(Steps.back())) {
TColumnsSetIds fetchingColumns = actualColumns + std::dynamic_pointer_cast<TColumnBlobsFetchingStep>(Steps.back())->GetColumns();
Steps.pop_back();
AddAllocation(actualColumns.GetColumnIds(), stage, EMemType::Blob);
AddStep(std::make_shared<TColumnBlobsFetchingStep>(fetchingColumns));
} else {
AddAllocation(actualColumns.GetColumnIds(), stage, EMemType::Blob);
AddStep(std::make_shared<TColumnBlobsFetchingStep>(actualColumns));
}
return false;
}

bool TColumnsAccumulator::AddAssembleStep(
TFetchingScript& script, const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage, const bool sequential) {
auto actualColumns = columns - AssemblerReadyColumns;
AssemblerReadyColumns = AssemblerReadyColumns + columns;
void TFetchingScriptBuilder::AddAssembleStep(
const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage, const bool sequential) {
auto actualColumns = columns - AddedAssembleColumns;
AddedAssembleColumns += columns;
if (actualColumns.IsEmpty()) {
return false;
return;
}
auto actualSet = std::make_shared<TColumnsSet>(actualColumns.GetColumnIds(), FullSchema);
if (sequential) {
const auto notSequentialColumnIds = GuaranteeNotOptional->Intersect(*actualSet);
if (notSequentialColumnIds.size()) {
script.Allocation(notSequentialColumnIds, stage, EMemType::Raw);
AddAllocation(notSequentialColumnIds, stage, EMemType::Raw);
std::shared_ptr<TColumnsSet> cross = actualSet->BuildSamePtr(notSequentialColumnIds);
script.AddStep<TAssemblerStep>(cross, purposeId);
AddStep(std::make_shared<TAssemblerStep>(cross, purposeId));
*actualSet = *actualSet - *cross;
}
if (!actualSet->IsEmpty()) {
script.Allocation(notSequentialColumnIds, stage, EMemType::RawSequential);
script.AddStep<TOptionalAssemblerStep>(actualSet, purposeId);
AddAllocation(notSequentialColumnIds, stage, EMemType::RawSequential);
AddStep(std::make_shared<TOptionalAssemblerStep>(actualSet, purposeId));
}
} else {
script.Allocation(actualColumns.GetColumnIds(), stage, EMemType::Raw);
script.AddStep<TAssemblerStep>(actualSet, purposeId);
AddAllocation(actualColumns.GetColumnIds(), stage, EMemType::Raw);
AddStep(std::make_shared<TAssemblerStep>(actualSet, purposeId));
}
return true;
}

TConclusion<bool> TProgramStepPrepare::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,20 +110,21 @@ class IFetchingStep: public TNonCopyable {
, Signals(TFetchingStepsSignalsCollection::GetSignals(name)) {
}

TString DebugString() const;
TString DebugString(const bool stats = false) const;
};

class TFetchingScript {
private:
YDB_ACCESSOR(TString, BranchName, "UNDEFINED");
YDB_READONLY_DEF(TString, BranchName);
std::vector<std::shared_ptr<IFetchingStep>> Steps;
TAtomic StartInstant;
TAtomic FinishInstant;

public:
TFetchingScript(const TSpecialReadContext& context);

void Allocation(const std::set<ui32>& entityIds, const EStageFeaturesIndexes stage, const EMemType mType);
// Creates a script from an already assembled list of steps.
//   branchName - label stored in BranchName.
//   steps      - step list; moved into the script, the caller's vector is left moved-from.
// StartInstant / FinishInstant are zeroed explicitly: they are TAtomic members declared
// without initializers (TAtomic is an integral type), so leaving them out of the
// initializer list would give them indeterminate values.
TFetchingScript(const TString& branchName, std::vector<std::shared_ptr<IFetchingStep>>&& steps)
    : BranchName(branchName)
    , Steps(std::move(steps))
    , StartInstant(0)
    , FinishInstant(0) {
}

void AddStepDataSize(const ui32 index, const ui64 size) {
GetStep(index)->AddDataSize(size);
Expand All @@ -139,32 +140,13 @@ class TFetchingScript {
}

TString DebugString() const;
TString ProfileDebugString() const;

// Returns the step stored at position `index`.
// The index is checked with AFL_VERIFY; the requested index and the current
// number of steps are attached to the check so that an out-of-range access
// can be diagnosed from the failure record alone.
const std::shared_ptr<IFetchingStep>& GetStep(const ui32 index) const {
    AFL_VERIFY(index < Steps.size())("index", index)("size", Steps.size());
    return Steps[index];
}

// Constructs a step of concrete type T from `args` and appends it to the end of Steps.
// Arguments are perfectly forwarded to T's constructor, so lvalues are passed by
// reference instead of being copied into the parameter pack first, and rvalues
// can be moved into the new step.
// Returns the created step with its concrete type.
template <class T, typename... Args>
std::shared_ptr<T> AddStep(Args&&... args) {
    auto result = std::make_shared<T>(std::forward<Args>(args)...);
    Steps.emplace_back(result);
    return result;
}

// Constructs a step of concrete type T from `args` and inserts it before position `index`.
// `index` may equal Steps.size(), which appends; anything larger fails the AFL_VERIFY
// (index and size are attached to the check).
// Arguments are perfectly forwarded to T's constructor, so lvalues are passed by
// reference instead of being copied into the parameter pack first, and rvalues
// can be moved into the new step.
// Returns the created step with its concrete type.
template <class T, typename... Args>
std::shared_ptr<T> InsertStep(const ui32 index, Args&&... args) {
    AFL_VERIFY(index <= Steps.size())("index", index)("size", Steps.size());
    auto result = std::make_shared<T>(std::forward<Args>(args)...);
    Steps.insert(Steps.begin() + index, result);
    return result;
}

// Appends an already constructed step to the end of Steps.
// A null pointer is rejected by AFL_VERIFY before anything is stored.
void AddStep(const std::shared_ptr<IFetchingStep>& step) {
    AFL_VERIFY(step);
    Steps.push_back(step);
}

bool IsFinished(const ui32 currentStepIdx) const {
AFL_VERIFY(currentStepIdx <= Steps.size());
return currentStepIdx == Steps.size();
Expand All @@ -189,9 +171,9 @@ class TFetchingScriptOwner: TNonCopyable {
return Script;
}

TString DebugString() const {
TString ProfileDebugString() const {
if (Script) {
return TStringBuilder() << Script->DebugString() << Endl;
return TStringBuilder() << Script->ProfileDebugString() << Endl;
} else {
return TStringBuilder() << "NO_SCRIPT" << Endl;
}
Expand Down Expand Up @@ -231,26 +213,50 @@ class TFetchingScriptOwner: TNonCopyable {
}
};

class TColumnsAccumulator {
class TFetchingScriptBuilder {
private:
TColumnsSetIds FetchingReadyColumns;
TColumnsSetIds AssemblerReadyColumns;
ISnapshotSchema::TPtr FullSchema;
std::shared_ptr<TColumnsSetIds> GuaranteeNotOptional;
ISnapshotSchema::TPtr FullSchema;

YDB_ACCESSOR(TString, BranchName, "UNDEFINED");
std::vector<std::shared_ptr<IFetchingStep>> Steps;
YDB_READONLY_DEF(TColumnsSetIds, AddedFetchingColumns);
YDB_READONLY_DEF(TColumnsSetIds, AddedAssembleColumns);

// Initializes the builder from its two inputs:
//   schema               - stored in FullSchema;
//   guaranteeNotOptional - stored in GuaranteeNotOptional.
// No steps are added here.
TFetchingScriptBuilder(const ISnapshotSchema::TPtr& schema, const std::shared_ptr<TColumnsSetIds>& guaranteeNotOptional)
: GuaranteeNotOptional(guaranteeNotOptional)
, FullSchema(schema) {
}

private:
void AddAllocation(const std::set<ui32>& entityIds, const EStageFeaturesIndexes stage, const EMemType mType);

// Constructs a step of concrete type T from `args` and inserts it before position `index`.
// `index` may equal Steps.size(), which appends; anything larger fails the AFL_VERIFY
// (index and size are attached to the check).
// Arguments are perfectly forwarded to T's constructor, so lvalues are passed by
// reference instead of being copied into the parameter pack first, and rvalues
// can be moved into the new step.
// Returns the created step with its concrete type.
template <class T, typename... Args>
std::shared_ptr<T> InsertStep(const ui32 index, Args&&... args) {
    AFL_VERIFY(index <= Steps.size())("index", index)("size", Steps.size());
    auto result = std::make_shared<T>(std::forward<Args>(args)...);
    Steps.insert(Steps.begin() + index, result);
    return result;
}

public:
TColumnsAccumulator(const std::shared_ptr<TColumnsSetIds>& guaranteeNotOptional, const ISnapshotSchema::TPtr& fullSchema)
: FullSchema(fullSchema)
, GuaranteeNotOptional(guaranteeNotOptional) {
TFetchingScriptBuilder(const TSpecialReadContext& context);

// Finalizes the builder: creates a TFetchingScript from BranchName and the
// accumulated steps, which are moved out of this builder.
// The method is rvalue-qualified, so it can only be invoked on a builder that
// is being given up (a temporary or std::move(builder)).
std::shared_ptr<TFetchingScript> Build()&& {
    auto script = std::make_shared<TFetchingScript>(BranchName, std::move(Steps));
    return script;
}

TColumnsSetIds GetNotFetchedAlready(const TColumnsSetIds& columns) const {
return columns - FetchingReadyColumns;
// Appends an already constructed step to the end of Steps.
// A null pointer is rejected by AFL_VERIFY before anything is stored.
void AddStep(const std::shared_ptr<IFetchingStep>& step) {
    AFL_VERIFY(step);
    Steps.push_back(step);
}

bool AddFetchingStep(TFetchingScript& script, const TColumnsSetIds& columns, const EStageFeaturesIndexes stage);
bool AddAssembleStep(TFetchingScript& script, const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage,
const bool sequential);
void AddFetchingStep(const TColumnsSetIds& columns, const EStageFeaturesIndexes stage);
void AddAssembleStep(const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage, const bool sequential);

// Test-only factory that builds a TFetchingScriptBuilder from a schema alone.
// When guaranteeNotOptional is not supplied (null), a default-constructed
// TColumnsSetIds is used in its place.
static TFetchingScriptBuilder MakeForTests(ISnapshotSchema::TPtr schema, std::shared_ptr<TColumnsSetIds> guaranteeNotOptional = nullptr) {
    if (!guaranteeNotOptional) {
        guaranteeNotOptional = std::make_shared<TColumnsSetIds>();
    }
    return TFetchingScriptBuilder(schema, guaranteeNotOptional);
}
};

class TFetchingScriptCursor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ SRCS(

PEERDIR(
ydb/core/tx/columnshard/engines/scheme
yql/essentials/minikql
)

GENERATE_ENUM_SERIALIZATION(columns_set.h)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ NKikimr::TConclusion<std::shared_ptr<TReadMetadataBase>> TIndexScannerConstructo

TDataStorageAccessor dataAccessor(insertTable, index);
AFL_VERIFY(read.PathId);
auto readMetadata = std::make_shared<TReadMetadata>(read.PathId, index->CopyVersionedIndexPtr(), read.GetSnapshot(),
IsReverse ? TReadMetadataBase::ESorting::DESC : TReadMetadataBase::ESorting::ASC, read.GetProgram(), nullptr);
auto readMetadata = std::make_shared<TReadMetadata>(index->CopyVersionedIndexPtr(), read);

auto initResult = readMetadata->Init(self, read, dataAccessor);
if (!initResult) {
Expand Down
Loading
Loading