Skip to content

Commit caf2ade

Browse files
many sub columns request usage correction (#15086)
1 parent 0db71ba commit caf2ade

File tree

5 files changed

+108
-22
lines changed

5 files changed

+108
-22
lines changed

ydb/core/formats/arrow/accessor/abstract/accessor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ IChunkedArray::TFullDataAddress IChunkedArray::GetChunk(const std::optional<TAdd
7575

7676
IChunkedArray::TFullChunkedArrayAddress IChunkedArray::GetArray(
7777
const std::optional<TAddressChain>& chunkCurrent, const ui64 position, const std::shared_ptr<IChunkedArray>& selfPtr) const {
78-
AFL_VERIFY(position < GetRecordsCount());
78+
AFL_VERIFY(position < GetRecordsCount())("pos", position)("records_count", GetRecordsCount());
7979
if (IsDataOwner()) {
8080
AFL_VERIFY(selfPtr);
8181
TAddressChain chain;

ydb/core/kqp/ut/olap/json_ut.cpp

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,24 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) {
102102
}
103103
};
104104

105+
class TOneActualizationCommand: public ICommand {
106+
private:
107+
virtual TConclusionStatus DoExecute(TKikimrRunner& kikimr) override {
108+
{
109+
auto alterQuery =
110+
TStringBuilder()
111+
<< "ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=UPSERT_OPTIONS, SCHEME_NEED_ACTUALIZATION=`true`);";
112+
auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
113+
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
114+
AFL_VERIFY(alterResult.GetStatus() == NYdb::EStatus::SUCCESS)("error", alterResult.GetIssues().ToString());
115+
}
116+
auto controller = NYDBTest::TControllers::GetControllerAs<NYDBTest::NColumnShard::TController>();
117+
AFL_VERIFY(controller);
118+
controller->WaitActualization(TDuration::Seconds(10));
119+
return TConclusionStatus::Success();
120+
}
121+
};
122+
105123
class TOneCompactionCommand: public ICommand {
106124
private:
107125
virtual TConclusionStatus DoExecute(TKikimrRunner& /*kikimr*/) override {
@@ -203,6 +221,8 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) {
203221
return std::make_shared<TStopCompactionCommand>();
204222
} else if (command.StartsWith("ONE_COMPACTION")) {
205223
return std::make_shared<TOneCompactionCommand>();
224+
} else if (command.StartsWith("ONE_ACTUALIZATION")) {
225+
return std::make_shared<TOneActualizationCommand>();
206226
} else {
207227
AFL_VERIFY(false)("command", command);
208228
return nullptr;
@@ -616,7 +636,55 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) {
616636
)";
617637
TScriptVariator(script).Execute();
618638
}
619-
639+
/*
640+
Y_UNIT_TEST(BloomIndexesVariants) {
641+
TString script = R"(
642+
STOP_COMPACTION
643+
------
644+
SCHEMA:
645+
CREATE TABLE `/Root/ColumnTable` (
646+
Col1 Uint64 NOT NULL,
647+
Col2 JsonDocument,
648+
PRIMARY KEY (Col1)
649+
)
650+
PARTITION BY HASH(Col1)
651+
WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = $$1|2$$);
652+
------
653+
SCHEMA:
654+
ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=UPSERT_OPTIONS, `SCAN_READER_POLICY_NAME`=`SIMPLE`)
655+
------
656+
SCHEMA:
657+
ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=ALTER_COLUMN, NAME=Col2, `DATA_ACCESSOR_CONSTRUCTOR.CLASS_NAME`=`SUB_COLUMNS`,
658+
`COLUMNS_LIMIT`=`$$0|1|1024$$`, `SPARSED_DETECTOR_KFF`=`$$0|10|1000$$`, `MEM_LIMIT_CHUNK`=`$$0|100|1000000$$`, `OTHERS_ALLOWED_FRACTION`=`$$0|0.5$$`)
659+
------
660+
SCHEMA:
661+
ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=ALTER_COLUMN, NAME=Col2, `DATA_ACCESSOR_CONSTRUCTOR.CLASS_NAME`=`SUB_COLUMNS`,
662+
`COLUMNS_LIMIT`=`$$0|1|1024$$`, `SPARSED_DETECTOR_KFF`=`$$0|10|1000$$`, `MEM_LIMIT_CHUNK`=`$$0|100|1000000$$`, `OTHERS_ALLOWED_FRACTION`=`$$0|0.5$$`)
663+
------
664+
DATA:
665+
REPLACE INTO `/Root/ColumnTable` (Col1, Col2) VALUES(1u, JsonDocument('{"a" : "a1"}')), (2u, JsonDocument('{"a" : "a2"}')),
666+
(3u, JsonDocument('{"b" : "b3"}')), (4u, JsonDocument('{"b" : "b4", "a" : "a4"}'))
667+
------
668+
DATA:
669+
REPLACE INTO `/Root/ColumnTable` (Col1, Col2) VALUES(11u, JsonDocument('{"a" : "1a1"}')), (12u, JsonDocument('{"a" : "1a2"}')),
670+
(13u, JsonDocument('{"b" : "1b3"}')), (14u, JsonDocument('{"b" : "1b4", "a" : "a4"}'))
671+
------
672+
SCHEMA:
673+
ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=a_index, TYPE=BLOOM_FILTER,
674+
FEATURES=`{"column_names" : ["Col2"], "data_extractor" : {"class_name" : "SUB_COLUMN", "sub_column_name" : "a"}, "false_positive_probability" : 0.05}`)
675+
------
676+
DATA:
677+
REPLACE INTO `/Root/ColumnTable` (Col1) VALUES(10u)
678+
------
679+
ONE_ACTUALIZATION
680+
------
681+
READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.a") = "1a1" ORDER BY Col1;
682+
EXPECTED: [[11u;["{\"a\":\"1a1\"}"]]]
683+
684+
)";
685+
TScriptVariator(script).Execute();
686+
}
687+
*/
620688
Y_UNIT_TEST(SwitchAccessorCompactionVariants) {
621689
TString script = R"(
622690
STOP_COMPACTION

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ TConclusion<bool> TProgramStepPrepare::DoExecuteInplace(const std::shared_ptr<ID
181181
source->GetStageData().GetTable()->Remove(i.GetColumnId());
182182
}
183183
std::shared_ptr<IKernelFetchLogic> logic;
184-
if (customFetchInfo->GetFullRestore()) {
184+
if (customFetchInfo->GetFullRestore() || source->GetStageData().GetPortionAccessor().GetColumnChunksPointers(i.GetColumnId()).empty()) {
185185
logic = std::make_shared<TDefaultFetchLogic>(i.GetColumnId(), source);
186186
} else {
187187
AFL_VERIFY(customFetchInfo->GetSubColumns().size());

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/sub_columns_fetching.h

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,10 @@ class TColumnChunkRestoreInfo {
7878
i.second.GetBlobDataVerified().size());
7979
std::vector<NArrow::NAccessor::TDeserializeChunkedArray::TChunk> chunks = { NArrow::NAccessor::TDeserializeChunkedArray::TChunk(
8080
GetRecordsCount(), i.second.GetBlobDataVerified()) };
81-
const std::shared_ptr<NArrow::NAccessor::IChunkedArray> arrOriginal = deserialize
82-
? columnLoader->ApplyVerified(i.second.GetBlobDataVerified(), GetRecordsCount())
81+
// const ui32 filledRecordsCount = PartialArray->GetHeader().GetColumnStats().GetColumnRecordsCount(i.second.GetColumnIdx());
82+
const std::shared_ptr<NArrow::NAccessor::IChunkedArray> arrOriginal =
83+
deserialize
84+
? columnLoader->ApplyVerified(i.second.GetBlobDataVerified(), GetRecordsCount()/*, filledRecordsCount*/)
8385
: std::make_shared<NArrow::NAccessor::TDeserializeChunkedArray>(GetRecordsCount(), columnLoader, std::move(chunks), true);
8486
if (applyFilter) {
8587
PartialArray->AddColumn(i.first, applyFilter->Apply(arrOriginal));
@@ -124,14 +126,12 @@ class TColumnChunkRestoreInfo {
124126
// "others", PartialArray->GetHeader().GetOtherStats().DebugJson().GetStringRobust());
125127
}
126128

127-
void InitPartialReader(
128-
const ui32 columnId, const ui32 positionStart, const std::shared_ptr<NArrow::NAccessor::TAccessorsCollection>& resources) {
129+
void InitPartialReader(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& accessor) {
129130
AFL_VERIFY(!HeaderRange);
130131
AFL_VERIFY(!PartialArray);
131-
auto columnAccessor = resources->GetAccessorVerified(columnId);
132-
auto partialArray = columnAccessor->GetArraySlow(positionStart, columnAccessor);
133-
AFL_VERIFY(partialArray.GetArray()->GetType() == NArrow::NAccessor::IChunkedArray::EType::SubColumnsPartialArray);
134-
PartialArray = std::static_pointer_cast<NArrow::NAccessor::TSubColumnsPartialArray>(partialArray.GetArray());
132+
AFL_VERIFY(accessor);
133+
AFL_VERIFY(accessor->GetType() == NArrow::NAccessor::IChunkedArray::EType::SubColumnsPartialArray)("type", accessor->GetType());
134+
PartialArray = std::static_pointer_cast<NArrow::NAccessor::TSubColumnsPartialArray>(accessor);
135135
}
136136

137137
TColumnChunkRestoreInfo(const TBlobRange& fullChunkRange, const NArrow::NAccessor::TChunkConstructionData& chunkExternalInfo)
@@ -179,8 +179,10 @@ class TSubColumnsFetchLogic: public IKernelFetchLogic {
179179
}
180180
Resources->AddVerified(GetColumnId(), compositeBuilder.Finish(), true);
181181
} else {
182+
ui32 pos = 0;
182183
for (auto&& i : ColumnChunks) {
183-
i.Finish(Resources->GetAppliedFilter(), Source);
184+
i.Finish(std::make_shared<NArrow::TColumnFilter>(Resources->GetAppliedFilter()->Slice(pos, i.GetRecordsCount())), Source);
185+
pos += i.GetRecordsCount();
184186
}
185187
}
186188
}
@@ -238,23 +240,36 @@ class TSubColumnsFetchLogic: public IKernelFetchLogic {
238240
auto itFilter = cFilter.GetIterator(false, Source->GetRecordsCount());
239241
bool itFinished = false;
240242

241-
NeedToAddResource = !Resources->HasColumn(GetColumnId());
242-
ui32 posCurrent = 0;
243-
for (auto&& c : columnChunks) {
243+
auto accessor = Resources->GetAccessorOptional(GetColumnId());
244+
NeedToAddResource = !accessor;
245+
std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> chunks;
246+
if (!NeedToAddResource) {
247+
if (accessor->GetType() == NArrow::NAccessor::IChunkedArray::EType::CompositeChunkedArray) {
248+
auto composite = std::static_pointer_cast<NArrow::NAccessor::TCompositeChunkedArray>(accessor);
249+
chunks = composite->GetChunks();
250+
} else {
251+
chunks.emplace_back(accessor);
252+
}
253+
}
254+
ui32 resChunkIdx = 0;
255+
for (ui32 chunkIdx = 0; chunkIdx < columnChunks.size(); ++chunkIdx) {
256+
auto& meta = columnChunks[chunkIdx]->GetMeta();
244257
AFL_VERIFY(!itFinished);
245-
if (!itFilter.IsBatchForSkip(c->GetMeta().GetRecordsCount())) {
246-
const TBlobRange range = Source->RestoreBlobRange(c->BlobRange);
247-
ColumnChunks.emplace_back(range, ChunkExternalInfo.GetSubset(c->GetMeta().GetRecordsCount()));
258+
if (!itFilter.IsBatchForSkip(meta.GetRecordsCount())) {
259+
const TBlobRange range = Source->RestoreBlobRange(columnChunks[chunkIdx]->BlobRange);
260+
ColumnChunks.emplace_back(range, ChunkExternalInfo.GetSubset(meta.GetRecordsCount()));
248261
if (!NeedToAddResource) {
249-
ColumnChunks.back().InitPartialReader(GetColumnId(), posCurrent, Resources);
262+
AFL_VERIFY(resChunkIdx < chunks.size())("chunks", chunks.size())("meta", columnChunks.size())("need", NeedToAddResource);
263+
ColumnChunks.back().InitPartialReader(chunks[resChunkIdx]);
264+
++resChunkIdx;
250265
}
251266
ColumnChunks.back().InitReading(reading, SubColumns);
252267
} else {
253-
ColumnChunks.emplace_back(TColumnChunkRestoreInfo::BuildEmpty(ChunkExternalInfo.GetSubset(c->GetMeta().GetRecordsCount())));
268+
ColumnChunks.emplace_back(TColumnChunkRestoreInfo::BuildEmpty(ChunkExternalInfo.GetSubset(meta.GetRecordsCount())));
254269
}
255-
itFinished = !itFilter.Next(c->GetMeta().GetRecordsCount());
256-
posCurrent += c->GetMeta().GetRecordsCount();
270+
itFinished = !itFilter.Next(meta.GetRecordsCount());
257271
}
272+
AFL_VERIFY(NeedToAddResource || (resChunkIdx == chunks.size()));
258273
AFL_VERIFY(itFinished)("filter", itFilter.DebugString())("count", Source->GetRecordsCount());
259274
for (auto&& i : blobsAction.GetReadingActions()) {
260275
nextRead.Add(i);

ydb/core/tx/columnshard/engines/scheme/index_info.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,9 @@ NKikimr::TConclusionStatus TIndexInfo::AppendIndex(const THashMap<ui32, std::vec
433433
AFL_VERIFY(it != Indexes.end());
434434
auto& index = it->second;
435435
std::shared_ptr<IPortionDataChunk> chunk = index->BuildIndex(originalData, recordsCount, *this);
436+
if (!chunk) {
437+
return TConclusionStatus::Success();
438+
}
436439
auto opStorage = operators->GetOperatorVerified(index->GetStorageId());
437440
if ((i64)chunk->GetPackedSize() > opStorage->GetBlobSplitSettings().GetMaxBlobSize()) {
438441
return TConclusionStatus::Fail("blob size for secondary data (" + ::ToString(indexId) + ":" + ::ToString(chunk->GetPackedSize()) + ":" +

0 commit comments

Comments
 (0)