Skip to content

categorized indexes #15424

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ void TCollector::DoAskData(
}
}

TDataCategorized TCollector::DoAnalyzeData(const std::vector<TPortionInfo::TConstPtr>& portions, const TString& /* consumer */) {
TDataCategorized TCollector::DoAnalyzeData(const std::vector<TPortionInfo::TConstPtr>& portions, const TString& /*consumer*/) {
TDataCategorized result;
for (auto&& p : portions) {
auto it = AccessorsCache.Find(p->GetPortionId());
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/tx/columnshard/engines/portions/data_accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,17 @@ ui64 TPortionDataAccessor::GetIndexRawBytes(const bool validation /*= true*/) co
return sum;
}

std::vector<const TIndexChunk*> TPortionDataAccessor::GetIndexChunksPointers(const ui32 indexId) const {
std::vector<const TIndexChunk*> result;
for (auto&& c : GetIndexesVerified()) {
if (c.GetIndexId() == indexId) {
AFL_VERIFY(c.GetChunkIdx() == result.size());
result.emplace_back(&c);
}
}
return result;
}

std::vector<const TColumnRecord*> TPortionDataAccessor::GetColumnChunksPointers(const ui32 columnId) const {
std::vector<const TColumnRecord*> result;
for (auto&& c : GetRecordsVerified()) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/engines/portions/data_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ class TPortionDataAccessor {
NBlobOperations::NRead::TCompositeReadBlobs& blobs, const TIndexInfo& indexInfo) const;

std::vector<const TColumnRecord*> GetColumnChunksPointers(const ui32 columnId) const;
std::vector<const TIndexChunk*> GetIndexChunksPointers(const ui32 indexId) const;

THashMap<TChunkAddress, TString> DecodeBlobAddresses(NBlobOperations::NRead::TCompositeReadBlobs&& blobs, const TIndexInfo& indexInfo) const;

Expand Down
9 changes: 9 additions & 0 deletions ydb/core/tx/columnshard/engines/protos/index.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package NKikimrTxColumnShard;

message TIndexCategoriesDescription {
message TCategory {
repeated uint64 Hashes = 1;
optional uint32 FilterSize = 2;
}
repeated TCategory Categories = 1;
}
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/engines/protos/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ PROTO_LIBRARY()

SRCS(
portion_info.proto
index.proto
)

PEERDIR(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,69 @@
#include "fetching.h"
#include "source.h"

#include <ydb/core/formats/arrow/program/collection.h>
#include <ydb/core/tx/columnshard/blob.h>
#include <ydb/core/tx/columnshard/blobs_reader/task.h>
#include <ydb/core/tx/columnshard/engines/portions/column_record.h>
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_context.h>
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h>
#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract/collection.h>

namespace NKikimr::NOlap::NReader::NCommon {

class TFetchingResultContext {
private:
NArrow::NAccessor::TAccessorsCollection& Accessors;
NIndexes::TIndexesCollection& Indexes;
std::shared_ptr<IDataSource> Source;

public:
NArrow::NAccessor::TAccessorsCollection& GetAccessors() {
return Accessors;
}
NIndexes::TIndexesCollection& GetIndexes() const {
return Indexes;
}
const std::shared_ptr<IDataSource>& GetSource() const {
return Source;
}
TFetchingResultContext(
NArrow::NAccessor::TAccessorsCollection& accessors, NIndexes::TIndexesCollection& indexes, const std::shared_ptr<IDataSource>& source)
: Accessors(accessors)
, Indexes(indexes)
, Source(source)
{
}
};

class IKernelFetchLogic {
private:
YDB_READONLY(ui32, ColumnId, 0);

virtual void DoStart(TReadActionsCollection& nextRead) = 0;
virtual void DoStart(TReadActionsCollection& nextRead, TFetchingResultContext& context) = 0;
virtual void DoOnDataReceived(TReadActionsCollection& nextRead, NBlobOperations::NRead::TCompositeReadBlobs& blobs) = 0;
virtual void DoOnDataCollected() = 0;
virtual void DoOnDataCollected(TFetchingResultContext& context) = 0;

protected:
const std::shared_ptr<IDataSource> Source;
const std::shared_ptr<NArrow::NAccessor::TAccessorsCollection> Resources;
const std::shared_ptr<IStoragesManager> StoragesManager;

public:
virtual ~IKernelFetchLogic() = default;

IKernelFetchLogic(const ui32 columnId, const std::shared_ptr<IDataSource>& source)
IKernelFetchLogic(const ui32 columnId, const std::shared_ptr<IStoragesManager>& storagesManager)
: ColumnId(columnId)
, Source(source)
, Resources(Source->GetStageData().GetTable()) {
AFL_VERIFY(Resources);
, StoragesManager(storagesManager) {
AFL_VERIFY(StoragesManager);
}

void Start(TReadActionsCollection& nextRead) {
DoStart(nextRead);
void Start(TReadActionsCollection& nextRead, TFetchingResultContext& context) {
DoStart(nextRead, context);
}
void OnDataReceived(TReadActionsCollection& nextRead, NBlobOperations::NRead::TCompositeReadBlobs& blobs) {
DoOnDataReceived(nextRead, blobs);
}
void OnDataCollected() {
DoOnDataCollected();
void OnDataCollected(TFetchingResultContext& context) {
DoOnDataCollected(context);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,16 @@ class TDefaultFetchLogic: public IKernelFetchLogic {

std::vector<TChunkRestoreInfo> ColumnChunks;
std::optional<TString> StorageId;
virtual void DoOnDataCollected() override {
virtual void DoOnDataCollected(TFetchingResultContext& context) override {
AFL_VERIFY(!IIndexInfo::IsSpecialColumn(GetColumnId()));
std::vector<TPortionDataAccessor::TAssembleBlobInfo> chunks;
for (auto&& i : ColumnChunks) {
chunks.emplace_back(i.ExtractDataVerified());
}

TPortionDataAccessor::TPreparedColumn column(std::move(chunks), Source->GetSourceSchema()->GetColumnLoaderVerified(GetColumnId()));
Resources->AddVerified(GetColumnId(), column.AssembleAccessor().DetachResult(), true);
TPortionDataAccessor::TPreparedColumn column(
std::move(chunks), context.GetSource()->GetSourceSchema()->GetColumnLoaderVerified(GetColumnId()));
context.GetAccessors().AddVerified(GetColumnId(), column.AssembleAccessor().DetachResult(), true);
}

virtual void DoOnDataReceived(TReadActionsCollection& /*nextRead*/, NBlobOperations::NRead::TCompositeReadBlobs& blobs) override {
Expand All @@ -66,42 +67,43 @@ class TDefaultFetchLogic: public IKernelFetchLogic {
}
}

virtual void DoStart(TReadActionsCollection& nextRead) override {
auto columnChunks = Source->GetStageData().GetPortionAccessor().GetColumnChunksPointers(GetColumnId());
virtual void DoStart(TReadActionsCollection& nextRead, TFetchingResultContext& context) override {
auto source = context.GetSource();
auto columnChunks = source->GetStageData().GetPortionAccessor().GetColumnChunksPointers(GetColumnId());
if (columnChunks.empty()) {
ColumnChunks.emplace_back(Source->GetRecordsCount(), TPortionDataAccessor::TAssembleBlobInfo(Source->GetRecordsCount(),
Source->GetSourceSchema()->GetExternalDefaultValueVerified(GetColumnId())));
ColumnChunks.emplace_back(source->GetRecordsCount(), TPortionDataAccessor::TAssembleBlobInfo(source->GetRecordsCount(),
source->GetSourceSchema()->GetExternalDefaultValueVerified(GetColumnId())));
return;
}
StorageId = Source->GetColumnStorageId(GetColumnId());
TBlobsAction blobsAction(Source->GetContext()->GetCommonContext()->GetStoragesManager(), NBlobOperations::EConsumer::SCAN);
StorageId = source->GetColumnStorageId(GetColumnId());
TBlobsAction blobsAction(source->GetContext()->GetCommonContext()->GetStoragesManager(), NBlobOperations::EConsumer::SCAN);
auto reading = blobsAction.GetReading(*StorageId);
auto filterPtr = Source->GetStageData().GetAppliedFilter();
auto filterPtr = source->GetStageData().GetAppliedFilter();
const NArrow::TColumnFilter& cFilter = filterPtr ? *filterPtr : NArrow::TColumnFilter::BuildAllowFilter();
auto itFilter = cFilter.GetIterator(false, Source->GetRecordsCount());
auto itFilter = cFilter.GetIterator(false, source->GetRecordsCount());
bool itFinished = false;
for (auto&& c : columnChunks) {
AFL_VERIFY(!itFinished);
if (!itFilter.IsBatchForSkip(c->GetMeta().GetRecordsCount())) {
reading->SetIsBackgroundProcess(false);
reading->AddRange(Source->RestoreBlobRange(c->BlobRange));
ColumnChunks.emplace_back(c->GetMeta().GetRecordsCount(), Source->RestoreBlobRange(c->BlobRange));
reading->AddRange(source->RestoreBlobRange(c->BlobRange));
ColumnChunks.emplace_back(c->GetMeta().GetRecordsCount(), source->RestoreBlobRange(c->BlobRange));
} else {
ColumnChunks.emplace_back(
c->GetMeta().GetRecordsCount(), TPortionDataAccessor::TAssembleBlobInfo(c->GetMeta().GetRecordsCount(),
Source->GetSourceSchema()->GetExternalDefaultValueVerified(c->GetColumnId())));
source->GetSourceSchema()->GetExternalDefaultValueVerified(c->GetColumnId())));
}
itFinished = !itFilter.Next(c->GetMeta().GetRecordsCount());
}
AFL_VERIFY(itFinished)("filter", itFilter.DebugString())("count", Source->GetRecordsCount());
AFL_VERIFY(itFinished)("filter", itFilter.DebugString())("count", source->GetRecordsCount());
for (auto&& i : blobsAction.GetReadingActions()) {
nextRead.Add(i);
}
}

public:
TDefaultFetchLogic(const ui32 columnId, const std::shared_ptr<IDataSource>& source)
: TBase(columnId, source) {
TDefaultFetchLogic(const ui32 columnId, const std::shared_ptr<IStoragesManager>& storagesManager)
: TBase(columnId, storagesManager) {
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <ydb/core/tx/columnshard/blobs_reader/task.h>
#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract/collection.h>
#include <ydb/core/tx/limiter/grouped_memory/usage/abstract.h>

#include <ydb/library/accessor/accessor.h>
Expand All @@ -28,6 +29,7 @@ class TFetchedData {
TFetchers Fetchers;
YDB_ACCESSOR_DEF(TBlobs, Blobs);
YDB_READONLY_DEF(std::shared_ptr<NArrow::NAccessor::TAccessorsCollection>, Table);
YDB_READONLY_DEF(std::shared_ptr<NIndexes::TIndexesCollection>, Indexes);
YDB_READONLY(bool, Aborted, false);

std::shared_ptr<NGroupedMemoryManager::TAllocationGuard> AccessorsGuard;
Expand Down Expand Up @@ -69,6 +71,7 @@ class TFetchedData {
TFetchedData(const bool useFilter, const ui32 recordsCount) {
Table = std::make_shared<NArrow::NAccessor::TAccessorsCollection>(recordsCount);
Table->SetFilterUsage(useFilter);
Indexes = std::make_shared<NIndexes::TIndexesCollection>();
}

void SetAccessorsGuard(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard) {
Expand Down Expand Up @@ -118,7 +121,8 @@ class TFetchedData {
return result;
}

void AddBatch(const std::shared_ptr<NArrow::TGeneralContainer>& container, const NArrow::NSSA::IColumnResolver& resolver, const bool withFilter) {
void AddBatch(
const std::shared_ptr<NArrow::TGeneralContainer>& container, const NArrow::NSSA::IColumnResolver& resolver, const bool withFilter) {
Table->AddBatch(container, resolver, withFilter);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ void TFetchingScriptBuilder::AddAssembleStep(
TConclusion<bool> TProgramStepPrepare::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {
TReadActionsCollection readActions;
THashMap<ui32, std::shared_ptr<IKernelFetchLogic>> fetchers;
TFetchingResultContext context(*source->GetStageData().GetTable(), *source->GetStageData().GetIndexes(), source);
for (auto&& i : Step.GetOriginalColumnsToUse()) {
const auto columnLoader = source->GetSourceSchema()->GetColumnLoaderVerified(i.GetColumnId());
auto customFetchInfo = Step->BuildFetchTask(i.GetColumnId(), columnLoader->GetAccessorConstructor()->GetType(), source->GetStageData().GetTable());
Expand All @@ -205,12 +206,12 @@ TConclusion<bool> TProgramStepPrepare::DoExecuteInplace(const std::shared_ptr<ID
}
std::shared_ptr<IKernelFetchLogic> logic;
if (customFetchInfo->GetFullRestore() || source->GetStageData().GetPortionAccessor().GetColumnChunksPointers(i.GetColumnId()).empty()) {
logic = std::make_shared<TDefaultFetchLogic>(i.GetColumnId(), source);
logic = std::make_shared<TDefaultFetchLogic>(i.GetColumnId(), source->GetContext()->GetCommonContext()->GetStoragesManager());
} else {
AFL_VERIFY(customFetchInfo->GetSubColumns().size());
logic = std::make_shared<TSubColumnsFetchLogic>(i.GetColumnId(), source, customFetchInfo->GetSubColumns());
}
logic->Start(readActions);
logic->Start(readActions, context);
AFL_VERIFY(fetchers.emplace(i.GetColumnId(), logic).second)("column_id", i.GetColumnId());
}
if (readActions.IsEmpty()) {
Expand Down Expand Up @@ -240,6 +241,7 @@ TConclusion<bool> TProgramStep::DoExecuteInplace(const std::shared_ptr<IDataSour

TConclusion<bool> TProgramStepAssemble::DoExecuteInplace(
const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*cursor*/) const {
TFetchingResultContext context(*source->GetStageData().GetTable(), *source->GetStageData().GetIndexes(), source);
for (auto&& i : Step.GetOriginalColumnsToUse()) {
const auto columnLoader = source->GetSourceSchema()->GetColumnLoaderVerified(i.GetColumnId());
auto customFetchInfo =
Expand All @@ -248,7 +250,7 @@ TConclusion<bool> TProgramStepAssemble::DoExecuteInplace(
continue;
}
auto fetcher = source->MutableStageData().ExtractFetcherVerified(i.GetColumnId());
fetcher->OnDataCollected();
fetcher->OnDataCollected(context);
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,18 +169,19 @@ class TSubColumnsFetchLogic: public IKernelFetchLogic {
std::vector<TColumnChunkRestoreInfo> ColumnChunks;
std::optional<TString> StorageId;
bool NeedToAddResource = false;
virtual void DoOnDataCollected() override {
virtual void DoOnDataCollected(TFetchingResultContext& context) override {
if (NeedToAddResource) {
NArrow::NAccessor::TCompositeChunkedArray::TBuilder compositeBuilder(ChunkExternalInfo.GetColumnType());
for (auto&& i : ColumnChunks) {
i.Finish(nullptr, Source);
i.Finish(nullptr, context.GetSource());
compositeBuilder.AddChunk(i.GetPartialArray());
}
Resources->AddVerified(GetColumnId(), compositeBuilder.Finish(), true);
context.GetAccessors().AddVerified(GetColumnId(), compositeBuilder.Finish(), true);
} else {
ui32 pos = 0;
for (auto&& i : ColumnChunks) {
i.Finish(std::make_shared<NArrow::TColumnFilter>(Resources->GetAppliedFilter()->Slice(pos, i.GetRecordsCount())), Source);
i.Finish(std::make_shared<NArrow::TColumnFilter>(context.GetAccessors().GetAppliedFilter()->Slice(pos, i.GetRecordsCount())),
context.GetSource());
pos += i.GetRecordsCount();
}
}
Expand All @@ -189,7 +190,7 @@ class TSubColumnsFetchLogic: public IKernelFetchLogic {
virtual void DoOnDataReceived(TReadActionsCollection& nextRead, NBlobOperations::NRead::TCompositeReadBlobs& blobs) override {
AFL_VERIFY(ColumnChunks.size());
AFL_VERIFY(!!StorageId);
TBlobsAction blobsAction(Source->GetContext()->GetCommonContext()->GetStoragesManager(), NBlobOperations::EConsumer::SCAN);
TBlobsAction blobsAction(StoragesManager, NBlobOperations::EConsumer::SCAN);
auto reading = blobsAction.GetReading(*StorageId);
reading->SetIsBackgroundProcess(false);
for (auto&& i : ColumnChunks) {
Expand Down Expand Up @@ -228,19 +229,20 @@ class TSubColumnsFetchLogic: public IKernelFetchLogic {
nextRead.Add(reading);
}

virtual void DoStart(TReadActionsCollection& nextRead) override {
auto columnChunks = Source->GetStageData().GetPortionAccessor().GetColumnChunksPointers(GetColumnId());
virtual void DoStart(TReadActionsCollection& nextRead, TFetchingResultContext& context) override {
auto source = context.GetSource();
auto columnChunks = source->GetStageData().GetPortionAccessor().GetColumnChunksPointers(GetColumnId());
AFL_VERIFY(columnChunks.size());
StorageId = Source->GetColumnStorageId(GetColumnId());
TBlobsAction blobsAction(Source->GetContext()->GetCommonContext()->GetStoragesManager(), NBlobOperations::EConsumer::SCAN);
StorageId = source->GetColumnStorageId(GetColumnId());
TBlobsAction blobsAction(StoragesManager, NBlobOperations::EConsumer::SCAN);
auto reading = blobsAction.GetReading(*StorageId);
reading->SetIsBackgroundProcess(false);
auto filterPtr = Source->GetStageData().GetAppliedFilter();
auto filterPtr = source->GetStageData().GetAppliedFilter();
const NArrow::TColumnFilter& cFilter = filterPtr ? *filterPtr : NArrow::TColumnFilter::BuildAllowFilter();
auto itFilter = cFilter.GetIterator(false, Source->GetRecordsCount());
auto itFilter = cFilter.GetIterator(false, source->GetRecordsCount());
bool itFinished = false;

auto accessor = Resources->GetAccessorOptional(GetColumnId());
auto accessor = context.GetAccessors().GetAccessorOptional(GetColumnId());
NeedToAddResource = !accessor;
std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> chunks;
if (!NeedToAddResource) {
Expand All @@ -256,7 +258,7 @@ class TSubColumnsFetchLogic: public IKernelFetchLogic {
auto& meta = columnChunks[chunkIdx]->GetMeta();
AFL_VERIFY(!itFinished);
if (!itFilter.IsBatchForSkip(meta.GetRecordsCount())) {
const TBlobRange range = Source->RestoreBlobRange(columnChunks[chunkIdx]->BlobRange);
const TBlobRange range = source->RestoreBlobRange(columnChunks[chunkIdx]->BlobRange);
ColumnChunks.emplace_back(range, ChunkExternalInfo.GetSubset(meta.GetRecordsCount()));
if (!NeedToAddResource) {
AFL_VERIFY(resChunkIdx < chunks.size())("chunks", chunks.size())("meta", columnChunks.size())("need", NeedToAddResource);
Expand All @@ -270,18 +272,18 @@ class TSubColumnsFetchLogic: public IKernelFetchLogic {
itFinished = !itFilter.Next(meta.GetRecordsCount());
}
AFL_VERIFY(NeedToAddResource || (resChunkIdx == chunks.size()));
AFL_VERIFY(itFinished)("filter", itFilter.DebugString())("count", Source->GetRecordsCount());
AFL_VERIFY(itFinished)("filter", itFilter.DebugString())("count", source->GetRecordsCount());
for (auto&& i : blobsAction.GetReadingActions()) {
nextRead.Add(i);
}
}

public:
TSubColumnsFetchLogic(const ui32 columnId, const std::shared_ptr<IDataSource>& source, const std::vector<TString>& subColumns)
: TBase(columnId, source)
, ChunkExternalInfo(Source->GetSourceSchema()->GetColumnLoaderVerified(GetColumnId())->BuildAccessorContext(Source->GetRecordsCount()))
: TBase(columnId, source->GetContext()->GetCommonContext()->GetStoragesManager())
, ChunkExternalInfo(source->GetSourceSchema()->GetColumnLoaderVerified(GetColumnId())->BuildAccessorContext(source->GetRecordsCount()))
, SubColumns(subColumns) {
const auto loader = Source->GetSourceSchema()->GetColumnLoaderVerified(GetColumnId());
const auto loader = source->GetSourceSchema()->GetColumnLoaderVerified(GetColumnId());
AFL_VERIFY(loader->GetAccessorConstructor()->GetType() == NArrow::NAccessor::IChunkedArray::EType::SubColumnsArray)
("type", loader->GetAccessorConstructor()->GetType());
}
Expand Down
Loading
Loading