Skip to content

Commit 78f10f9

Browse files
authored
Reuse S3 locators to prevent false trash accumulation (#15390)
1 parent 7fc471f commit 78f10f9

File tree

6 files changed

+27
-14
lines changed

6 files changed

+27
-14
lines changed

ydb/core/blob_depot/agent/agent_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ namespace NKikimr::NBlobDepot {
194194
TActorId PipeId;
195195
TActorId PipeServerId;
196196
bool IsConnected = false;
197+
ui64 ConnectionInstance = 0;
197198

198199
NMonitoring::TDynamicCounterPtr AgentCounters;
199200

ydb/core/blob_depot/agent/comm.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ namespace NKikimr::NBlobDepot {
155155
}
156156

157157
void TBlobDepotAgent::OnDisconnect() {
158+
++ConnectionInstance;
159+
158160
while (!TabletRequestInFlight.empty()) {
159161
auto node = TabletRequestInFlight.extract(TabletRequestInFlight.begin());
160162
auto& requestInFlight = node.value();

ydb/core/blob_depot/agent/storage_put.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ namespace NKikimr::NBlobDepot {
1818
TBlobSeqId BlobSeqId;
1919
std::optional<TS3Locator> LocatorInFlight;
2020
TActorId WriterActorId;
21+
ui64 ConnectionInstanceOnStart;
2122

2223
struct TLifetimeToken {};
2324
std::shared_ptr<TLifetimeToken> LifetimeToken;
@@ -410,6 +411,8 @@ namespace NKikimr::NBlobDepot {
410411
.AddMetadata("key", Request.Id.ToString()),
411412
Request.Buffer.ExtractUnderlyingContainerOrCopy<TString>()),
412413
IEventHandle::FlagTrackDelivery));
414+
415+
ConnectionInstanceOnStart = Agent.ConnectionInstance;
413416
}
414417

415418
void OnPutS3ObjectResponse(std::optional<TString>&& error) {
@@ -418,6 +421,11 @@ namespace NKikimr::NBlobDepot {
418421

419422
WriterActorId = {};
420423

424+
if (ConnectionInstanceOnStart != Agent.ConnectionInstance) {
425+
error = "BlobDepot tablet disconnected";
426+
LocatorInFlight.reset(); // prevent discarding this locator
427+
}
428+
421429
if (error) {
422430
++*Agent.S3PutsError;
423431

ydb/core/blob_depot/op_commit_blob_seq.cpp

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,8 @@ namespace NKikimr::NBlobDepot {
7575
if (item.HasS3Locator()) {
7676
const auto& locator = TS3Locator::FromProto(item.GetS3Locator());
7777
const size_t numErased = agent.S3WritesInFlight.erase(locator);
78-
if (locator.Generation < generation) {
79-
Y_ABORT_UNLESS(!numErased);
80-
Self->Data->AddToS3Trash(locator, txc, this);
81-
} else {
82-
Y_ABORT_UNLESS(numErased == 1);
83-
Self->S3Manager->AddTrashToCollect(locator);
84-
}
78+
Y_ABORT_UNLESS(numErased);
79+
Self->S3Manager->AddTrashToCollect(locator);
8580
}
8681
};
8782

@@ -244,11 +239,10 @@ namespace NKikimr::NBlobDepot {
244239
}
245240

246241
for (const auto& item : record.GetS3Locators()) {
247-
if (const auto& locator = TS3Locator::FromProto(item); locator.Generation == generation) {
248-
const size_t numErased = agent.S3WritesInFlight.erase(locator);
249-
Y_ABORT_UNLESS(numErased == 1);
250-
S3Manager->AddTrashToCollect(locator);
251-
}
242+
const auto& locator = TS3Locator::FromProto(item);
243+
const size_t numErased = agent.S3WritesInFlight.erase(locator);
244+
Y_ABORT_UNLESS(numErased == 1);
245+
S3Manager->AddTrashToCollect(locator);
252246
}
253247
}
254248

ydb/core/blob_depot/s3_write.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ namespace NKikimr::NBlobDepot {
5353
locator.ToProto(responseItem->MutableS3Locator());
5454

5555
// we put it here until operation is completed; if tablet restarts and operation fails, then this
56-
// key will be deleted
56+
// key will be deleted; we also rewrite spoiled locator because Len is different
5757
db.Table<Schema::TrashS3>().Key(locator.Generation, locator.KeyId).Update<Schema::TrashS3::Len>(locator.Len);
5858

5959
const bool inserted = agent.S3WritesInFlight.insert(locator).second;
@@ -98,6 +98,14 @@ namespace NKikimr::NBlobDepot {
9898
};
9999

100100
TS3Locator TS3Manager::AllocateS3Locator(ui32 len) {
101+
if (!DeleteQueue.empty()) {
102+
TS3Locator res = DeleteQueue.front();
103+
DeleteQueue.pop_front();
104+
Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_S3_TRASH_OBJECTS] = --TotalS3TrashObjects;
105+
Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_S3_TRASH_SIZE] = TotalS3TrashSize -= res.Len;
106+
res.Len = len;
107+
return res;
108+
}
101109
return {
102110
.Len = len,
103111
.Generation = Self->Executor()->Generation(),

ydb/core/blob_depot/types.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ namespace NKikimr::NBlobDepot {
153153
}
154154

155155
TString MakeObjectName(const TString& basePath) const {
156-
const size_t hash = THash()(*this);
156+
const size_t hash = MultiHash(Generation, KeyId);
157157
const size_t a = hash % 36;
158158
const size_t b = hash / 36 % 36;
159159
static const char vec[] = "0123456789abcdefghijklmnopqrstuvwxyz";

0 commit comments

Comments
 (0)