Skip to content

Commit a2b0e30

Browse files
dont use cpu for not abortable chunks (#7865) (#7878)
Co-authored-by: ivanmorozov333 <[email protected]>
1 parent d9d39c9 commit a2b0e30

File tree

5 files changed

+26
-2
lines changed

5 files changed

+26
-2
lines changed

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,11 @@ bool TColumnShard::RemoveLongTxWrite(NIceDb::TNiceDb& db, const TWriteId writeId
270270
}
271271
LongTxWrites.erase(writeId);
272272
return true;
273+
} else {
274+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_remove_prepared_tx_insertion")("write_id", (ui64)writeId)("tx_id", txId);
273275
}
276+
} else {
277+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_remove_removed_tx_insertion")("write_id", (ui64)writeId)("tx_id", txId);
274278
}
275279
return false;
276280
}
@@ -283,9 +287,10 @@ void TColumnShard::TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTabl
283287
}
284288
}
285289
if (failedAborts.size()) {
286-
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "failed_aborts")("count", failedAborts.size())("writes_count", writesToAbort.size());
290+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "failed_aborts")("count", failedAborts.size())("writes_count", writesToAbort.size());
287291
}
288292
for (auto& writeId : failedAborts) {
293+
InsertTable->MarkAsNotAbortable(writeId);
289294
writesToAbort.erase(writeId);
290295
}
291296
if (!writesToAbort.empty()) {

ydb/core/tx/columnshard/engines/insert_table/data.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,13 @@ struct TInsertedData {
3333

3434
private:
3535
YDB_READONLY(ui64, SchemaVersion, 0);
36+
YDB_READONLY_FLAG(NotAbortable, false);
3637

3738
public:
39+
void MarkAsNotAbortable() {
40+
NotAbortableFlag = true;
41+
}
42+
3843
std::optional<TString> GetBlobData() const {
3944
if (BlobDataGuard) {
4045
return BlobDataGuard->GetData();
@@ -104,7 +109,7 @@ struct TInsertedData {
104109
/// One of them wins and becomes committed. Original DedupId would be lost then.
105110
/// After commit we use original Initiator:WriteId as DedupId of inserted blob inside {PlanStep, TxId}.
106111
/// pathId, initiator, {writeId}, {dedupId} -> pathId, planStep, txId, {dedupId}
107-
void Commit(ui64 planStep, ui64 txId) {
112+
void Commit(const ui64 planStep, const ui64 txId) {
108113
DedupId = ToString(PlanStep) + ":" + ToString((ui64)WriteTxId);
109114
PlanStep = planStep;
110115
WriteTxId = txId;

ydb/core/tx/columnshard/engines/insert_table/insert_table.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ class TInsertTable: public TInsertTableAccessor {
8282
TInsertionSummary::TCounters Commit(IDbWrapper& dbTable, ui64 planStep, ui64 txId,
8383
const THashSet<TWriteId>& writeIds, std::function<bool(ui64)> pathExists);
8484
void Abort(IDbWrapper& dbTable, const THashSet<TWriteId>& writeIds);
85+
void MarkAsNotAbortable(const TWriteId writeId) {
86+
Summary.MarkAsNotAbortable(writeId);
87+
}
8588
THashSet<TWriteId> OldWritesToAbort(const TInstant& now) const;
8689
THashSet<TWriteId> DropPath(IDbWrapper& dbTable, ui64 pathId);
8790

ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,9 @@ THashSet<NKikimr::NOlap::TWriteId> TInsertionSummary::GetExpiredInsertions(const
120120
TInstant newMin = TInstant::Max();
121121
for (auto& [writeId, data] : Inserted) {
122122
const TInstant dataInsertTs = data.GetMeta().GetDirtyWriteTime();
123+
if (data.IsNotAbortable()) {
124+
continue;
125+
}
123126
if (dataInsertTs < timeBorder && toAbort.size() < limit) {
124127
toAbort.insert(writeId);
125128
} else {

ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,14 @@ class TInsertionSummary {
3434
void OnEraseInserted(TPathInfo& pathInfo, const ui64 dataSize) noexcept;
3535
static TAtomicCounter CriticalInserted;
3636
public:
37+
void MarkAsNotAbortable(const TWriteId writeId) {
38+
auto it = Inserted.find(writeId);
39+
if (it == Inserted.end()) {
40+
return;
41+
}
42+
it->second.MarkAsNotAbortable();
43+
}
44+
3745
THashSet<TWriteId> GetInsertedByPathId(const ui64 pathId) const;
3846

3947
THashSet<TWriteId> GetExpiredInsertions(const TInstant timeBorder, const ui64 limit) const;

0 commit comments

Comments
 (0)