|
1 |
| -#include "schemeshard_audit_log.h" |
2 |
| -#include "schemeshard_path.h" |
3 |
| -#include "schemeshard_audit_log_fragment.h" |
| 1 | +#include <util/string/vector.h> |
| 2 | + |
| 3 | +#include <ydb/public/api/protos/ydb_export.pb.h> |
| 4 | +#include <ydb/public/api/protos/ydb_import.pb.h> |
4 | 5 |
|
5 |
| -#include <ydb/core/audit/audit_log.h> |
6 | 6 | #include <ydb/core/protos/flat_tx_scheme.pb.h>
|
| 7 | +#include <ydb/core/protos/export.pb.h> |
| 8 | +#include <ydb/core/protos/import.pb.h> |
| 9 | + |
7 | 10 | #include <ydb/core/util/address_classifier.h>
|
8 |
| -#include <util/string/vector.h> |
| 11 | +#include <ydb/core/audit/audit_log.h> |
| 12 | + |
| 13 | +#include "schemeshard_path.h" |
| 14 | +#include "schemeshard_impl.h" |
| 15 | +#include "schemeshard_xxport__helpers.h" |
| 16 | +#include "schemeshard_audit_log_fragment.h" |
| 17 | +#include "schemeshard_audit_log.h" |
9 | 18 |
|
10 | 19 | namespace NKikimr::NSchemeShard {
|
11 | 20 |
|
12 | 21 | namespace {
|
13 |
| - const TString SchemeshardComponentName = "schemeshard"; |
14 | 22 |
|
15 |
| - //NOTE: EmptyValue couldn't be an empty string as AUDIT_PART() skips parts with an empty values |
16 |
| - const TString EmptyValue = "{none}"; |
17 |
| -} |
| 23 | +const TString SchemeshardComponentName = "schemeshard"; |
| 24 | + |
| 25 | +//NOTE: EmptyValue couldn't be an empty string as AUDIT_PART() skips parts with an empty values |
| 26 | +const TString EmptyValue = "{none}"; |
18 | 27 |
|
19 | 28 | TString GeneralStatus(NKikimrScheme::EStatus actualStatus) {
|
20 | 29 | switch(actualStatus) {
|
@@ -68,6 +77,8 @@ TPath DatabasePathFromWorkingDir(TSchemeShard* SS, const TString &opWorkingDir)
|
68 | 77 | return databasePath;
|
69 | 78 | }
|
70 | 79 |
|
| 80 | +} // anonymous namespace |
| 81 | + |
71 | 82 | void AuditLogModifySchemeTransaction(const NKikimrScheme::TEvModifySchemeTransaction& request, const NKikimrScheme::TEvModifySchemeTransactionResult& response, TSchemeShard* SS, const TString& userSID) {
|
72 | 83 | // Each TEvModifySchemeTransaction.Transaction is a self sufficient operation and should be logged independently
|
73 | 84 | // (even if it was packed into a single TxProxy transaction with some other operations).
|
@@ -167,6 +178,205 @@ void AuditLogModifySchemeTransactionDeprecated(const NKikimrScheme::TEvModifySch
|
167 | 178 | }
|
168 | 179 | }
|
169 | 180 |
|
| 181 | +namespace { |
| 182 | + |
| 183 | +struct TXxportRecord { |
| 184 | + TString OperationName; |
| 185 | + ui64 Id; |
| 186 | + TString Uid; |
| 187 | + TString RemoteAddress; |
| 188 | + TString UserSID; |
| 189 | + TString DatabasePath; |
| 190 | + TString Status; |
| 191 | + Ydb::StatusIds::StatusCode DetailedStatus; |
| 192 | + TString Reason; |
| 193 | + TVector<std::pair<TString, TString>> AdditionalParts; |
| 194 | + TString StartTime; |
| 195 | + TString EndTime; |
| 196 | + TString CloudId; |
| 197 | + TString FolderId; |
| 198 | + TString ResourceId; |
| 199 | +}; |
| 200 | + |
| 201 | +void AuditLogXxport(TXxportRecord&& record) { |
| 202 | + AUDIT_LOG( |
| 203 | + AUDIT_PART("component", SchemeshardComponentName) |
| 204 | + |
| 205 | + AUDIT_PART("id", std::to_string(record.Id)) |
| 206 | + AUDIT_PART("uid", record.Uid); |
| 207 | + AUDIT_PART("remote_address", (!record.RemoteAddress.empty() ? record.RemoteAddress : EmptyValue)) |
| 208 | + AUDIT_PART("subject", (!record.UserSID.empty() ? record.UserSID : EmptyValue)) |
| 209 | + AUDIT_PART("database", (!record.DatabasePath.empty() ? record.DatabasePath : EmptyValue)) |
| 210 | + AUDIT_PART("operation", record.OperationName) |
| 211 | + AUDIT_PART("status", record.Status) |
| 212 | + AUDIT_PART("detailed_status", Ydb::StatusIds::StatusCode_Name(record.DetailedStatus)) |
| 213 | + AUDIT_PART("reason", record.Reason) |
| 214 | + |
| 215 | + // all parts are considered required, so all empty values are replaced with a special stub |
| 216 | + for (const auto& [name, value] : record.AdditionalParts) { |
| 217 | + AUDIT_PART(name, (!value.empty() ? value : EmptyValue)) |
| 218 | + } |
| 219 | + |
| 220 | + AUDIT_PART("start_time", record.StartTime) |
| 221 | + AUDIT_PART("end_time", record.EndTime) |
| 222 | + |
| 223 | + AUDIT_PART("cloud_id", record.CloudId); |
| 224 | + AUDIT_PART("folder_id", record.FolderId); |
| 225 | + AUDIT_PART("resource_id", record.ResourceId); |
| 226 | + ); |
| 227 | +} |
| 228 | + |
| 229 | +using TParts = decltype(TXxportRecord::AdditionalParts); |
| 230 | + |
| 231 | +template <class Proto> |
| 232 | +TParts ExportKindSpecificParts(const Proto& proto) { |
| 233 | + //NOTE: intentional switch -- that will help to detect (by breaking the compilation) |
| 234 | + // the moment when and if oneof Settings will be extended |
| 235 | + switch (proto.GetSettingsCase()) { |
| 236 | + case Proto::kExportToYtSettings: |
| 237 | + return ExportKindSpecificParts(proto.GetExportToYtSettings()); |
| 238 | + case Proto::kExportToS3Settings: |
| 239 | + return ExportKindSpecificParts(proto.GetExportToS3Settings()); |
| 240 | + case Proto::SETTINGS_NOT_SET: |
| 241 | + return {}; |
| 242 | + } |
| 243 | +} |
| 244 | +template <> TParts ExportKindSpecificParts(const Ydb::Export::ExportToYtSettings& proto) { |
| 245 | + return { |
| 246 | + {"export_type", "yt"}, |
| 247 | + {"export_item_count", ToString(proto.items().size())}, |
| 248 | + {"export_yt_prefix", ((proto.items().size() > 0) ? proto.items(0).destination_path() : "")}, |
| 249 | + }; |
| 250 | +} |
| 251 | +template <> TParts ExportKindSpecificParts(const Ydb::Export::ExportToS3Settings& proto) { |
| 252 | + return { |
| 253 | + {"export_type", "s3"}, |
| 254 | + {"export_item_count", ToString(proto.items().size())}, |
| 255 | + {"export_s3_bucket", proto.bucket()}, |
| 256 | + //NOTE: take first item's destination_prefix as a "good enough approximation" |
| 257 | + // (each item has its own destination_prefix, but in practice they are all the same) |
| 258 | + {"export_s3_prefix", ((proto.items().size() > 0) ? proto.items(0).destination_prefix() : "")}, |
| 259 | + }; |
| 260 | +} |
| 261 | + |
| 262 | +template <class Proto> |
| 263 | +TParts ImportKindSpecificParts(const Proto& proto) { |
| 264 | + //NOTE: intentional switch -- that will help to detect (by breaking the compilation) |
| 265 | + // the moment when and if oneof Settings will be extended |
| 266 | + switch (proto.GetSettingsCase()) { |
| 267 | + case Proto::kImportFromS3Settings: |
| 268 | + return ImportKindSpecificParts(proto.GetImportFromS3Settings()); |
| 269 | + case Proto::SETTINGS_NOT_SET: |
| 270 | + return {}; |
| 271 | + } |
| 272 | +} |
| 273 | +template <> TParts ImportKindSpecificParts(const Ydb::Import::ImportFromS3Settings& proto) { |
| 274 | + return { |
| 275 | + {"import_type", "s3"}, |
| 276 | + {"export_item_count", ToString(proto.items().size())}, |
| 277 | + {"import_s3_bucket", proto.bucket()}, |
| 278 | + //NOTE: take first item's source_prefix as a "good enough approximation" |
| 279 | + // (each item has its own source_prefix, but in practice they are all the same) |
| 280 | + {"import_s3_prefix", ((proto.items().size() > 0) ? proto.items(0).source_prefix() : "")}, |
| 281 | + }; |
| 282 | +} |
| 283 | + |
| 284 | +} // anonymous namespace |
| 285 | + |
| 286 | +template <class Request, class Response> |
| 287 | +void _AuditLogXxportStart(const Request& request, const Response& response, const TString& operationName, TParts&& additionalParts, TSchemeShard* SS) { |
| 288 | + TPath databasePath = DatabasePathFromWorkingDir(SS, request.GetDatabaseName()); |
| 289 | + auto [cloud_id, folder_id, database_id] = GetDatabaseCloudIds(databasePath); |
| 290 | + auto peerName = NKikimr::NAddressClassifier::ExtractAddress(request.GetPeerName()); |
| 291 | + const auto& entry = response.GetResponse().GetEntry(); |
| 292 | + |
| 293 | + AuditLogXxport({ |
| 294 | + .OperationName = operationName, |
| 295 | + //NOTE: original request's tx-id is used as an operation id |
| 296 | + .Id = request.GetTxId(), |
| 297 | + .Uid = GetUid(request.GetRequest().GetOperationParams()), |
| 298 | + .RemoteAddress = peerName, |
| 299 | + .UserSID = request.GetUserSID(), |
| 300 | + .DatabasePath = databasePath.PathString(), |
| 301 | + .Status = (entry.GetStatus() == Ydb::StatusIds::SUCCESS ? "SUCCESS" : "ERROR"), |
| 302 | + .DetailedStatus = entry.GetStatus(), |
| 303 | + //NOTE: use main issue (on {ex,im}port itself), ignore issues on individual items |
| 304 | + .Reason = ((entry.IssuesSize() > 0) ? entry.GetIssues(0).message() : ""), |
| 305 | + |
| 306 | + .AdditionalParts = std::move(additionalParts), |
| 307 | + |
| 308 | + // no start or end times |
| 309 | + |
| 310 | + .CloudId = cloud_id, |
| 311 | + .FolderId = folder_id, |
| 312 | + .ResourceId = database_id, |
| 313 | + }); |
| 314 | +} |
| 315 | + |
| 316 | +void AuditLogExportStart(const NKikimrExport::TEvCreateExportRequest& request, const NKikimrExport::TEvCreateExportResponse& response, TSchemeShard* SS) { |
| 317 | + _AuditLogXxportStart(request, response, "EXPORT START", ExportKindSpecificParts(request.GetRequest()), SS); |
| 318 | +} |
| 319 | + |
| 320 | +void AuditLogImportStart(const NKikimrImport::TEvCreateImportRequest& request, const NKikimrImport::TEvCreateImportResponse& response, TSchemeShard* SS) { |
| 321 | + _AuditLogXxportStart(request, response, "IMPORT START", ImportKindSpecificParts(request.GetRequest()), SS); |
| 322 | +} |
| 323 | + |
| 324 | +template <class Info> |
| 325 | +void _AuditLogXxportEnd(const Info& info, const TString& operationName, TParts&& additionalParts, TSchemeShard* SS) { |
| 326 | + const TPath databasePath = TPath::Init(info.DomainPathId, SS); |
| 327 | + auto [cloud_id, folder_id, database_id] = GetDatabaseCloudIds(databasePath); |
| 328 | + auto peerName = NKikimr::NAddressClassifier::ExtractAddress(info.PeerName); |
| 329 | + TString userSID = *info.UserSID.OrElse(EmptyValue); |
| 330 | + TString startTime = (info.StartTime != TInstant::Zero() ? info.StartTime.ToString() : TString()); |
| 331 | + TString endTime = (info.EndTime != TInstant::Zero() ? info.EndTime.ToString() : TString()); |
| 332 | + |
| 333 | + // Info.State can't be anything but Done or Cancelled here |
| 334 | + Y_ABORT_UNLESS(info.State == Info::EState::Done || info.State == Info::EState::Cancelled); |
| 335 | + TString status = TString(info.State == Info::EState::Done ? "SUCCESS" : "ERROR"); |
| 336 | + Ydb::StatusIds::StatusCode detailedStatus = (info.State == Info::EState::Done ? Ydb::StatusIds::SUCCESS : Ydb::StatusIds::CANCELLED); |
| 337 | + |
| 338 | + AuditLogXxport({ |
| 339 | + .OperationName = operationName, |
| 340 | + .Id = info.Id, |
| 341 | + .Uid = info.Uid, |
| 342 | + .RemoteAddress = peerName, |
| 343 | + .UserSID = userSID, |
| 344 | + .DatabasePath = databasePath.PathString(), |
| 345 | + .Status = status, |
| 346 | + .DetailedStatus = detailedStatus, |
| 347 | + .Reason = info.Issue, |
| 348 | + |
| 349 | + .AdditionalParts = std::move(additionalParts), |
| 350 | + |
| 351 | + .StartTime = startTime, |
| 352 | + .EndTime = endTime, |
| 353 | + |
| 354 | + .CloudId = cloud_id, |
| 355 | + .FolderId = folder_id, |
| 356 | + .ResourceId = database_id, |
| 357 | + }); |
| 358 | +} |
| 359 | + |
| 360 | +void AuditLogExportEnd(const TExportInfo& info, TSchemeShard* SS) { |
| 361 | + NKikimrExport::TCreateExportRequest proto; |
| 362 | + // TSchemeShard::FromXxportInfo() can not be used here |
| 363 | + switch (info.Kind) { |
| 364 | + case TExportInfo::EKind::YT: |
| 365 | + Y_ABORT_UNLESS(proto.MutableExportToYtSettings()->ParseFromString(info.Settings)); |
| 366 | + proto.MutableExportToYtSettings()->clear_token(); |
| 367 | + break; |
| 368 | + case TExportInfo::EKind::S3: |
| 369 | + Y_ABORT_UNLESS(proto.MutableExportToS3Settings()->ParseFromString(info.Settings)); |
| 370 | + proto.MutableExportToS3Settings()->clear_access_key(); |
| 371 | + proto.MutableExportToS3Settings()->clear_secret_key(); |
| 372 | + break; |
| 373 | + } |
| 374 | + _AuditLogXxportEnd(info, "EXPORT END", ExportKindSpecificParts(proto), SS); |
| 375 | +} |
| 376 | +void AuditLogImportEnd(const TImportInfo& info, TSchemeShard* SS) { |
| 377 | + _AuditLogXxportEnd(info, "IMPORT END", ImportKindSpecificParts(info.Settings), SS); |
| 378 | +} |
| 379 | + |
170 | 380 | void AuditLogLogin(const NKikimrScheme::TEvLogin& request, const NKikimrScheme::TEvLoginResult& response, TSchemeShard* SS) {
|
171 | 381 | static const TString LoginOperationName = "LOGIN";
|
172 | 382 |
|
|
0 commit comments