File tree 3 files changed +29
-10
lines changed
tx/columnshard/engines/changes
3 files changed +29
-10
lines changed Original file line number Diff line number Diff line change @@ -204,10 +204,10 @@ std::shared_ptr<arrow::Scalar> TSparsedArrayChunk::GetScalar(const ui32 index) c
204
204
void TSparsedArray::TBuilder::AddChunk (const ui32 recordsCount, const std::shared_ptr<arrow::RecordBatch>& data) {
205
205
AFL_VERIFY (data);
206
206
AFL_VERIFY (recordsCount);
207
- AFL_VERIFY (data->num_rows () <= recordsCount);
208
- AFL_VERIFY (data->num_columns () == 2 );
209
- AFL_VERIFY (data->column (0 )->type_id () == arrow::uint32 ()->id ());
210
- AFL_VERIFY_DEBUG (data->schema ()->field (0 )->name () == " index" );
207
+ AFL_VERIFY (data->num_rows () <= recordsCount)( " rows " , data-> num_rows ())( " count " , recordsCount) ;
208
+ AFL_VERIFY (data->num_columns () == 2 )( " count " , data-> num_columns ()) ;
209
+ AFL_VERIFY (data->column (0 )->type_id () == arrow::uint32 ()->id ())( " type " , data-> column ( 0 )-> type ()-> ToString ()) ;
210
+ AFL_VERIFY_DEBUG (data->schema ()->field (0 )->name () == " index" )( " name " , data-> schema ()-> field ( 0 )-> name ()) ;
211
211
if (data->num_rows ()) {
212
212
auto * arr = static_cast <const arrow::UInt32Array*>(data->column (0 ).get ());
213
213
AFL_VERIFY (arr->Value (arr->length () - 1 ) < recordsCount)(" val" , arr->Value (arr->length () - 1 ))(" count" , recordsCount);
Original file line number Diff line number Diff line change @@ -67,17 +67,31 @@ TSparsedMerger::TWriter::TWriter(const TColumnMergeContext& context)
67
67
bool TSparsedMerger::TCursor::AddIndexTo (const ui32 index, TWriter& writer) {
68
68
if (index < NextGlobalPosition) {
69
69
return false ;
70
+ } else if (index == NextGlobalPosition) {
71
+ if (index == Chunk->GetFinishPosition ()) {
72
+ InitArrays (index );
73
+ if (index != NextGlobalPosition) {
74
+ return false ;
75
+ }
76
+ }
77
+ writer.AddRealData (Chunk->GetColValue (), NextLocalPosition);
78
+ if (++NextLocalPosition < Chunk->GetNotDefaultRecordsCount ()) {
79
+ NextGlobalPosition = CommonShift + Chunk->GetIndexUnsafeFast (NextLocalPosition);
80
+ return true ;
81
+ } else {
82
+ NextGlobalPosition = CommonShift + Chunk->GetRecordsCount ();
83
+ return false ;
84
+ }
70
85
}
71
86
AFL_VERIFY (Chunk->GetStartPosition () <= index );
72
87
if (Chunk->GetFinishPosition () <= index ) {
73
88
InitArrays (index );
74
89
}
75
90
bool found = false ;
76
- for (ui32 i = NextLocalPosition; i < Chunk->GetNotDefaultRecordsCount (); ++i) {
77
- NextLocalPosition = i;
78
- NextGlobalPosition = CommonShift + Chunk->GetIndexUnsafeFast (i);
91
+ for (; NextLocalPosition < Chunk->GetNotDefaultRecordsCount (); ++NextLocalPosition) {
92
+ NextGlobalPosition = CommonShift + Chunk->GetIndexUnsafeFast (NextLocalPosition);
79
93
if (NextGlobalPosition == index ) {
80
- writer.AddRealData (Chunk->GetColValue (), i );
94
+ writer.AddRealData (Chunk->GetColValue (), NextLocalPosition );
81
95
found = true ;
82
96
} else if (index < NextGlobalPosition) {
83
97
return found;
Original file line number Diff line number Diff line change @@ -160,6 +160,9 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
160
160
auto blobSchema = context.SchemaVersions .GetSchemaVerified (inserted.GetSchemaVersion ());
161
161
std::vector<ui32> filteredIds = inserted.GetMeta ().GetSchemaSubset ().Apply (blobSchema->GetIndexInfo ().GetColumnIds (true ));
162
162
usageColumnIds.insert (filteredIds.begin (), filteredIds.end ());
163
+ if (inserted.GetMeta ().GetModificationType () == NEvWrite::EModificationType::Delete) {
164
+ usageColumnIds.emplace ((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG);
165
+ }
163
166
if (usageColumnIds.size () == resultSchema->GetIndexInfo ().GetColumnIds (true ).size ()) {
164
167
break ;
165
168
}
@@ -179,8 +182,10 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
179
182
}
180
183
181
184
IIndexInfo::AddSnapshotColumns (*batch, inserted.GetSnapshot ());
182
- IIndexInfo::AddDeleteFlagsColumn (*batch, inserted.GetMeta ().GetModificationType () == NEvWrite::EModificationType::Delete);
183
- usageColumnIds.insert (IIndexInfo::GetSystemColumnIds ().begin (), IIndexInfo::GetSystemColumnIds ().end ());
185
+ if (usageColumnIds.contains ((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG)) {
186
+ IIndexInfo::AddDeleteFlagsColumn (*batch, inserted.GetMeta ().GetModificationType () == NEvWrite::EModificationType::Delete);
187
+ }
188
+ usageColumnIds.insert (IIndexInfo::GetSnapshotColumnIds ().begin (), IIndexInfo::GetSnapshotColumnIds ().end ());
184
189
185
190
batch = resultSchema->NormalizeBatch (*blobSchema, batch, usageColumnIds).DetachResult ();
186
191
pathBatches.Add (inserted, shardingFilterCommit, batch);
You can’t perform that action at this time.
0 commit comments