Skip to content

Commit 3c9ba45

Browse files
customization fetching by columns
1 parent 3a2be13 commit 3c9ba45

File tree

16 files changed

+367
-27
lines changed

16 files changed

+367
-27
lines changed

ydb/core/kqp/ut/olap/aggregations_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,9 @@ Y_UNIT_TEST_SUITE(KqpOlapAggregations) {
109109
.SetWithSampleTables(false);
110110
TKikimrRunner kikimr(settings);
111111

112+
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
112113
TLocalHelper(kikimr).CreateTestOlapTable();
113114
auto tableClient = kikimr.GetTableClient();
114-
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
115115

116116
{
117117
WriteTestData(kikimr, "/Root/olapStore/olapTable", 10000, 3000000, 1000);

ydb/core/tx/columnshard/blobs_reader/task.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ class ITask: public NColumnShard::TMonitoringObjectsCounter<ITask> {
9292
const ui64 TaskIdentifier = 0;
9393
const TString ExternalTaskId;
9494
bool AbortFlag = false;
95-
TString TaskCustomer;
95+
YDB_READONLY_DEF(TString, TaskCustomer);
9696
std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard> ResourcesGuard;
9797
i64 BlobsWaitingCount = 0;
9898
bool ResultsExtracted = false;

ydb/core/tx/columnshard/columnshard__write.cpp

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -575,11 +575,6 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
575575
return;
576576
}
577577

578-
if (!AppDataVerified().ColumnShardConfig.GetWritingEnabled()) {
579-
sendError("writing disabled", NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED);
580-
return;
581-
}
582-
583578
std::optional<ui32> granuleShardingVersionId;
584579
if (record.HasGranuleShardingVersionId()) {
585580
granuleShardingVersionId = record.GetGranuleShardingVersionId();

ydb/core/tx/columnshard/engines/portions/data_accessor.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ class TPortionDataAccessor {
284284
}
285285

286286
void SetExpectedRecordsCount(const ui32 expectedRowsCount) {
287-
AFL_VERIFY(!ExpectedRowsCount);
287+
AFL_VERIFY(!ExpectedRowsCount || ExpectedRowsCount == expectedRowsCount);
288288
ExpectedRowsCount = expectedRowsCount;
289289
if (!Data) {
290290
AFL_VERIFY(*ExpectedRowsCount == DefaultRowsCount);

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/constructor.cpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "constructor.h"
22

3+
#include <ydb/core/tx/columnshard/blobs_reader/actor.h>
34
#include <ydb/core/tx/columnshard/columnshard_private_events.h>
45
#include <ydb/core/tx/conveyor/usage/service.h>
56

@@ -32,4 +33,25 @@ TBlobsFetcherTask::TBlobsFetcherTask(const std::vector<std::shared_ptr<IBlobsRea
3233
, Guard(Context->GetCommonContext()->GetCounters().GetFetchBlobsGuard()) {
3334
}
3435

36+
// Invoked when the blobs requested by this task are available.
// The blobs extracted from the task are merged with ProvidedBlobs and offered to every column fetcher.
// If any fetcher scheduled further reads, a follow-up TColumnsFetcherTask is registered for them;
// otherwise the fetchers are stored in the source's stage data, the script cursor is advanced and
// a TStepAction is sent to the scan conveyor service.
void TColumnsFetcherTask::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) {
    NBlobOperations::NRead::TCompositeReadBlobs fetchedBlobs = ExtractBlobsData();
    fetchedBlobs.Merge(std::move(ProvidedBlobs));

    // Fetchers append to pendingReads whatever they still need to read.
    TReadActionsCollection pendingReads;
    for (auto&& fetcherInfo : DataFetchers) {
        fetcherInfo.second->OnDataReceived(pendingReads, fetchedBlobs);
    }

    if (!pendingReads.IsEmpty()) {
        // One more reading round with the same fetchers and cursor.
        std::shared_ptr<TColumnsFetcherTask> followUpTask = std::make_shared<TColumnsFetcherTask>(
            std::move(pendingReads), DataFetchers, Source, std::move(Cursor), GetTaskCustomer(), GetExternalTaskId());
        NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(followUpTask));
        return;
    }

    // Nothing else to read: hand the fetchers over to the stage data and continue the script.
    for (auto&& [_, fetcher] : DataFetchers) {
        Source->MutableStageData().AddFetcher(fetcher);
    }
    AFL_VERIFY(Cursor.Next());
    const auto& commonContext = Source->GetContext()->GetCommonContext();
    auto stepTask = std::make_shared<TStepAction>(Source, std::move(Cursor), commonContext->GetScanActorId());
    NConveyor::TScanServiceOperator::SendTaskToExecute(stepTask, commonContext->GetConveyorProcessId());
}
56+
3557
} // namespace NKikimr::NOlap::NReader::NCommon

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/constructor.h

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,178 @@
1010

1111
namespace NKikimr::NOlap::NReader::NCommon {
1212

13+
class IKernelFetchLogic {
14+
private:
15+
YDB_READONLY(ui32, ColumnId, 0);
16+
17+
virtual void DoStart(const std::shared_ptr<NArrow::NAccessor::TAccessorsCollection>& resources, TReadActionsCollection& nextRead) = 0;
18+
virtual void DoOnDataReceived(TReadActionsCollection& nextRead, NBlobOperations::NRead::TCompositeReadBlobs& blobs) = 0;
19+
virtual void DoOnDataCollected(const std::shared_ptr<NArrow::NAccessor::TAccessorsCollection>& resources) = 0;
20+
21+
protected:
22+
const std::shared_ptr<IDataSource> Source;
23+
24+
public:
25+
using TFactory = NObjectFactory::TParametrizedObjectFactory<IKernelFetchLogic, TString, ui32, const std::shared_ptr<IDataSource>&>;
26+
27+
virtual ~IKernelFetchLogic() = default;
28+
29+
IKernelFetchLogic(const ui32 columnId, const std::shared_ptr<IDataSource>& source)
30+
: ColumnId(columnId)
31+
, Source(source) {
32+
}
33+
34+
void Start(const std::shared_ptr<NArrow::NAccessor::TAccessorsCollection>& resources, TReadActionsCollection& nextRead) {
35+
DoStart(resources, nextRead);
36+
}
37+
void OnDataReceived(TReadActionsCollection& nextRead, NBlobOperations::NRead::TCompositeReadBlobs& blobs) {
38+
DoOnDataReceived(nextRead, blobs);
39+
}
40+
void OnDataCollected(const std::shared_ptr<NArrow::NAccessor::TAccessorsCollection>& resources) {
41+
DoOnDataCollected(resources);
42+
}
43+
};
44+
45+
class TChunkRestoreInfo {
46+
private:
47+
std::optional<TBlobRange> BlobRange;
48+
std::optional<TPortionDataAccessor::TAssembleBlobInfo> Data;
49+
const ui32 RecordsCount;
50+
51+
public:
52+
TChunkRestoreInfo(const ui32 recordsCount, const TBlobRange& range)
53+
: BlobRange(range)
54+
, RecordsCount(recordsCount)
55+
{
56+
}
57+
58+
const std::optional<TBlobRange>& GetBlobRangeOptional() const {
59+
return BlobRange;
60+
}
61+
62+
TChunkRestoreInfo(const ui32 recordsCount, const TPortionDataAccessor::TAssembleBlobInfo& defaultData)
63+
: Data(defaultData)
64+
, RecordsCount(recordsCount)
65+
{
66+
}
67+
68+
TPortionDataAccessor::TAssembleBlobInfo ExtractDataVerified() {
69+
AFL_VERIFY(!!Data);
70+
Data->SetExpectedRecordsCount(RecordsCount);
71+
return std::move(*Data);
72+
}
73+
74+
void SetBlobData(const TString& data) {
75+
AFL_VERIFY(!Data);
76+
Data.emplace(data);
77+
}
78+
};
79+
80+
class TDefaultFetchLogic: public IKernelFetchLogic {
81+
private:
82+
using TBase = IKernelFetchLogic;
83+
static const inline auto Registrator = TFactory::TRegistrator<TDefaultFetchLogic>("default");
84+
85+
std::vector<TChunkRestoreInfo> ColumnChunks;
86+
std::optional<TString> StorageId;
87+
virtual void DoOnDataCollected(const std::shared_ptr<NArrow::NAccessor::TAccessorsCollection>& resources) override {
88+
AFL_VERIFY(!IIndexInfo::IsSpecialColumn(GetColumnId()));
89+
std::vector<TPortionDataAccessor::TAssembleBlobInfo> chunks;
90+
for (auto&& i : ColumnChunks) {
91+
chunks.emplace_back(i.ExtractDataVerified());
92+
}
93+
94+
TPortionDataAccessor::TPreparedColumn column(std::move(chunks), Source->GetSourceSchema()->GetColumnLoaderVerified(GetColumnId()));
95+
resources->AddVerified(GetColumnId(), column.AssembleAccessor().DetachResult(), true);
96+
}
97+
98+
virtual void DoOnDataReceived(TReadActionsCollection& /*nextRead*/, NBlobOperations::NRead::TCompositeReadBlobs& blobs) override {
99+
if (ColumnChunks.empty()) {
100+
return;
101+
}
102+
for (auto&& i : ColumnChunks) {
103+
if (!i.GetBlobRangeOptional()) {
104+
continue;
105+
}
106+
AFL_VERIFY(!!StorageId);
107+
i.SetBlobData(blobs.Extract(*StorageId, *i.GetBlobRangeOptional()));
108+
}
109+
}
110+
111+
virtual void DoStart(const std::shared_ptr<NArrow::NAccessor::TAccessorsCollection>& resources, TReadActionsCollection& nextRead) override {
112+
if (resources->HasColumn(GetColumnId())) {
113+
return;
114+
}
115+
auto columnChunks = Source->GetStageData().GetPortionAccessor().GetColumnChunksPointers(GetColumnId());
116+
if (columnChunks.empty()) {
117+
ColumnChunks.emplace_back(
118+
Source->GetRecordsCount(), TPortionDataAccessor::TAssembleBlobInfo(Source->GetRecordsCount(),
119+
Source->GetSourceSchema()->GetExternalDefaultValueVerified(GetColumnId())));
120+
return;
121+
}
122+
StorageId = Source->GetColumnStorageId(GetColumnId());
123+
TBlobsAction blobsAction(Source->GetContext()->GetCommonContext()->GetStoragesManager(), NBlobOperations::EConsumer::SCAN);
124+
auto reading = blobsAction.GetReading(*StorageId);
125+
auto filterPtr = Source->GetStageData().GetAppliedFilter();
126+
const NArrow::TColumnFilter& cFilter = filterPtr ? *filterPtr : NArrow::TColumnFilter::BuildAllowFilter();
127+
auto itFilter = cFilter.GetIterator(false, Source->GetRecordsCount());
128+
bool itFinished = false;
129+
for (auto&& c : columnChunks) {
130+
AFL_VERIFY(!itFinished);
131+
if (!itFilter.IsBatchForSkip(c->GetMeta().GetRecordsCount())) {
132+
reading->SetIsBackgroundProcess(false);
133+
reading->AddRange(Source->RestoreBlobRange(c->BlobRange));
134+
ColumnChunks.emplace_back(c->GetMeta().GetRecordsCount(), Source->RestoreBlobRange(c->BlobRange));
135+
} else {
136+
ColumnChunks.emplace_back(c->GetMeta().GetRecordsCount(), TPortionDataAccessor::TAssembleBlobInfo(
137+
c->GetMeta().GetRecordsCount(), Source->GetSourceSchema()->GetExternalDefaultValueVerified(c->GetColumnId())));
138+
}
139+
itFinished = !itFilter.Next(c->GetMeta().GetRecordsCount());
140+
}
141+
AFL_VERIFY(itFinished)("filter", itFilter.DebugString())("count", Source->GetRecordsCount());
142+
for (auto&& i : blobsAction.GetReadingActions()) {
143+
nextRead.Add(i);
144+
}
145+
}
146+
147+
public:
148+
TDefaultFetchLogic(const ui32 columnId, const std::shared_ptr<IDataSource>& source)
149+
: TBase(columnId, source) {
150+
}
151+
};
152+
153+
// Blob-reading task that feeds the read blobs to a set of per-column fetchers (keyed by ui32)
// and then either schedules another reading round or continues the fetching script
// (see DoOnDataReady in the .cpp file).
class TColumnsFetcherTask: public NBlobOperations::NRead::ITask, public NColumnShard::TMonitoringObjectsCounter<TColumnsFetcherTask> {
private:
    using TBase = NBlobOperations::NRead::ITask;
    std::shared_ptr<IDataSource> Source;
    THashMap<ui32, std::shared_ptr<IKernelFetchLogic>> DataFetchers;
    TFetchingScriptCursor Cursor;
    NBlobOperations::NRead::TCompositeReadBlobs ProvidedBlobs;
    // Declared after Source: its initializer reads the counters through Source.
    const NColumnShard::TCounterGuard Guard;

    virtual void DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override;

    // Logs the failed blob read, reports a failed TEvTaskProcessedResult to the scan actor
    // and returns false.
    virtual bool DoOnError(const TString& storageId, const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) override {
        const auto scanActorId = Source->GetContext()->GetCommonContext()->GetScanActorId();
        const TString rangeDescription = range.ToString();

        AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("error_on_blob_reading", rangeDescription)("scan_actor_id", scanActorId)(
            "status", status.GetErrorMessage())("status_code", status.GetStatus())("storage_id", storageId);

        auto failure = std::make_unique<NColumnShard::TEvPrivate::TEvTaskProcessedResult>(
            TConclusionStatus::Fail("cannot read blob range " + rangeDescription));
        NActors::TActorContext::AsActorContext().Send(scanActorId, std::move(failure));
        return false;
    }

public:
    // actions   - blob reads to perform;
    // fetchers  - column fetchers that will receive the read data;
    // source    - data source the columns belong to;
    // cursor    - position in the fetching script;
    // taskCustomer / externalTaskId - forwarded to the base task.
    TColumnsFetcherTask(TReadActionsCollection&& actions, const THashMap<ui32, std::shared_ptr<IKernelFetchLogic>>& fetchers,
        const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& cursor, const TString& taskCustomer,
        const TString& externalTaskId = "")
        : TBase(actions, taskCustomer, externalTaskId)
        , Source(source)
        , DataFetchers(fetchers)
        , Cursor(cursor)
        , Guard(Source->GetContext()->GetCommonContext()->GetCounters().GetFetchBlobsGuard()) {
    }
};
13185
class TBlobsFetcherTask: public NBlobOperations::NRead::ITask, public NColumnShard::TMonitoringObjectsCounter<TBlobsFetcherTask> {
14186
private:
15187
using TBase = NBlobOperations::NRead::ITask;
Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,35 @@
1+
#include "constructor.h"
12
#include "fetched_data.h"
23

34
#include <ydb/core/formats/arrow/accessor/plain/accessor.h>
45

5-
#include <ydb/library/formats/arrow/validation/validation.h>
66
#include <ydb/library/formats/arrow/simple_arrays_cache.h>
7+
#include <ydb/library/formats/arrow/validation/validation.h>
78

89
namespace NKikimr::NOlap::NReader::NCommon {
910

10-
// For every field that has no column in Table yet, adds a column built from the field's external
// default value with recordsCount entries.
void TFetchedData::SyncTableColumns(
    const std::vector<std::shared_ptr<arrow::Field>>& fields, const ISnapshotSchema& schema, const ui32 recordsCount) {
    for (auto&& field : fields) {
        const ui32 columnId = schema.GetColumnId(field->name());
        if (!Table->HasColumn(columnId)) {
            // The array of default values is taken from the thread-local simple arrays cache.
            auto defaultValues =
                NArrow::TThreadSimpleArraysCache::Get(field->type(), schema.GetExternalDefaultValueVerified(field->name()), recordsCount);
            Table->AddVerified(columnId, std::make_shared<NArrow::NAccessor::TTrivialArray>(std::move(defaultValues)), true);
        }
    }
}
2024

25+
void TFetchedData::AddFetchers(const std::vector<std::shared_ptr<IKernelFetchLogic>>& fetchers) {
26+
for (auto&& i : fetchers) {
27+
AFL_VERIFY(Fetchers.emplace(i->GetColumnId(), i).second);
28+
}
29+
}
30+
31+
// Registers a fetcher under its column id.
// The fetcher must be non-null and no fetcher may already be registered for the same column;
// both conditions are verified.
void TFetchedData::AddFetcher(const std::shared_ptr<IKernelFetchLogic>& fetcher) {
    // Fail with a diagnosable verification instead of dereferencing a null pointer below.
    AFL_VERIFY(!!fetcher);
    AFL_VERIFY(Fetchers.emplace(fetcher->GetColumnId(), fetcher).second)("column_id", fetcher->GetColumnId());
}
34+
2135
} // namespace NKikimr::NOlap::NReader::NCommon

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetched_data.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,13 @@
1919

2020
namespace NKikimr::NOlap::NReader::NCommon {
2121

22+
class IKernelFetchLogic;
23+
2224
class TFetchedData {
2325
private:
2426
using TBlobs = THashMap<TChunkAddress, TPortionDataAccessor::TAssembleBlobInfo>;
27+
using TFetchers = THashMap<ui32, std::shared_ptr<IKernelFetchLogic>>;
28+
TFetchers Fetchers;
2529
YDB_ACCESSOR_DEF(TBlobs, Blobs);
2630
YDB_READONLY_DEF(std::shared_ptr<NArrow::NAccessor::TAccessorsCollection>, Table);
2731
YDB_READONLY(bool, Aborted, false);
@@ -30,6 +34,26 @@ class TFetchedData {
3034
std::optional<TPortionDataAccessor> PortionAccessor;
3135

3236
public:
37+
void AddFetchers(const std::vector<std::shared_ptr<IKernelFetchLogic>>& fetchers);
38+
void AddFetcher(const std::shared_ptr<IKernelFetchLogic>& fetcher);
39+
40+
std::shared_ptr<IKernelFetchLogic> ExtractFetcherOptional(const ui32 columnId) {
41+
auto it = Fetchers.find(columnId);
42+
if (it == Fetchers.end()) {
43+
return nullptr;
44+
} else {
45+
auto result = it->second;
46+
Fetchers.erase(it);
47+
return result;
48+
}
49+
}
50+
51+
std::shared_ptr<IKernelFetchLogic> ExtractFetcherVerified(const ui32 columnId) {
52+
auto result = ExtractFetcherOptional(columnId);
53+
AFL_VERIFY(!!result)("column_id", columnId);
54+
return result;
55+
}
56+
3357
void Abort() {
3458
Aborted = true;
3559
}

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1+
#include "constructor.h"
12
#include "fetch_steps.h"
23
#include "fetching.h"
34
#include "source.h"
45

6+
#include <ydb/core/tx/columnshard/blobs_reader/actor.h>
7+
58
#include <util/string/builder.h>
69
#include <yql/essentials/minikql/mkql_terminator.h>
710

@@ -163,7 +166,35 @@ bool TColumnsAccumulator::AddAssembleStep(
163166
return true;
164167
}
165168

166-
TConclusion<bool> TProgramStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const {
169+
// Prepares the input columns of the program step.
// For every original column without execution data a fetch logic is created through the factory
// (class name taken from the step, "default" as the fallback argument) and started.
// Returns false after registering a blob-reading actor when reads are required.
// Returns true when no reads are needed; the fetchers are then stored in the stage data right away.
TConclusion<bool> TProgramStepPrepare::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {
    const ISnapshotSchema::TPtr ssSchema = source->GetSourceSchema();
    TReadActionsCollection readActions;
    THashMap<ui32, std::shared_ptr<IKernelFetchLogic>> fetchers;

    for (auto&& i : Step.GetOriginalColumnsToUse()) {
        if (!Step->HasExecutionData(i, source->GetStageData().GetTable())) {
            // The loader itself is not used below; the call is kept because it verifies the column.
            const std::shared_ptr<TColumnLoader> loader = ssSchema->GetColumnLoaderVerified(i);
            auto logic = IKernelFetchLogic::TFactory::MakeHolder(Step->GetKernelClassNameDef("default"), i, source);
            AFL_VERIFY(!!logic);
            logic->Start(source->GetStageData().GetTable(), readActions);
            AFL_VERIFY(fetchers.emplace(i, logic.Release()).second)("column_id", i);
        }
    }

    if (!readActions.IsEmpty()) {
        // Blobs have to be read first; the reading task continues from `step` afterwards.
        auto fetchTask = std::make_shared<TColumnsFetcherTask>(std::move(readActions), fetchers, source, step, GetName(), "");
        NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(std::move(fetchTask)));
        return false;
    }

    // No reads requested: notify the fetchers with an empty blobs set and keep them for assembling.
    NBlobOperations::NRead::TCompositeReadBlobs blobs;
    for (auto&& [_, fetcher] : fetchers) {
        fetcher->OnDataReceived(readActions, blobs);
        source->MutableStageData().AddFetcher(fetcher);
    }
    AFL_VERIFY(readActions.IsEmpty());
    return true;
}
196+
197+
TConclusion<bool> TProgramStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*cursor*/) const {
167198
auto result = Step->Execute(source->GetStageData().GetTable());
168199
if (result.IsFail()) {
169200
return result;
@@ -172,4 +203,15 @@ TConclusion<bool> TProgramStep::DoExecuteInplace(const std::shared_ptr<IDataSour
172203
return true;
173204
}
174205

206+
// Finalizes the input columns of the program step: for each original column that still has no
// execution data, its fetcher is taken out of the stage data (it must exist) and asked to put
// the collected data into the stage table. Always returns true.
TConclusion<bool> TProgramStepAssemble::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*cursor*/) const {
    for (auto&& columnId : Step.GetOriginalColumnsToUse()) {
        if (!Step->HasExecutionData(columnId, source->GetStageData().GetTable())) {
            auto columnFetcher = source->MutableStageData().ExtractFetcherVerified(columnId);
            columnFetcher->OnDataCollected(source->GetStageData().GetTable());
        }
    }
    return true;
}
216+
175217
} // namespace NKikimr::NOlap::NReader::NCommon

0 commit comments

Comments
 (0)