Skip to content

Commit 598e223

Browse files
alexvrublinkov
authored and committed
Add S3 support into BlobDepot (#14979)
1 parent c6ce967 commit 598e223

39 files changed

+1884
-269
lines changed

ydb/apps/dstool/lib/dstool_cmd_group_virtual_create.py

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,8 @@
22
import ydb.core.protos.blob_depot_config_pb2 as blob_depot_config
33
import sys
44
import time
5+
from argparse import FileType
6+
import google.protobuf.json_format as pb_json
57

68
description = 'Create virtual group backed by BlobDepot'
79

@@ -17,11 +19,18 @@ def add_options(p):
1719
p.add_argument('--log-channel-sp', type=str, metavar='POOL_NAME', required=True, help='channel 0 specifier')
1820
p.add_argument('--snapshot-channel-sp', type=str, metavar='POOL_NAME', help='channel 1 specifier (defaults to channel 0)')
1921
p.add_argument('--data-channel-sp', type=str, metavar='POOL_NAME[*COUNT]', nargs='+', required=True, help='data channel specifier')
22+
p.add_argument('--s3-settings', type=FileType('r', encoding='utf-8'), metavar='JSON_FILE', help='path to JSON file containing S3 settings')
2023
p.add_argument('--wait', action='store_true', help='wait for operation to complete by polling')
2124

2225

2326
def do(args):
2427
request = common.create_bsc_request(args)
28+
29+
if args.s3_settings:
30+
s3_settings = args.s3_settings.read()
31+
else:
32+
s3_settings = None
33+
2534
for name in args.name:
2635
cmd = request.Command.add().AllocateVirtualGroup
2736

@@ -41,6 +50,9 @@ def do(args):
4150
print(f'Invalid --storage-pool-id={args.storage_pool_id} format, <number>:<number> expected', file=sys.stderr)
4251
sys.exit(1)
4352

53+
if s3_settings is not None:
54+
pb_json.Parse(s3_settings, cmd.S3BackendSettings)
55+
4456
cmd.ChannelProfiles.add(StoragePoolName=args.log_channel_sp, ChannelKind=blob_depot_config.TChannelKind.System)
4557
chan1 = args.snapshot_channel_sp if args.snapshot_channel_sp is not None else args.log_channel_sp
4658
cmd.ChannelProfiles.add(StoragePoolName=chan1, ChannelKind=blob_depot_config.TChannelKind.System)

ydb/core/blob_depot/agent.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "blob_depot_tablet.h"
22
#include "data.h"
33
#include "space_monitor.h"
4+
#include "s3.h"
45

56
namespace NKikimr::NBlobDepot {
67

@@ -30,6 +31,12 @@ namespace NKikimr::NBlobDepot {
3031
// Resets the transient state kept for an agent when it disconnects: drops its pending
// step-invalidation requests and push callbacks, and passes every S3 locator still listed in
// agent.S3WritesInFlight to the S3 manager as trash to collect before clearing that container.
void TBlobDepot::OnAgentDisconnect(TAgent& agent) {
3132
agent.InvalidateStepRequests.clear();
3233
agent.PushCallbacks.clear();
34+
35+
// NOTE(review): the loop variable is taken by value, so each locator is copied per iteration.
for (TS3Locator locator : agent.S3WritesInFlight) {
36+
// they were not in InFlightTrashS3, so we just have to delete them
37+
S3Manager->AddTrashToCollect(locator);
38+
}
39+
agent.S3WritesInFlight.clear();
3340
}
3441

3542
void TBlobDepot::Handle(TEvBlobDepot::TEvRegisterAgent::TPtr ev) {
@@ -79,6 +86,14 @@ namespace NKikimr::NBlobDepot {
7986
record->SetDecommitGroupId(Config.GetVirtualGroupId());
8087
}
8188

89+
if (Config.HasS3BackendSettings()) {
90+
record->MutableS3BackendSettings()->CopyFrom(Config.GetS3BackendSettings());
91+
}
92+
93+
if (Config.HasName()) {
94+
record->SetName(Config.GetName());
95+
}
96+
8297
TActivationContext::Send(response.release());
8398

8499
if (!agent.InvalidatedStepInFlight.empty()) {

ydb/core/blob_depot/agent/agent_impl.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
#include <ydb/core/protos/blob_depot_config.pb.h>
77

8+
#include <ydb/core/wrappers/abstract.h>
9+
810
namespace NKikimr::NBlobDepot {
911

1012
#define ENUMERATE_INCOMING_EVENTS(XX) \
@@ -129,6 +131,7 @@ namespace NKikimr::NBlobDepot {
129131
TEvBlobDepot::TEvCollectGarbageResult*,
130132
TEvBlobDepot::TEvCommitBlobSeqResult*,
131133
TEvBlobDepot::TEvResolveResult*,
134+
TEvBlobDepot::TEvPrepareWriteS3Result*,
132135

133136
// underlying DS proxy responses
134137
TEvBlobStorage::TEvGetResult*,
@@ -233,6 +236,7 @@ namespace NKikimr::NBlobDepot {
233236
hFunc(TEvBlobDepot::TEvCollectGarbageResult, HandleTabletResponse);
234237
hFunc(TEvBlobDepot::TEvCommitBlobSeqResult, HandleTabletResponse);
235238
hFunc(TEvBlobDepot::TEvResolveResult, HandleTabletResponse);
239+
hFunc(TEvBlobDepot::TEvPrepareWriteS3Result, HandleTabletResponse);
236240

237241
hFunc(TEvBlobStorage::TEvGetResult, HandleOtherResponse);
238242
hFunc(TEvBlobStorage::TEvPutResult, HandleOtherResponse);
@@ -255,6 +259,9 @@ namespace NKikimr::NBlobDepot {
255259
// Shuts the agent actor down: clears the pending event queue (passing the reason string), closes
// and forgets the tablet pipe client, sends Poison to the S3 wrapper actor when its id is set,
// and finally delegates to TActor::PassAway().
void PassAway() override {
256260
ClearPendingEventQueue("BlobDepot agent destroyed");
257261
NTabletPipe::CloseAndForgetClient(SelfId(), PipeId);
262+
// S3WrapperId is only non-empty while a wrapper actor has been registered for this agent.
if (S3WrapperId) {
263+
TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, S3WrapperId, SelfId(), nullptr, 0));
264+
}
258265
TActor::PassAway();
259266
}
260267

@@ -318,6 +325,11 @@ namespace NKikimr::NBlobDepot {
318325
NKikimrBlobStorage::TPDiskSpaceColor::E SpaceColor = {};
319326
float ApproximateFreeSpaceShare = 0.0f;
320327

328+
NWrappers::IExternalStorageConfig::TPtr ExternalStorageConfig;
329+
std::optional<NKikimrBlobDepot::TS3BackendSettings> S3BackendSettings;
330+
TActorId S3WrapperId;
331+
TString S3BasePath;
332+
321333
void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev);
322334
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev);
323335
void ConnectToBlobDepot();
@@ -447,6 +459,8 @@ namespace NKikimr::NBlobDepot {
447459
void Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev);
448460
void HandleQueryWatchdog();
449461

462+
// Calls the supplied callback immediately and synchronously; the callback is taken by value.
void Invoke(std::function<void()> callback) { callback(); }
463+
450464
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
451465

452466
struct TChannelKind

ydb/core/blob_depot/agent/comm.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#include "agent_impl.h"
22
#include "blocks.h"
33

4+
#include <ydb/core/wrappers/s3_wrapper.h>
5+
46
namespace NKikimr::NBlobDepot {
57

68
void TBlobDepotAgent::Handle(TEvTabletPipe::TEvClientConnected::TPtr ev) {
@@ -86,6 +88,24 @@ namespace NKikimr::NBlobDepot {
8688
SpaceColor = msg.GetSpaceColor();
8789
ApproximateFreeSpaceShare = msg.GetApproximateFreeSpaceShare();
8890

91+
S3BackendSettings = msg.HasS3BackendSettings()
92+
? std::make_optional(msg.GetS3BackendSettings())
93+
: std::nullopt;
94+
95+
if (S3WrapperId) {
96+
TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, S3WrapperId, SelfId(), nullptr, 0));
97+
S3WrapperId = {};
98+
}
99+
100+
if (S3BackendSettings) {
101+
auto& settings = S3BackendSettings->GetSettings();
102+
ExternalStorageConfig = NWrappers::IExternalStorageConfig::Construct(settings);
103+
S3WrapperId = Register(NWrappers::CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator()));
104+
S3BasePath = TStringBuilder() << settings.GetObjectKeyPattern() << '/' << msg.GetName();
105+
} else {
106+
ExternalStorageConfig = {};
107+
}
108+
89109
OnConnect();
90110
}
91111

@@ -171,6 +191,7 @@ namespace NKikimr::NBlobDepot {
171191
template ui64 TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvResolve msg, TRequestSender *sender, TRequestContext::TPtr context);
172192
template ui64 TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvCommitBlobSeq msg, TRequestSender *sender, TRequestContext::TPtr context);
173193
template ui64 TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvDiscardSpoiledBlobSeq msg, TRequestSender *sender, TRequestContext::TPtr context);
194+
template ui64 TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvPrepareWriteS3 msg, TRequestSender *sender, TRequestContext::TPtr context);
174195

175196
ui64 TBlobDepotAgent::Issue(std::unique_ptr<IEventBase> ev, TRequestSender *sender, TRequestContext::TPtr context) {
176197
const ui64 id = NextTabletRequestId++;

ydb/core/blob_depot/agent/read.cpp

Lines changed: 106 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -72,25 +72,22 @@ namespace NKikimr::NBlobDepot {
7272
ui32 Size;
7373
ui64 OutputOffset;
7474
};
75+
// Describes one ranged read to be issued against S3 for a part of the requested value.
struct TS3ReadItem {
76+
// object key in the bucket (built from the locator and the agent's S3 base path)
TString Key;
77+
// offset of the first byte to read within the object
ui32 Offset;
78+
// number of bytes to read starting at Offset
ui32 Size;
79+
// offset in the resulting output buffer where the fetched bytes are written
ui64 OutputOffset;
80+
};
7581
std::vector<TReadItem> items;
82+
std::vector<TS3ReadItem> s3items;
7683

7784
ui64 offset = arg.Offset;
7885
ui64 size = arg.Size;
7986

8087
for (const auto& value : arg.Value.Chain) {
81-
const ui32 groupId = value.GroupId;
82-
const auto& blobId = value.BlobId;
8388
const ui32 begin = value.SubrangeBegin;
8489
const ui32 end = value.SubrangeEnd;
8590

86-
if (end <= begin || blobId.BlobSize() < end) {
87-
error = "incorrect SubrangeBegin/SubrangeEnd pair";
88-
STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA24, error, (AgentId, Agent.LogId), (QueryId, GetQueryId()),
89-
(ReadId, arg.Tag), (Key, Agent.PrettyKey(arg.Key)), (Offset, arg.Offset), (Size, arg.Size),
90-
(Value, arg.Value));
91-
return false;
92-
}
93-
9491
// calculate the whole length of current part
9592
ui64 partLen = end - begin;
9693
if (offset >= partLen) {
@@ -103,7 +100,26 @@ namespace NKikimr::NBlobDepot {
103100
partLen = Min(size ? size : Max<ui64>(), partLen - offset);
104101
Y_ABORT_UNLESS(partLen);
105102

106-
items.push_back(TReadItem{groupId, blobId, ui32(offset + begin), ui32(partLen), outputOffset});
103+
ui32 itemLen = 0;
104+
ui32 partOffset = offset + begin;
105+
106+
if (value.Blob) {
107+
const auto& [blobId, groupId] = *value.Blob;
108+
items.push_back(TReadItem{groupId, blobId, partOffset, ui32(partLen), outputOffset});
109+
itemLen = blobId.BlobSize();
110+
} else if (const auto& locator = value.S3Locator) {
111+
TString key = locator->MakeObjectName(Agent.S3BasePath);
112+
s3items.push_back(TS3ReadItem{std::move(key), partOffset, ui32(partLen), outputOffset});
113+
itemLen = locator->Len;
114+
}
115+
116+
if (end <= begin || itemLen < end) {
117+
error = "incorrect SubrangeBegin/SubrangeEnd pair";
118+
STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA24, error, (AgentId, Agent.LogId), (QueryId, GetQueryId()),
119+
(ReadId, arg.Tag), (Key, Agent.PrettyKey(arg.Key)), (Offset, arg.Offset), (Size, arg.Size),
120+
(Value, arg.Value));
121+
return false;
122+
}
107123

108124
outputOffset += partLen;
109125
offset = 0;
@@ -156,6 +172,85 @@ namespace NKikimr::NBlobDepot {
156172
++context->NumPartsPending;
157173
}
158174

175+
for (TS3ReadItem& item : s3items) {
176+
// Helper actor that waits for the outcome of a single S3 GetObject request and delivers it into
// the shared read context: on success the body is written at OutputOffset, on failure the query
// is ended with an error. The actor passes away after handling one outcome (or on Poison).
class TGetActor : public TActor<TGetActor> {
177+
size_t OutputOffset;
178+
std::shared_ptr<TReadContext> ReadContext;
179+
// NOTE(review): raw pointer to the owning query; it is only dereferenced via the context when
// the context is neither Terminated nor StopProcessingParts — presumably those flags guard
// against the query being gone; confirm.
TQuery* const Query;
180+
181+
// values captured at construction time and used only for logging
TString AgentLogId;
182+
TString QueryId;
183+
ui64 ReadId;
184+
185+
public:
186+
TGetActor(size_t outputOffset, std::shared_ptr<TReadContext> readContext, TQuery *query)
187+
: TActor(&TThis::StateFunc)
188+
, OutputOffset(outputOffset)
189+
, ReadContext(std::move(readContext))
190+
, Query(query)
191+
, AgentLogId(query->Agent.LogId)
192+
, QueryId(query->GetQueryId())
193+
// reads the tag through the ReadContext member (declared earlier), not the moved-from parameter
, ReadId(ReadContext->GetTag())
194+
{}
195+
196+
// Handles the wrapper's response: a successful response hands the body to Finish; NO_SUCH_KEY is
// reported with a dedicated message; any other failure reports the error's own message text.
void Handle(NWrappers::TEvExternalStorage::TEvGetObjectResponse::TPtr ev) {
197+
auto& msg = *ev->Get();
198+
199+
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA55, "received TEvGetObjectResponse",
200+
(AgentId, AgentLogId), (QueryId, QueryId), (ReadId, ReadId),
201+
(Response, msg.Result), (BodyLen, std::size(msg.Body)));
202+
203+
if (msg.IsSuccess()) {
204+
Finish(std::move(msg.Body), "");
205+
} else if (const auto& error = msg.GetError(); error.GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY) {
206+
Finish(std::nullopt, "data has disappeared from S3");
207+
} else {
208+
Finish(std::nullopt, msg.GetError().GetMessage().c_str());
209+
}
210+
}
211+
212+
// Invoked on TEvents::TSystem::Undelivered; treats the read as failed.
void HandleUndelivered() {
213+
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA56, "received TEvUndelivered",
214+
(AgentId, AgentLogId), (QueryId, QueryId), (ReadId, ReadId));
215+
Finish(std::nullopt, "wrapper actor terminated");
216+
}
217+
218+
// Delivers the outcome to the read context and terminates this actor.
// data  -- fetched bytes on success, std::nullopt on failure
// error -- description appended to the error text when data is std::nullopt
void Finish(std::optional<TString> data, const char *error) {
219+
auto& context = *ReadContext;
220+
// the outcome is dropped when the context is already Terminated or StopProcessingParts is set
if (!context.Terminated && !context.StopProcessingParts) {
221+
if (data) {
222+
context.Buffer.Write(OutputOffset, TRope(std::move(*data)));
223+
// the part that brings the pending counter to zero completes the whole read
if (!--context.NumPartsPending) {
224+
context.EndWithSuccess(Query);
225+
}
226+
} else {
227+
// NOTE(review): NumPartsPending is not decremented on this path — presumably EndWithError
// marks the context as finished so other parts are ignored; confirm.
context.EndWithError(Query, NKikimrProto::ERROR, TStringBuilder()
228+
<< "failed to fetch data from S3: " << error);
229+
}
230+
}
231+
PassAway();
232+
}
233+
234+
STRICT_STFUNC(StateFunc,
235+
hFunc(NWrappers::TEvExternalStorage::TEvGetObjectResponse, Handle)
236+
cFunc(TEvents::TSystem::Undelivered, HandleUndelivered)
237+
cFunc(TEvents::TSystem::Poison, PassAway)
238+
)
239+
};
240+
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA57, "starting S3 read", (AgentId, Agent.LogId), (QueryId, GetQueryId()),
241+
(ReadId, context->GetTag()), (Key, item.Key), (Offset, item.Offset), (Size, item.Size),
242+
(OutputOffset, item.OutputOffset));
243+
const TActorId actorId = Agent.RegisterWithSameMailbox(new TGetActor(item.OutputOffset, context, this));
244+
auto request = std::make_unique<NWrappers::TEvExternalStorage::TEvGetObjectRequest>(
245+
Aws::S3::Model::GetObjectRequest()
246+
.WithBucket(Agent.S3BackendSettings->GetSettings().GetBucket())
247+
.WithKey(std::move(item.Key))
248+
.WithRange(TStringBuilder() << "bytes=" << item.Offset << '-' << item.Offset + item.Size - 1)
249+
);
250+
TActivationContext::Send(new IEventHandle(Agent.S3WrapperId, actorId, request.release(), IEventHandle::FlagTrackDelivery));
251+
++context->NumPartsPending;
252+
}
253+
159254
Y_ABORT_UNLESS(context->NumPartsPending);
160255

161256
return true;

ydb/core/blob_depot/agent/request.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ namespace NKikimr::NBlobDepot {
9797
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvCollectGarbageResult::TPtr ev);
9898
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvCommitBlobSeqResult::TPtr ev);
9999
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvResolveResult::TPtr ev);
100+
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvPrepareWriteS3Result::TPtr ev);
100101

101102
template<typename TEvent>
102103
void TBlobDepotAgent::HandleOtherResponse(TAutoPtr<TEventHandle<TEvent>> ev) {

ydb/core/blob_depot/agent/resolved_value.cpp

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,36 @@
33
namespace NKikimr::NBlobDepot {
44

55
TResolvedValue::TLink::TLink(const NKikimrBlobDepot::TResolvedValueChain& link)
6-
: BlobId(LogoBlobIDFromLogoBlobID(link.GetBlobId()))
7-
, GroupId(link.GetGroupId())
8-
, SubrangeBegin(link.GetSubrangeBegin())
9-
, SubrangeEnd(link.HasSubrangeEnd() ? link.GetSubrangeEnd() : BlobId.BlobSize())
6+
: SubrangeBegin(link.GetSubrangeBegin())
7+
, SubrangeEnd(link.GetSubrangeEnd())
108
{
11-
Y_DEBUG_ABORT_UNLESS(link.HasBlobId() && link.HasGroupId());
9+
std::optional<ui32> length;
10+
if (link.HasBlobId() && link.HasGroupId()) {
11+
const TLogoBlobID blobId = LogoBlobIDFromLogoBlobID(link.GetBlobId());
12+
Blob.emplace(blobId, link.GetGroupId());
13+
Y_VERIFY_S(!length || *length == blobId.BlobSize(), SingleLineProto(link));
14+
length.emplace(blobId.BlobSize());
15+
}
16+
if (link.HasS3Locator()) {
17+
S3Locator.emplace(TS3Locator::FromProto(link.GetS3Locator()));
18+
Y_VERIFY_S(!length || *length == S3Locator->Len, SingleLineProto(link));
19+
length.emplace(S3Locator->Len);
20+
}
21+
if (!link.HasSubrangeEnd()) {
22+
Y_VERIFY_S(length, SingleLineProto(link));
23+
SubrangeEnd = *length;
24+
}
1225
}
1326

1427
void TResolvedValue::TLink::Output(IOutputStream& s) const {
15-
s << BlobId << '@' << GroupId << '{' << SubrangeBegin << '-' << SubrangeEnd - 1 << '}';
28+
if (Blob) {
29+
const auto& [blobId, groupId] = *Blob;
30+
s << blobId << '@' << groupId;
31+
}
32+
if (S3Locator) {
33+
s << S3Locator->ToString();
34+
}
35+
s << '{' << SubrangeBegin << '-' << SubrangeEnd - 1 << '}';
1636
}
1737

1838
TString TResolvedValue::TLink::ToString() const {

ydb/core/blob_depot/agent/resolved_value.h

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ namespace NKikimr::NBlobDepot {
66

77
struct TResolvedValue {
88
struct TLink {
9-
TLogoBlobID BlobId;
10-
ui32 GroupId;
9+
std::optional<std::tuple<TLogoBlobID, ui32>> Blob;
10+
std::optional<TS3Locator> S3Locator;
1111
ui32 SubrangeBegin;
1212
ui32 SubrangeEnd;
1313

@@ -16,10 +16,7 @@ namespace NKikimr::NBlobDepot {
1616
void Output(IOutputStream& s) const;
1717
TString ToString() const;
1818

19-
friend bool operator ==(const TLink& x, const TLink& y) {
20-
return x.BlobId == y.BlobId && x.GroupId == y.GroupId && x.SubrangeBegin == y.SubrangeBegin &&
21-
x.SubrangeEnd == y.SubrangeEnd;
22-
}
19+
friend std::strong_ordering operator <=>(const TLink&, const TLink&) = default;
2320
};
2421

2522
bool Defined = false;

0 commit comments

Comments
 (0)