Skip to content

Commit f187f33

Browse files
zverevgenyivanmorozov333
authored andcommitted
Fix portions cleaning (#9347)
Co-authored-by: ivanmorozov333 <[email protected]>
1 parent d2b8ff8 commit f187f33

23 files changed

+203
-158
lines changed

ydb/core/tx/columnshard/columnshard__write.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
263263
<< Counters.GetWritesMonitor()->DebugString() << " at tablet " << TabletID());
264264
writeData.MutableWriteMeta().SetWriteMiddle1StartInstant(TMonotonic::Now());
265265
std::shared_ptr<NConveyor::ITask> task = std::make_shared<NOlap::TBuildBatchesTask>(
266-
TabletID(), SelfId(), BufferizationWriteActorId, std::move(writeData), snapshotSchema, GetLastTxSnapshot());
266+
TabletID(), SelfId(), BufferizationWriteActorId, std::move(writeData), snapshotSchema, GetLastTxSnapshot(), Counters.GetCSCounters().WritingCounters);
267267
NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task);
268268
}
269269
}

ydb/core/tx/columnshard/counters/columnshard.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ namespace NKikimr::NColumnShard {
88

99
TCSCounters::TCSCounters()
1010
: TBase("CS")
11+
, WritingCounters(std::make_shared<TWriteCounters>(*this))
1112
, Initialization(*this)
1213
, TxProgress(*this) {
1314
StartBackgroundCount = TBase::GetDeriviative("StartBackground/Count");

ydb/core/tx/columnshard/counters/columnshard.h

+25-6
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,29 @@ enum class EWriteFailReason {
2121
OverlimitReadBlobMemory /* "overlimit_read_blob_memory" */
2222
};
2323

24+
class TWriteCounters: public TCommonCountersOwner {
25+
private:
26+
using TBase = TCommonCountersOwner;
27+
NMonitoring::TDynamicCounters::TCounterPtr VolumeWriteData;
28+
NMonitoring::THistogramPtr HistogramBytesWriteDataCount;
29+
NMonitoring::THistogramPtr HistogramBytesWriteDataBytes;
30+
31+
public:
32+
TWriteCounters(TCommonCountersOwner& owner)
33+
: TBase(owner, "activity", "writing")
34+
{
35+
VolumeWriteData = TBase::GetDeriviative("Write/Incoming/Bytes");
36+
HistogramBytesWriteDataCount = TBase::GetHistogram("Write/Incoming/ByBytes/Count", NMonitoring::ExponentialHistogram(18, 2, 100));
37+
HistogramBytesWriteDataBytes = TBase::GetHistogram("Write/Incoming/ByBytes/Bytes", NMonitoring::ExponentialHistogram(18, 2, 100));
38+
}
39+
40+
void OnIncomingData(const ui64 dataSize) const {
41+
VolumeWriteData->Add(dataSize);
42+
HistogramBytesWriteDataCount->Collect((i64)dataSize, 1);
43+
HistogramBytesWriteDataBytes->Collect((i64)dataSize, dataSize);
44+
}
45+
};
46+
2447
class TCSCounters: public TCommonCountersOwner {
2548
private:
2649
using TBase = TCommonCountersOwner;
@@ -72,7 +95,9 @@ class TCSCounters: public TCommonCountersOwner {
7295
NMonitoring::TDynamicCounters::TCounterPtr WriteRequests;
7396
THashMap<EWriteFailReason, NMonitoring::TDynamicCounters::TCounterPtr> FailedWriteRequests;
7497
NMonitoring::TDynamicCounters::TCounterPtr SuccessWriteRequests;
98+
7599
public:
100+
const std::shared_ptr<TWriteCounters> WritingCounters;
76101
const TCSInitialization Initialization;
77102
TTxProgressCounters TxProgress;
78103

@@ -89,7 +114,6 @@ class TCSCounters: public TCommonCountersOwner {
89114

90115
void OnWritePutBlobsSuccess(const TDuration d) const {
91116
HistogramSuccessWritePutBlobsDurationMs->Collect(d.MilliSeconds());
92-
WritePutBlobsCount->Sub(1);
93117
}
94118

95119
void OnWriteMiddle1PutBlobsSuccess(const TDuration d) const {
@@ -118,11 +142,6 @@ class TCSCounters: public TCommonCountersOwner {
118142

119143
void OnWritePutBlobsFail(const TDuration d) const {
120144
HistogramFailedWritePutBlobsDurationMs->Collect(d.MilliSeconds());
121-
WritePutBlobsCount->Sub(1);
122-
}
123-
124-
void OnWritePutBlobsStart() const {
125-
WritePutBlobsCount->Add(1);
126145
}
127146

128147
void OnWriteTxComplete(const TDuration d) const {

ydb/core/tx/columnshard/engines/column_engine_logs.cpp

+10-13
Original file line numberDiff line numberDiff line change
@@ -343,14 +343,16 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
343343
ui32 skipLocked = 0;
344344
ui32 portionsFromDrop = 0;
345345
bool limitExceeded = false;
346-
THashSet<TPortionAddress> uniquePortions;
347346
for (ui64 pathId : pathsToDrop) {
348347
auto g = GranulesStorage->GetGranuleOptional(pathId);
349348
if (!g) {
350349
continue;
351350
}
352351

353352
for (auto& [portion, info] : g->GetPortions()) {
353+
if (info->CheckForCleanup()) {
354+
continue;
355+
}
354356
if (dataLocksManager->IsLocked(*info)) {
355357
++skipLocked;
356358
continue;
@@ -361,8 +363,6 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
361363
limitExceeded = true;
362364
break;
363365
}
364-
const auto inserted = uniquePortions.emplace(info->GetAddress()).second;
365-
Y_ABORT_UNLESS(inserted);
366366
changes->PortionsToDrop.push_back(*info);
367367
++portionsFromDrop;
368368
}
@@ -381,17 +381,14 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
381381
++i;
382382
continue;
383383
}
384-
const auto inserted = uniquePortions.emplace(it->second[i].GetAddress()).second;
385-
if (inserted) {
386-
AFL_VERIFY(it->second[i].CheckForCleanup(snapshot))("p_snapshot", it->second[i].GetRemoveSnapshotOptional())("snapshot", snapshot);
387-
if (txSize + it->second[i].GetTxVolume() < txSizeLimit || changes->PortionsToDrop.empty()) {
388-
txSize += it->second[i].GetTxVolume();
389-
} else {
390-
limitExceeded = true;
391-
break;
392-
}
393-
changes->PortionsToDrop.push_back(std::move(it->second[i]));
384+
AFL_VERIFY(it->second[i].CheckForCleanup(snapshot))("p_snapshot", it->second[i].GetRemoveSnapshotOptional())("snapshot", snapshot);
385+
if (txSize + it->second[i].GetTxVolume() < txSizeLimit || changes->PortionsToDrop.empty()) {
386+
txSize += it->second[i].GetTxVolume();
387+
} else {
388+
limitExceeded = true;
389+
break;
394390
}
391+
changes->PortionsToDrop.push_back(std::move(it->second[i]));
395392
if (i + 1 < it->second.size()) {
396393
it->second[i] = std::move(it->second.back());
397394
}

ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp

+5-2
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const std::optional<
123123
}
124124

125125
std::vector<TCommittedBlob> result;
126-
result.reserve(pInfo->GetCommitted().size() + pInfo->GetInserted().size());
126+
result.reserve(pInfo->GetCommitted().size() + Summary.GetInserted().size());
127127

128128
for (const auto& data : pInfo->GetCommitted()) {
129129
if (lockId || data.GetSnapshot() <= reqSnapshot) {
@@ -137,7 +137,10 @@ std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const std::optional<
137137
}
138138
}
139139
if (lockId) {
140-
for (const auto& [writeId, data] : pInfo->GetInserted()) {
140+
for (const auto& [writeId, data] : Summary.GetInserted()) {
141+
if (data.GetPathId() != pathId) {
142+
continue;
143+
}
141144
auto start = data.GetMeta().GetFirstPK(pkSchema);
142145
auto finish = data.GetMeta().GetLastPK(pkSchema);
143146
if (pkRangesFilter && pkRangesFilter->IsPortionInPartialUsage(start, finish) == TPKRangeFilter::EUsageClass::DontUsage) {

ydb/core/tx/columnshard/engines/insert_table/insert_table.h

+1-5
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,6 @@ namespace NKikimr::NOlap {
1212
class TPKRangesFilter;
1313
class IDbWrapper;
1414

15-
/// Use one table for inserted and committed blobs:
16-
/// !Commited => {PlanStep, WriteTxId} are {0, WriteId}
17-
/// Commited => {PlanStep, WriteTxId} are {PlanStep, TxId}
18-
1915
class TInsertTableAccessor {
2016
protected:
2117
TInsertionSummary Summary;
@@ -76,7 +72,7 @@ class TInsertTableAccessor {
7672
const THashMap<TInsertWriteId, TInsertedData>& GetAborted() const {
7773
return Summary.GetAborted();
7874
}
79-
const THashMap<TInsertWriteId, TInsertedData>& GetInserted() const {
75+
const TInsertedContainer& GetInserted() const {
8076
return Summary.GetInserted();
8177
}
8278
const TInsertionSummary::TCounters& GetCountersPrepared() const {

ydb/core/tx/columnshard/engines/insert_table/path_info.cpp

-4
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,4 @@ NKikimr::NOlap::TPathInfoIndexPriority TPathInfo::GetIndexationPriority() const
7272
}
7373
}
7474

75-
const THashMap<TInsertWriteId, TInsertedData>& TPathInfo::GetInserted() const {
76-
return Summary->GetInserted();
77-
}
78-
7975
}

ydb/core/tx/columnshard/engines/insert_table/path_info.h

-2
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,6 @@ class TPathInfo: public TMoveOnly {
5757
return Committed.empty() && !InsertedSize;
5858
}
5959

60-
const THashMap<TInsertWriteId, TInsertedData>& GetInserted() const;
61-
6260
void AddInsertedSize(const i64 size, const ui64 overloadLimit);
6361

6462
explicit TPathInfo(TInsertionSummary& summary, const ui64 pathId);

ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp

+10-51
Original file line numberDiff line numberDiff line change
@@ -89,37 +89,8 @@ void TInsertionSummary::OnEraseInserted(TPathInfo& pathInfo, const ui64 dataSize
8989
AFL_VERIFY(Counters.Inserted.GetDataSize() == (i64)StatsPrepared.Bytes);
9090
}
9191

92-
THashSet<TInsertWriteId> TInsertionSummary::GetInsertedByPathId(const ui64 pathId) const {
93-
THashSet<TInsertWriteId> result;
94-
for (auto& [writeId, data] : Inserted) {
95-
if (data.GetPathId() == pathId) {
96-
result.insert(writeId);
97-
}
98-
}
99-
100-
return result;
101-
}
102-
10392
THashSet<TInsertWriteId> TInsertionSummary::GetExpiredInsertions(const TInstant timeBorder, const ui64 limit) const {
104-
if (timeBorder < MinInsertedTs) {
105-
return {};
106-
}
107-
108-
THashSet<TInsertWriteId> toAbort;
109-
TInstant newMin = TInstant::Max();
110-
for (auto& [writeId, data] : Inserted) {
111-
const TInstant dataInsertTs = data.GetMeta().GetDirtyWriteTime();
112-
if (data.IsNotAbortable()) {
113-
continue;
114-
}
115-
if (dataInsertTs < timeBorder && toAbort.size() < limit) {
116-
toAbort.insert(writeId);
117-
} else {
118-
newMin = Min(newMin, dataInsertTs);
119-
}
120-
}
121-
MinInsertedTs = (toAbort.size() == Inserted.size()) ? TInstant::Zero() : newMin;
122-
return toAbort;
93+
return Inserted.GetExpired(timeBorder, limit);
12394
}
12495

12596
bool TInsertionSummary::EraseAborted(const TInsertWriteId writeId) {
@@ -173,33 +144,21 @@ const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddAborted(TInsertedData
173144
}
174145

175146
std::optional<NKikimr::NOlap::TInsertedData> TInsertionSummary::ExtractInserted(const TInsertWriteId id) {
176-
auto it = Inserted.find(id);
177-
if (it == Inserted.end()) {
178-
return {};
179-
} else {
180-
auto pathInfo = GetPathInfoOptional(it->second.GetPathId());
147+
auto result = Inserted.ExtractOptional(id);
148+
if (result) {
149+
auto pathInfo = GetPathInfoOptional(result->GetPathId());
181150
if (pathInfo) {
182-
OnEraseInserted(*pathInfo, it->second.BlobSize());
151+
OnEraseInserted(*pathInfo, result->BlobSize());
183152
}
184-
std::optional<TInsertedData> result = std::move(it->second);
185-
Inserted.erase(it);
186-
return result;
187153
}
154+
return result;
188155
}
189156

190157
const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddInserted(TInsertedData&& data, const bool load /*= false*/) {
191-
const TInsertWriteId writeId = data.GetInsertWriteId();
192-
const ui32 dataSize = data.BlobSize();
193-
const ui64 pathId = data.GetPathId();
194-
auto insertInfo = Inserted.emplace(writeId, std::move(data));
195-
AFL_VERIFY_DEBUG(!Aborted.contains(writeId));
196-
if (insertInfo.second) {
197-
OnNewInserted(GetPathInfo(pathId), dataSize, load);
198-
return &insertInfo.first->second;
199-
} else {
200-
Counters.Inserted.SkipAdd(dataSize);
201-
return nullptr;
202-
}
158+
auto* insertInfo = Inserted.AddVerified(std::move(data));
159+
AFL_VERIFY_DEBUG(!Aborted.contains(insertInfo->GetInsertWriteId()));
160+
OnNewInserted(GetPathInfo(insertInfo->GetPathId()), insertInfo->BlobSize(), load);
161+
return insertInfo;
203162
}
204163

205164
}

0 commit comments

Comments
 (0)