Skip to content

Commit 7258b98

Browse files
dont use insert table totally (#10088)
1 parent 47748f5 commit 7258b98

File tree

88 files changed

+1749
-662
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

88 files changed

+1749
-662
lines changed

ydb/core/formats/arrow/reader/merger.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,13 +154,16 @@ void TMergePartialStream::DrainCurrentPosition(TRecordBatchBuilder* builder, std
154154
Y_ABORT_UNLESS(SortHeap.Size());
155155
Y_ABORT_UNLESS(!SortHeap.Current().IsControlPoint());
156156
if (!SortHeap.Current().IsDeleted()) {
157+
// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("key_add", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust());
157158
if (builder) {
158159
builder->AddRecord(SortHeap.Current().GetKeyColumns());
159160
}
160161
if (resultScanData && resultPosition) {
161162
*resultScanData = SortHeap.Current().GetKeyColumns().GetSorting();
162163
*resultPosition = SortHeap.Current().GetKeyColumns().GetPosition();
163164
}
165+
} else {
166+
// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("key_skip", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust());
164167
}
165168
CheckSequenceInDebug(SortHeap.Current().GetKeyColumns());
166169
const ui64 startPosition = SortHeap.Current().GetKeyColumns().GetPosition();
@@ -169,6 +172,7 @@ void TMergePartialStream::DrainCurrentPosition(TRecordBatchBuilder* builder, std
169172
bool isFirst = true;
170173
while (SortHeap.Size() && (isFirst || SortHeap.Current().GetKeyColumns().Compare(*startSorting, startPosition) == std::partial_ordering::equivalent)) {
171174
if (!isFirst) {
175+
// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("key_skip1", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust());
172176
auto& anotherIterator = SortHeap.Current();
173177
if (PossibleSameVersionFlag) {
174178
AFL_VERIFY(anotherIterator.GetVersionColumns().Compare(*startVersion, startPosition) != std::partial_ordering::greater)

ydb/core/formats/arrow/reader/position.h

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,8 @@ class TIntervalPositions {
451451
private:
452452
std::vector<TIntervalPosition> Positions;
453453
public:
454+
using const_iterator = std::vector<TIntervalPosition>::const_iterator;
455+
454456
bool IsEmpty() const {
455457
return Positions.empty();
456458
}
@@ -459,6 +461,16 @@ class TIntervalPositions {
459461
return Positions.begin();
460462
}
461463

464+
TString DebugString() const {
465+
TStringBuilder sb;
466+
sb << "[";
467+
for (auto&& p : Positions) {
468+
sb << p.DebugJson().GetStringRobust() << ";";
469+
}
470+
sb << "]";
471+
return sb;
472+
}
473+
462474
std::vector<TIntervalPosition>::const_iterator end() const {
463475
return Positions.end();
464476
}
@@ -662,6 +674,35 @@ class TRWSortableBatchPosition: public TSortableBatchPosition, public TMoveOnly
662674
return SplitByBorders(batch, columnNames, it);
663675
}
664676

677+
class TIntervalPointsIterator {
678+
private:
679+
typename TIntervalPositions::const_iterator Current;
680+
typename TIntervalPositions::const_iterator End;
681+
682+
public:
683+
TIntervalPointsIterator(const TIntervalPositions& container)
684+
: Current(container.begin())
685+
, End(container.end()) {
686+
}
687+
688+
bool IsValid() const {
689+
return Current != End;
690+
}
691+
692+
void Next() {
693+
++Current;
694+
}
695+
696+
const auto& CurrentPosition() const {
697+
return Current->GetPosition();
698+
}
699+
};
700+
701+
static std::vector<std::shared_ptr<arrow::RecordBatch>> SplitByBordersInIntervalPositions(
702+
const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::string>& columnNames, const TIntervalPositions& container) {
703+
TIntervalPointsIterator it(container);
704+
return SplitByBorders(batch, columnNames, it);
705+
}
665706
};
666707

667708
}

ydb/core/formats/arrow/serializer/native.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ TString TNativeSerializer::DoSerializePayload(const std::shared_ptr<arrow::Recor
9999
// Write prepared payload into the resultant string. No extra allocation will be made.
100100
TStatusValidator::Validate(arrow::ipc::WriteIpcPayload(payload, Options, &out, &metadata_length));
101101
Y_ABORT_UNLESS(out.GetPosition() == str.size());
102-
Y_DEBUG_ABORT_UNLESS(Deserialize(str, batch->schema()).ok());
102+
AFL_VERIFY_DEBUG(Deserialize(str, batch->schema()).ok());
103103
AFL_DEBUG(NKikimrServices::ARROW_HELPER)("event", "serialize")("size", str.size())("columns", batch->schema()->num_fields());
104104
return str;
105105
}

ydb/core/formats/arrow/serializer/native.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,18 @@ class TNativeSerializer: public ISerializer {
6262
virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapColumn::TSerializer& proto) const override;
6363

6464
public:
65+
static std::shared_ptr<ISerializer> GetUncompressed() {
66+
static std::shared_ptr<ISerializer> result =
67+
std::make_shared<NArrow::NSerialization::TNativeSerializer>(arrow::Compression::UNCOMPRESSED);
68+
return result;
69+
}
70+
71+
static std::shared_ptr<ISerializer> GetFast() {
72+
static std::shared_ptr<ISerializer> result =
73+
std::make_shared<NArrow::NSerialization::TNativeSerializer>(arrow::Compression::LZ4_FRAME);
74+
return result;
75+
}
76+
6577
virtual TString GetClassName() const override {
6678
return GetClassNameStatic();
6779
}

ydb/core/kqp/ut/common/kqp_ut_common.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ struct TKikimrSettings: public TTestFeatureFlagsHolder<TKikimrSettings> {
9898
exchangerSettings->SetMaxDelayMs(10);
9999
AppConfig.MutableColumnShardConfig()->SetDisabledOnSchemeShard(false);
100100
FeatureFlags.SetEnableSparsedColumns(true);
101+
FeatureFlags.SetEnableImmediateWritingOnBulkUpsert(true);
102+
FeatureFlags.SetEnableWritePortionsOnInsert(true);
101103
FeatureFlags.SetEnableParameterizedDecimal(true);
102104
FeatureFlags.SetEnableTopicAutopartitioningForCDC(true);
103105
}

ydb/core/kqp/ut/olap/aggregations_ut.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ Y_UNIT_TEST_SUITE(KqpOlapAggregations) {
9595
WriteTestData(kikimr, "/Root/olapStore/olapTable", 20000, 2000000, 7000);
9696
WriteTestData(kikimr, "/Root/olapStore/olapTable", 30000, 1000000, 11000);
9797
}
98-
while (csController->GetInsertFinishedCounter().Val() == 0) {
98+
while (csController->GetCompactionFinishedCounter().Val() == 0) {
9999
Cout << "Wait indexation..." << Endl;
100100
Sleep(TDuration::Seconds(2));
101101
}
@@ -374,7 +374,7 @@ Y_UNIT_TEST_SUITE(KqpOlapAggregations) {
374374
.AddExpectedPlanOptions("KqpOlapFilter")
375375
#if SSA_RUNTIME_VERSION >= 2U
376376
.AddExpectedPlanOptions("TKqpOlapAgg")
377-
.MutableLimitChecker().SetExpectedResultCount(1)
377+
.MutableLimitChecker().SetExpectedResultCount(2)
378378
#else
379379
.AddExpectedPlanOptions("Condense")
380380
#endif
@@ -417,7 +417,7 @@ Y_UNIT_TEST_SUITE(KqpOlapAggregations) {
417417
.AddExpectedPlanOptions("KqpOlapFilter")
418418
#if SSA_RUNTIME_VERSION >= 2U
419419
.AddExpectedPlanOptions("TKqpOlapAgg")
420-
.MutableLimitChecker().SetExpectedResultCount(1)
420+
.MutableLimitChecker().SetExpectedResultCount(2)
421421
#else
422422
.AddExpectedPlanOptions("CombineCore")
423423
.AddExpectedPlanOptions("KqpOlapFilter")

0 commit comments

Comments
 (0)