Skip to content

Commit b497cd7

Browse files
ijonspuchin
authored andcommitted
24-3: auditlog: add exports/imports (ydb-platform#8751)
Add audit logging for (database/tables) export and import operations. Cancel/Forget requests for export/imports are still not audit logged. KIKIMR-21797 merge ydb-platform#8550 (a418278) from `main`.
1 parent ecd2d22 commit b497cd7

28 files changed

+992
-125
lines changed

ydb/core/grpc_services/rpc_export.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ class TExportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>,
4141
if (this->UserToken) {
4242
ev->Record.SetUserSID(this->UserToken->GetUserSID());
4343
}
44+
ev->Record.SetPeerName(this->Request->GetPeerName());
4445

4546
auto& createExport = *ev->Record.MutableRequest();
4647
*createExport.MutableOperationParams() = request.operation_params();

ydb/core/grpc_services/rpc_import.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ class TImportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>,
3939
if (this->UserToken) {
4040
ev->Record.SetUserSID(this->UserToken->GetUserSID());
4141
}
42+
ev->Record.SetPeerName(this->Request->GetPeerName());
4243

4344
auto& createImport = *ev->Record.MutableRequest();
4445
createImport.MutableOperationParams()->CopyFrom(request.operation_params());

ydb/core/protos/export.proto

+8-3
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ message TCreateExportRequest {
3434
message TEvCreateExportRequest {
3535
optional uint64 TxId = 1;
3636
optional string DatabaseName = 2;
37-
optional string UserSID = 4;
3837
optional TCreateExportRequest Request = 3;
38+
optional string UserSID = 4;
39+
optional string PeerName = 5;
3940
}
4041

4142
message TCreateExportResponse {
@@ -70,8 +71,10 @@ message TCancelExportRequest {
7071

7172
message TEvCancelExportRequest {
7273
optional uint64 TxId = 1;
73-
optional string DatabaseName = 3;
7474
optional TCancelExportRequest Request = 2;
75+
optional string DatabaseName = 3;
76+
optional string UserSID = 4;
77+
optional string PeerName = 5;
7578
}
7679

7780
message TCancelExportResponse {
@@ -90,8 +93,10 @@ message TForgetExportRequest {
9093

9194
message TEvForgetExportRequest {
9295
optional uint64 TxId = 1;
93-
optional string DatabaseName = 3;
9496
optional TForgetExportRequest Request = 2;
97+
optional string DatabaseName = 3;
98+
optional string UserSID = 4;
99+
optional string PeerName = 5;
95100
}
96101

97102
message TForgetExportResponse {

ydb/core/protos/import.proto

+7-2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ message TEvCreateImportRequest {
3434
optional string DatabaseName = 2;
3535
optional string UserSID = 3;
3636
optional TCreateImportRequest Request = 4;
37+
optional string PeerName = 5;
3738
}
3839

3940
message TCreateImportResponse {
@@ -68,8 +69,10 @@ message TCancelImportRequest {
6869

6970
message TEvCancelImportRequest {
7071
optional uint64 TxId = 1;
71-
optional string DatabaseName = 3;
7272
optional TCancelImportRequest Request = 2;
73+
optional string DatabaseName = 3;
74+
optional string UserSID = 4;
75+
optional string PeerName = 5;
7376
}
7477

7578
message TCancelImportResponse {
@@ -88,8 +91,10 @@ message TForgetImportRequest {
8891

8992
message TEvForgetImportRequest {
9093
optional uint64 TxId = 1;
91-
optional string DatabaseName = 3;
9294
optional TForgetImportRequest Request = 2;
95+
optional string DatabaseName = 3;
96+
optional string UserSID = 4;
97+
optional string PeerName = 5;
9398
}
9499

95100
message TForgetImportResponse {

ydb/core/tx/schemeshard/schemeshard__init.cpp

+4-2
Original file line numberDiff line numberDiff line change
@@ -4199,8 +4199,9 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
41994199
TString settings = rowset.GetValue<Schema::Exports::Settings>();
42004200
auto domainPathId = TPathId(rowset.GetValueOrDefault<Schema::Exports::DomainPathOwnerId>(selfId),
42014201
rowset.GetValue<Schema::Exports::DomainPathId>());
4202+
TString peerName = rowset.GetValueOrDefault<Schema::Exports::PeerName>();
42024203

4203-
TExportInfo::TPtr exportInfo = new TExportInfo(id, uid, kind, settings, domainPathId);
4204+
TExportInfo::TPtr exportInfo = new TExportInfo(id, uid, kind, settings, domainPathId, peerName);
42044205

42054206
if (rowset.HaveValue<Schema::Exports::UserSID>()) {
42064207
exportInfo->UserSID = rowset.GetValue<Schema::Exports::UserSID>();
@@ -4297,11 +4298,12 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
42974298
TImportInfo::EKind kind = static_cast<TImportInfo::EKind>(rowset.GetValue<Schema::Imports::Kind>());
42984299
auto domainPathId = TPathId(rowset.GetValue<Schema::Imports::DomainPathOwnerId>(),
42994300
rowset.GetValue<Schema::Imports::DomainPathLocalId>());
4301+
TString peerName = rowset.GetValueOrDefault<Schema::Imports::PeerName>();
43004302

43014303
Ydb::Import::ImportFromS3Settings settings;
43024304
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(settings, rowset.GetValue<Schema::Imports::Settings>()));
43034305

4304-
TImportInfo::TPtr importInfo = new TImportInfo(id, uid, kind, settings, domainPathId);
4306+
TImportInfo::TPtr importInfo = new TImportInfo(id, uid, kind, settings, domainPathId, peerName);
43054307

43064308
if (rowset.HaveValue<Schema::Imports::UserSID>()) {
43074309
importInfo->UserSID = rowset.GetValue<Schema::Imports::UserSID>();

ydb/core/tx/schemeshard/schemeshard_audit_log.cpp

+219-9
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,29 @@
1-
#include "schemeshard_audit_log.h"
2-
#include "schemeshard_path.h"
3-
#include "schemeshard_audit_log_fragment.h"
1+
#include <util/string/vector.h>
2+
3+
#include <ydb/public/api/protos/ydb_export.pb.h>
4+
#include <ydb/public/api/protos/ydb_import.pb.h>
45

5-
#include <ydb/core/audit/audit_log.h>
66
#include <ydb/core/protos/flat_tx_scheme.pb.h>
7+
#include <ydb/core/protos/export.pb.h>
8+
#include <ydb/core/protos/import.pb.h>
9+
710
#include <ydb/core/util/address_classifier.h>
8-
#include <util/string/vector.h>
11+
#include <ydb/core/audit/audit_log.h>
12+
13+
#include "schemeshard_path.h"
14+
#include "schemeshard_impl.h"
15+
#include "schemeshard_xxport__helpers.h"
16+
#include "schemeshard_audit_log_fragment.h"
17+
#include "schemeshard_audit_log.h"
918

1019
namespace NKikimr::NSchemeShard {
1120

1221
namespace {
13-
const TString SchemeshardComponentName = "schemeshard";
1422

15-
//NOTE: EmptyValue couldn't be an empty string as AUDIT_PART() skips parts with an empty values
16-
const TString EmptyValue = "{none}";
17-
}
23+
const TString SchemeshardComponentName = "schemeshard";
24+
25+
//NOTE: EmptyValue couldn't be an empty string as AUDIT_PART() skips parts with an empty values
26+
const TString EmptyValue = "{none}";
1827

1928
TString GeneralStatus(NKikimrScheme::EStatus actualStatus) {
2029
switch(actualStatus) {
@@ -68,6 +77,8 @@ TPath DatabasePathFromWorkingDir(TSchemeShard* SS, const TString &opWorkingDir)
6877
return databasePath;
6978
}
7079

80+
} // anonymous namespace
81+
7182
void AuditLogModifySchemeTransaction(const NKikimrScheme::TEvModifySchemeTransaction& request, const NKikimrScheme::TEvModifySchemeTransactionResult& response, TSchemeShard* SS, const TString& userSID) {
7283
// Each TEvModifySchemeTransaction.Transaction is a self sufficient operation and should be logged independently
7384
// (even if it was packed into a single TxProxy transaction with some other operations).
@@ -167,6 +178,205 @@ void AuditLogModifySchemeTransactionDeprecated(const NKikimrScheme::TEvModifySch
167178
}
168179
}
169180

181+
namespace {
182+
183+
struct TXxportRecord {
184+
TString OperationName;
185+
ui64 Id;
186+
TString Uid;
187+
TString RemoteAddress;
188+
TString UserSID;
189+
TString DatabasePath;
190+
TString Status;
191+
Ydb::StatusIds::StatusCode DetailedStatus;
192+
TString Reason;
193+
TVector<std::pair<TString, TString>> AdditionalParts;
194+
TString StartTime;
195+
TString EndTime;
196+
TString CloudId;
197+
TString FolderId;
198+
TString ResourceId;
199+
};
200+
201+
void AuditLogXxport(TXxportRecord&& record) {
202+
AUDIT_LOG(
203+
AUDIT_PART("component", SchemeshardComponentName)
204+
205+
AUDIT_PART("id", std::to_string(record.Id))
206+
AUDIT_PART("uid", record.Uid);
207+
AUDIT_PART("remote_address", (!record.RemoteAddress.empty() ? record.RemoteAddress : EmptyValue))
208+
AUDIT_PART("subject", (!record.UserSID.empty() ? record.UserSID : EmptyValue))
209+
AUDIT_PART("database", (!record.DatabasePath.empty() ? record.DatabasePath : EmptyValue))
210+
AUDIT_PART("operation", record.OperationName)
211+
AUDIT_PART("status", record.Status)
212+
AUDIT_PART("detailed_status", Ydb::StatusIds::StatusCode_Name(record.DetailedStatus))
213+
AUDIT_PART("reason", record.Reason)
214+
215+
// all parts are considered required, so all empty values are replaced with a special stub
216+
for (const auto& [name, value] : record.AdditionalParts) {
217+
AUDIT_PART(name, (!value.empty() ? value : EmptyValue))
218+
}
219+
220+
AUDIT_PART("start_time", record.StartTime)
221+
AUDIT_PART("end_time", record.EndTime)
222+
223+
AUDIT_PART("cloud_id", record.CloudId);
224+
AUDIT_PART("folder_id", record.FolderId);
225+
AUDIT_PART("resource_id", record.ResourceId);
226+
);
227+
}
228+
229+
using TParts = decltype(TXxportRecord::AdditionalParts);
230+
231+
template <class Proto>
232+
TParts ExportKindSpecificParts(const Proto& proto) {
233+
//NOTE: intentional switch -- that will help to detect (by breaking the compilation)
234+
// the moment when and if oneof Settings will be extended
235+
switch (proto.GetSettingsCase()) {
236+
case Proto::kExportToYtSettings:
237+
return ExportKindSpecificParts(proto.GetExportToYtSettings());
238+
case Proto::kExportToS3Settings:
239+
return ExportKindSpecificParts(proto.GetExportToS3Settings());
240+
case Proto::SETTINGS_NOT_SET:
241+
return {};
242+
}
243+
}
244+
template <> TParts ExportKindSpecificParts(const Ydb::Export::ExportToYtSettings& proto) {
245+
return {
246+
{"export_type", "yt"},
247+
{"export_item_count", ToString(proto.items().size())},
248+
{"export_yt_prefix", ((proto.items().size() > 0) ? proto.items(0).destination_path() : "")},
249+
};
250+
}
251+
template <> TParts ExportKindSpecificParts(const Ydb::Export::ExportToS3Settings& proto) {
252+
return {
253+
{"export_type", "s3"},
254+
{"export_item_count", ToString(proto.items().size())},
255+
{"export_s3_bucket", proto.bucket()},
256+
//NOTE: take first item's destination_prefix as a "good enough approximation"
257+
// (each item has its own destination_prefix, but in practice they are all the same)
258+
{"export_s3_prefix", ((proto.items().size() > 0) ? proto.items(0).destination_prefix() : "")},
259+
};
260+
}
261+
262+
template <class Proto>
263+
TParts ImportKindSpecificParts(const Proto& proto) {
264+
//NOTE: intentional switch -- that will help to detect (by breaking the compilation)
265+
// the moment when and if oneof Settings will be extended
266+
switch (proto.GetSettingsCase()) {
267+
case Proto::kImportFromS3Settings:
268+
return ImportKindSpecificParts(proto.GetImportFromS3Settings());
269+
case Proto::SETTINGS_NOT_SET:
270+
return {};
271+
}
272+
}
273+
template <> TParts ImportKindSpecificParts(const Ydb::Import::ImportFromS3Settings& proto) {
274+
return {
275+
{"import_type", "s3"},
276+
{"export_item_count", ToString(proto.items().size())},
277+
{"import_s3_bucket", proto.bucket()},
278+
//NOTE: take first item's source_prefix as a "good enough approximation"
279+
// (each item has its own source_prefix, but in practice they are all the same)
280+
{"import_s3_prefix", ((proto.items().size() > 0) ? proto.items(0).source_prefix() : "")},
281+
};
282+
}
283+
284+
} // anonymous namespace
285+
286+
template <class Request, class Response>
287+
void _AuditLogXxportStart(const Request& request, const Response& response, const TString& operationName, TParts&& additionalParts, TSchemeShard* SS) {
288+
TPath databasePath = DatabasePathFromWorkingDir(SS, request.GetDatabaseName());
289+
auto [cloud_id, folder_id, database_id] = GetDatabaseCloudIds(databasePath);
290+
auto peerName = NKikimr::NAddressClassifier::ExtractAddress(request.GetPeerName());
291+
const auto& entry = response.GetResponse().GetEntry();
292+
293+
AuditLogXxport({
294+
.OperationName = operationName,
295+
//NOTE: original request's tx-id is used as an operation id
296+
.Id = request.GetTxId(),
297+
.Uid = GetUid(request.GetRequest().GetOperationParams()),
298+
.RemoteAddress = peerName,
299+
.UserSID = request.GetUserSID(),
300+
.DatabasePath = databasePath.PathString(),
301+
.Status = (entry.GetStatus() == Ydb::StatusIds::SUCCESS ? "SUCCESS" : "ERROR"),
302+
.DetailedStatus = entry.GetStatus(),
303+
//NOTE: use main issue (on {ex,im}port itself), ignore issues on individual items
304+
.Reason = ((entry.IssuesSize() > 0) ? entry.GetIssues(0).message() : ""),
305+
306+
.AdditionalParts = std::move(additionalParts),
307+
308+
// no start or end times
309+
310+
.CloudId = cloud_id,
311+
.FolderId = folder_id,
312+
.ResourceId = database_id,
313+
});
314+
}
315+
316+
void AuditLogExportStart(const NKikimrExport::TEvCreateExportRequest& request, const NKikimrExport::TEvCreateExportResponse& response, TSchemeShard* SS) {
317+
_AuditLogXxportStart(request, response, "EXPORT START", ExportKindSpecificParts(request.GetRequest()), SS);
318+
}
319+
320+
void AuditLogImportStart(const NKikimrImport::TEvCreateImportRequest& request, const NKikimrImport::TEvCreateImportResponse& response, TSchemeShard* SS) {
321+
_AuditLogXxportStart(request, response, "IMPORT START", ImportKindSpecificParts(request.GetRequest()), SS);
322+
}
323+
324+
template <class Info>
325+
void _AuditLogXxportEnd(const Info& info, const TString& operationName, TParts&& additionalParts, TSchemeShard* SS) {
326+
const TPath databasePath = TPath::Init(info.DomainPathId, SS);
327+
auto [cloud_id, folder_id, database_id] = GetDatabaseCloudIds(databasePath);
328+
auto peerName = NKikimr::NAddressClassifier::ExtractAddress(info.PeerName);
329+
TString userSID = *info.UserSID.OrElse(EmptyValue);
330+
TString startTime = (info.StartTime != TInstant::Zero() ? info.StartTime.ToString() : TString());
331+
TString endTime = (info.EndTime != TInstant::Zero() ? info.EndTime.ToString() : TString());
332+
333+
// Info.State can't be anything but Done or Cancelled here
334+
Y_ABORT_UNLESS(info.State == Info::EState::Done || info.State == Info::EState::Cancelled);
335+
TString status = TString(info.State == Info::EState::Done ? "SUCCESS" : "ERROR");
336+
Ydb::StatusIds::StatusCode detailedStatus = (info.State == Info::EState::Done ? Ydb::StatusIds::SUCCESS : Ydb::StatusIds::CANCELLED);
337+
338+
AuditLogXxport({
339+
.OperationName = operationName,
340+
.Id = info.Id,
341+
.Uid = info.Uid,
342+
.RemoteAddress = peerName,
343+
.UserSID = userSID,
344+
.DatabasePath = databasePath.PathString(),
345+
.Status = status,
346+
.DetailedStatus = detailedStatus,
347+
.Reason = info.Issue,
348+
349+
.AdditionalParts = std::move(additionalParts),
350+
351+
.StartTime = startTime,
352+
.EndTime = endTime,
353+
354+
.CloudId = cloud_id,
355+
.FolderId = folder_id,
356+
.ResourceId = database_id,
357+
});
358+
}
359+
360+
void AuditLogExportEnd(const TExportInfo& info, TSchemeShard* SS) {
361+
NKikimrExport::TCreateExportRequest proto;
362+
// TSchemeShard::FromXxportInfo() can not be used here
363+
switch (info.Kind) {
364+
case TExportInfo::EKind::YT:
365+
Y_ABORT_UNLESS(proto.MutableExportToYtSettings()->ParseFromString(info.Settings));
366+
proto.MutableExportToYtSettings()->clear_token();
367+
break;
368+
case TExportInfo::EKind::S3:
369+
Y_ABORT_UNLESS(proto.MutableExportToS3Settings()->ParseFromString(info.Settings));
370+
proto.MutableExportToS3Settings()->clear_access_key();
371+
proto.MutableExportToS3Settings()->clear_secret_key();
372+
break;
373+
}
374+
_AuditLogXxportEnd(info, "EXPORT END", ExportKindSpecificParts(proto), SS);
375+
}
376+
void AuditLogImportEnd(const TImportInfo& info, TSchemeShard* SS) {
377+
_AuditLogXxportEnd(info, "IMPORT END", ImportKindSpecificParts(info.Settings), SS);
378+
}
379+
170380
void AuditLogLogin(const NKikimrScheme::TEvLogin& request, const NKikimrScheme::TEvLoginResult& response, TSchemeShard* SS) {
171381
static const TString LoginOperationName = "LOGIN";
172382

ydb/core/tx/schemeshard/schemeshard_audit_log.h

+18
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,30 @@ class TEvLogin;
1010
class TEvLoginResult;
1111
}
1212

13+
namespace NKikimrExport {
14+
class TEvCreateExportRequest;
15+
class TEvCreateExportResponse;
16+
}
17+
18+
namespace NKikimrImport {
19+
class TEvCreateImportRequest;
20+
class TEvCreateImportResponse;
21+
}
22+
1323
namespace NKikimr::NSchemeShard {
1424

1525
class TSchemeShard;
26+
struct TExportInfo;
27+
struct TImportInfo;
1628

1729
void AuditLogModifySchemeTransaction(const NKikimrScheme::TEvModifySchemeTransaction& request, const NKikimrScheme::TEvModifySchemeTransactionResult& response, TSchemeShard* SS, const TString& userSID);
1830
void AuditLogModifySchemeTransactionDeprecated(const NKikimrScheme::TEvModifySchemeTransaction& request, const NKikimrScheme::TEvModifySchemeTransactionResult& response, TSchemeShard* SS, const TString& userSID);
1931

32+
void AuditLogExportStart(const NKikimrExport::TEvCreateExportRequest& request, const NKikimrExport::TEvCreateExportResponse& response, TSchemeShard* SS);
33+
void AuditLogExportEnd(const TExportInfo& exportInfo, TSchemeShard* SS);
34+
35+
void AuditLogImportStart(const NKikimrImport::TEvCreateImportRequest& request, const NKikimrImport::TEvCreateImportResponse& response, TSchemeShard* SS);
36+
void AuditLogImportEnd(const TImportInfo& importInfo, TSchemeShard* SS);
37+
2038
void AuditLogLogin(const NKikimrScheme::TEvLogin& request, const NKikimrScheme::TEvLoginResult& response, TSchemeShard* SS);
2139
}

0 commit comments

Comments
 (0)