|
| 1 | +#include "merger.h" |
| 2 | + |
| 3 | +#include "column_cursor.h" |
| 4 | +#include "column_portion_chunk.h" |
| 5 | +#include "merge_context.h" |
| 6 | +#include "merged_column.h" |
| 7 | + |
| 8 | +#include <ydb/core/formats/arrow/reader/merger.h> |
| 9 | +#include <ydb/core/formats/arrow/simple_builder/array.h> |
| 10 | +#include <ydb/core/formats/arrow/simple_builder/filler.h> |
| 11 | +#include <ydb/core/tx/columnshard/splitter/batch_slice.h> |
| 12 | + |
| 13 | +namespace NKikimr::NOlap::NCompaction { |
| 14 | + |
| 15 | +std::vector<NKikimr::NOlap::TWritePortionInfoWithBlobsResult> TMerger::Execute(const std::shared_ptr<TSerializationStats>& stats, |
| 16 | + const std::map<NArrow::NMerger::TSortableBatchPosition, bool>& checkPoints, const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, |
| 17 | + const ui64 pathId, const std::optional<ui64> shardingActualVersion) { |
| 18 | + AFL_VERIFY(Batches.size() == Filters.size()); |
| 19 | + static const TString portionIdFieldName = "$$__portion_id"; |
| 20 | + static const TString portionRecordIndexFieldName = "$$__portion_record_idx"; |
| 21 | + static const std::shared_ptr<arrow::Field> portionIdField = |
| 22 | + std::make_shared<arrow::Field>(portionIdFieldName, std::make_shared<arrow::UInt16Type>()); |
| 23 | + static const std::shared_ptr<arrow::Field> portionRecordIndexField = |
| 24 | + std::make_shared<arrow::Field>(portionRecordIndexFieldName, std::make_shared<arrow::UInt32Type>()); |
| 25 | + |
| 26 | + std::vector<std::shared_ptr<arrow::RecordBatch>> batchResults; |
| 27 | + { |
| 28 | + arrow::FieldVector indexFields; |
| 29 | + indexFields.emplace_back(portionIdField); |
| 30 | + indexFields.emplace_back(portionRecordIndexField); |
| 31 | + IIndexInfo::AddSpecialFields(indexFields); |
| 32 | + auto dataSchema = std::make_shared<arrow::Schema>(indexFields); |
| 33 | + NArrow::NMerger::TMergePartialStream mergeStream( |
| 34 | + resultFiltered->GetIndexInfo().GetReplaceKey(), dataSchema, false, IIndexInfo::GetSnapshotColumnNames()); |
| 35 | + |
| 36 | + ui32 idx = 0; |
| 37 | + for (auto&& batch : Batches) { |
| 38 | + { |
| 39 | + NArrow::NConstruction::IArrayBuilder::TPtr column = |
| 40 | + std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntConstFiller<arrow::UInt16Type>>>( |
| 41 | + portionIdFieldName, idx); |
| 42 | + batch->AddField(portionIdField, column->BuildArray(batch->num_rows())).Validate(); |
| 43 | + } |
| 44 | + { |
| 45 | + NArrow::NConstruction::IArrayBuilder::TPtr column = |
| 46 | + std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntSeqFiller<arrow::UInt32Type>>>( |
| 47 | + portionRecordIndexFieldName); |
| 48 | + batch->AddField(portionRecordIndexField, column->BuildArray(batch->num_rows())).Validate(); |
| 49 | + } |
| 50 | + mergeStream.AddSource(batch, Filters[idx]); |
| 51 | + ++idx; |
| 52 | + } |
| 53 | + batchResults = mergeStream.DrainAllParts(checkPoints, indexFields); |
| 54 | + } |
| 55 | + |
| 56 | + std::vector<std::map<ui32, std::vector<NCompaction::TColumnPortionResult>>> chunkGroups; |
| 57 | + chunkGroups.resize(batchResults.size()); |
| 58 | + for (auto&& columnId : resultFiltered->GetColumnIds()) { |
| 59 | + NActors::TLogContextGuard logGuard( |
| 60 | + NActors::TLogContextBuilder::Build()("field_name", resultFiltered->GetIndexInfo().GetColumnName(columnId))); |
| 61 | + auto columnInfo = stats->GetColumnInfo(columnId); |
| 62 | + auto resultField = resultFiltered->GetIndexInfo().GetColumnFieldVerified(columnId); |
| 63 | + |
| 64 | + std::vector<NCompaction::TPortionColumnCursor> cursors; |
| 65 | + { |
| 66 | + ui32 idx = 0; |
| 67 | + for (auto&& p : Batches) { |
| 68 | + cursors.emplace_back(NCompaction::TPortionColumnCursor(p->GetColumnVerified(resultFiltered->GetFieldIndex(columnId)), idx)); |
| 69 | + ++idx; |
| 70 | + } |
| 71 | + } |
| 72 | + |
| 73 | + ui32 batchesRecordsCount = 0; |
| 74 | + ui32 columnRecordsCount = 0; |
| 75 | + std::map<std::string, std::vector<NCompaction::TColumnPortionResult>> columnChunks; |
| 76 | + ui32 batchIdx = 0; |
| 77 | + for (auto&& batchResult : batchResults) { |
| 78 | + const ui32 portionRecordsCountLimit = |
| 79 | + batchResult->num_rows() / (batchResult->num_rows() / NSplitter::TSplitSettings().GetExpectedRecordsCountOnPage() + 1) + 1; |
| 80 | + NCompaction::TColumnMergeContext context(columnId, resultFiltered, portionRecordsCountLimit, |
| 81 | + NSplitter::TSplitSettings().GetExpectedUnpackColumnChunkRawSize(), columnInfo); |
| 82 | + NCompaction::TMergedColumn mColumn(context); |
| 83 | + |
| 84 | + auto columnPortionIdx = batchResult->GetColumnByName(portionIdFieldName); |
| 85 | + auto columnPortionRecordIdx = batchResult->GetColumnByName(portionRecordIndexFieldName); |
| 86 | + auto columnSnapshotPlanStepIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP); |
| 87 | + auto columnSnapshotTxIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID); |
| 88 | + Y_ABORT_UNLESS(columnPortionIdx && columnPortionRecordIdx && columnSnapshotPlanStepIdx && columnSnapshotTxIdx); |
| 89 | + Y_ABORT_UNLESS(columnPortionIdx->type_id() == arrow::UInt16Type::type_id); |
| 90 | + Y_ABORT_UNLESS(columnPortionRecordIdx->type_id() == arrow::UInt32Type::type_id); |
| 91 | + Y_ABORT_UNLESS(columnSnapshotPlanStepIdx->type_id() == arrow::UInt64Type::type_id); |
| 92 | + Y_ABORT_UNLESS(columnSnapshotTxIdx->type_id() == arrow::UInt64Type::type_id); |
| 93 | + const arrow::UInt16Array& pIdxArray = static_cast<const arrow::UInt16Array&>(*columnPortionIdx); |
| 94 | + const arrow::UInt32Array& pRecordIdxArray = static_cast<const arrow::UInt32Array&>(*columnPortionRecordIdx); |
| 95 | + |
| 96 | + AFL_VERIFY(batchResult->num_rows() == pIdxArray.length()); |
| 97 | + std::optional<ui16> predPortionIdx; |
| 98 | + for (ui32 idx = 0; idx < pIdxArray.length(); ++idx) { |
| 99 | + const ui16 portionIdx = pIdxArray.Value(idx); |
| 100 | + const ui32 portionRecordIdx = pRecordIdxArray.Value(idx); |
| 101 | + auto& cursor = cursors[portionIdx]; |
| 102 | + cursor.Next(portionRecordIdx, mColumn); |
| 103 | + if (predPortionIdx && portionIdx != *predPortionIdx) { |
| 104 | + cursors[*predPortionIdx].Fetch(mColumn); |
| 105 | + } |
| 106 | + if (idx + 1 == pIdxArray.length()) { |
| 107 | + cursor.Fetch(mColumn); |
| 108 | + } |
| 109 | + predPortionIdx = portionIdx; |
| 110 | + } |
| 111 | + chunkGroups[batchIdx][columnId] = mColumn.BuildResult(); |
| 112 | + batchesRecordsCount += batchResult->num_rows(); |
| 113 | + columnRecordsCount += mColumn.GetRecordsCount(); |
| 114 | + AFL_VERIFY(batchResult->num_rows() == mColumn.GetRecordsCount()); |
| 115 | + ++batchIdx; |
| 116 | + } |
| 117 | + AFL_VERIFY(columnRecordsCount == batchesRecordsCount)("mCount", columnRecordsCount)("bCount", batchesRecordsCount); |
| 118 | + } |
| 119 | + ui32 batchIdx = 0; |
| 120 | + |
| 121 | + const auto groups = |
| 122 | + resultFiltered->GetIndexInfo().GetEntityGroupsByStorageId(IStoragesManager::DefaultStorageId, *SaverContext.GetStoragesManager()); |
| 123 | + std::vector<TWritePortionInfoWithBlobsResult> result; |
| 124 | + for (auto&& columnChunks : chunkGroups) { |
| 125 | + auto batchResult = batchResults[batchIdx]; |
| 126 | + ++batchIdx; |
| 127 | + Y_ABORT_UNLESS(columnChunks.size()); |
| 128 | + |
| 129 | + for (auto&& i : columnChunks) { |
| 130 | + if (i.second.size() != columnChunks.begin()->second.size()) { |
| 131 | + for (ui32 p = 0; p < std::min<ui32>(columnChunks.begin()->second.size(), i.second.size()); ++p) { |
| 132 | + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("p_first", columnChunks.begin()->second[p].DebugString())( |
| 133 | + "p", i.second[p].DebugString()); |
| 134 | + } |
| 135 | + } |
| 136 | + AFL_VERIFY(i.second.size() == columnChunks.begin()->second.size())("first", columnChunks.begin()->second.size())( |
| 137 | + "current", i.second.size())("first_name", columnChunks.begin()->first)("current_name", i.first); |
| 138 | + } |
| 139 | + |
| 140 | + std::vector<TGeneralSerializedSlice> batchSlices; |
| 141 | + std::shared_ptr<TDefaultSchemaDetails> schemaDetails(new TDefaultSchemaDetails(resultFiltered, stats)); |
| 142 | + |
| 143 | + for (ui32 i = 0; i < columnChunks.begin()->second.size(); ++i) { |
| 144 | + THashMap<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>> portionColumns; |
| 145 | + for (auto&& p : columnChunks) { |
| 146 | + portionColumns.emplace(p.first, p.second[i].GetChunks()); |
| 147 | + } |
| 148 | + batchSlices.emplace_back(portionColumns, schemaDetails, Context.Counters.SplitterCounters); |
| 149 | + } |
| 150 | + TSimilarPacker slicer(NSplitter::TSplitSettings().GetExpectedPortionSize()); |
| 151 | + auto packs = slicer.Split(batchSlices); |
| 152 | + |
| 153 | + ui32 recordIdx = 0; |
| 154 | + for (auto&& i : packs) { |
| 155 | + TGeneralSerializedSlice slicePrimary(std::move(i)); |
| 156 | + auto dataWithSecondary = resultFiltered->GetIndexInfo() |
| 157 | + .AppendIndexes(slicePrimary.GetPortionChunksToHash(), SaverContext.GetStoragesManager()) |
| 158 | + .DetachResult(); |
| 159 | + TGeneralSerializedSlice slice(dataWithSecondary.GetExternalData(), schemaDetails, Context.Counters.SplitterCounters); |
| 160 | + |
| 161 | + auto b = batchResult->Slice(recordIdx, slice.GetRecordsCount()); |
| 162 | + const ui32 deletionsCount = IIndexInfo::CalcDeletions(b, true); |
| 163 | + auto constructor = TWritePortionInfoWithBlobsConstructor::BuildByBlobs(slice.GroupChunksByBlobs(groups), |
| 164 | + dataWithSecondary.GetSecondaryInplaceData(), pathId, resultFiltered->GetVersion(), resultFiltered->GetSnapshot(), |
| 165 | + SaverContext.GetStoragesManager()); |
| 166 | + |
| 167 | + NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultFiltered->GetIndexInfo().GetReplaceKey())); |
| 168 | + NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot()); |
| 169 | + constructor.GetPortionConstructor().AddMetadata(*resultFiltered, deletionsCount, primaryKeys, snapshotKeys); |
| 170 | + constructor.GetPortionConstructor().MutableMeta().SetTierName(IStoragesManager::DefaultStorageId); |
| 171 | + if (shardingActualVersion) { |
| 172 | + constructor.GetPortionConstructor().SetShardingVersion(*shardingActualVersion); |
| 173 | + } |
| 174 | + result.emplace_back(std::move(constructor)); |
| 175 | + recordIdx += slice.GetRecordsCount(); |
| 176 | + } |
| 177 | + } |
| 178 | + return result; |
| 179 | +} |
| 180 | + |
| 181 | +} |
0 commit comments