Skip to content

Commit 83b9579

Browse files
dont use cpu for not abortable chunks (#7865)
1 parent a6a8b05 commit 83b9579

File tree

6 files changed

+27
-3
lines changed

6 files changed

+27
-3
lines changed

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,11 @@ bool TColumnShard::RemoveLongTxWrite(NIceDb::TNiceDb& db, const TWriteId writeId
271271
}
272272
LongTxWrites.erase(writeId);
273273
return true;
274+
} else {
275+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_remove_prepared_tx_insertion")("write_id", (ui64)writeId)("tx_id", txId);
274276
}
277+
} else {
278+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_remove_removed_tx_insertion")("write_id", (ui64)writeId)("tx_id", txId);
275279
}
276280
return false;
277281
}
@@ -284,9 +288,10 @@ void TColumnShard::TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTabl
284288
}
285289
}
286290
if (failedAborts.size()) {
287-
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "failed_aborts")("count", failedAborts.size())("writes_count", writesToAbort.size());
291+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "failed_aborts")("count", failedAborts.size())("writes_count", writesToAbort.size());
288292
}
289293
for (auto& writeId : failedAborts) {
294+
InsertTable->MarkAsNotAbortable(writeId);
290295
writesToAbort.erase(writeId);
291296
}
292297
if (!writesToAbort.empty()) {

ydb/core/tx/columnshard/engines/insert_table/data.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,13 @@ struct TInsertedData {
3333

3434
private:
3535
YDB_READONLY(ui64, SchemaVersion, 0);
36+
YDB_READONLY_FLAG(NotAbortable, false);
3637

3738
public:
39+
void MarkAsNotAbortable() {
40+
NotAbortableFlag = true;
41+
}
42+
3843
std::optional<TString> GetBlobData() const {
3944
if (BlobDataGuard) {
4045
return BlobDataGuard->GetData();
@@ -104,7 +109,7 @@ struct TInsertedData {
104109
/// One of them wins and becomes committed. Original DedupId would be lost then.
105110
/// After commit we use original Initiator:WriteId as DedupId of inserted blob inside {PlanStep, TxId}.
106111
/// pathId, initiator, {writeId}, {dedupId} -> pathId, planStep, txId, {dedupId}
107-
void Commit(ui64 planStep, ui64 txId) {
112+
void Commit(const ui64 planStep, const ui64 txId) {
108113
DedupId = ToString(PlanStep) + ":" + ToString((ui64)WriteTxId);
109114
PlanStep = planStep;
110115
WriteTxId = txId;

ydb/core/tx/columnshard/engines/insert_table/insert_table.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ class TInsertTable: public TInsertTableAccessor {
8282
TInsertionSummary::TCounters Commit(IDbWrapper& dbTable, ui64 planStep, ui64 txId,
8383
const THashSet<TWriteId>& writeIds, std::function<bool(ui64)> pathExists);
8484
void Abort(IDbWrapper& dbTable, const THashSet<TWriteId>& writeIds);
85+
void MarkAsNotAbortable(const TWriteId writeId) {
86+
Summary.MarkAsNotAbortable(writeId);
87+
}
8588
THashSet<TWriteId> OldWritesToAbort(const TInstant& now) const;
8689
THashSet<TWriteId> DropPath(IDbWrapper& dbTable, ui64 pathId);
8790

ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,9 @@ THashSet<NKikimr::NOlap::TWriteId> TInsertionSummary::GetExpiredInsertions(const
120120
TInstant newMin = TInstant::Max();
121121
for (auto& [writeId, data] : Inserted) {
122122
const TInstant dataInsertTs = data.GetMeta().GetDirtyWriteTime();
123+
if (data.IsNotAbortable()) {
124+
continue;
125+
}
123126
if (dataInsertTs < timeBorder && toAbort.size() < limit) {
124127
toAbort.insert(writeId);
125128
} else {

ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,14 @@ class TInsertionSummary {
3434
void OnEraseInserted(TPathInfo& pathInfo, const ui64 dataSize) noexcept;
3535
static TAtomicCounter CriticalInserted;
3636
public:
37+
void MarkAsNotAbortable(const TWriteId writeId) {
38+
auto it = Inserted.find(writeId);
39+
if (it == Inserted.end()) {
40+
return;
41+
}
42+
it->second.MarkAsNotAbortable();
43+
}
44+
3745
THashSet<TWriteId> GetInsertedByPathId(const ui64 pathId) const;
3846

3947
THashSet<TWriteId> GetExpiredInsertions(const TInstant timeBorder, const ui64 limit) const;

ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ TConclusion<std::shared_ptr<arrow::RecordBatch>> ISnapshotSchema::PrepareForModi
117117
switch (mType) {
118118
case NEvWrite::EModificationType::Replace:
119119
case NEvWrite::EModificationType::Upsert: {
120-
AFL_VERIFY(batch->num_columns() <= dstSchema->num_fields());
120+
AFL_VERIFY(batch->num_columns() <= dstSchema->num_fields());
121121
if (batch->num_columns() < dstSchema->num_fields()) {
122122
for (auto&& f : dstSchema->fields()) {
123123
if (GetIndexInfo().IsNullableVerified(f->name())) {

0 commit comments

Comments
 (0)