Skip to content

Add saving of the intermediate state of the checksum calculation in import from s3 #15046

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 45 additions & 3 deletions ydb/core/backup/common/checksum.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,19 @@

#include <util/string/hex.h>

namespace {

// Copies elements from a protobuf repeated field into a fixed-size C array.
// At most N elements are copied; extra elements in `proto` are ignored, and
// array slots beyond proto.size() are left untouched.
template <typename T, size_t N>
void FillArrayFromProto(T (&array)[N], const NProtoBuf::RepeatedField<T>& proto) {
    const size_t available = static_cast<size_t>(proto.size());
    const size_t count = available < N ? available : N;
    for (size_t idx = 0; idx < count; ++idx) {
        array[idx] = proto.Get(static_cast<int>(idx));
    }
}

} // anonymous

namespace NKikimr::NBackup {

class TSHA256 : public IChecksum {
Expand All @@ -16,20 +29,49 @@ class TSHA256 : public IChecksum {
SHA256_Update(&Context, data.data(), data.size());
}

TString Serialize() override {
TString Finalize() override {
unsigned char hash[SHA256_DIGEST_LENGTH];
SHA256_Final(hash, &Context);
return to_lower(HexEncode(hash, SHA256_DIGEST_LENGTH));
}

// Captures the current SHA-256 context into a TChecksumState message:
// every member of Context (h, Nl, Nh, data, num, md_len) is copied into
// the Sha256State sub-message.
TChecksumState GetState() const override {
    TChecksumState snapshot;
    auto* sha = snapshot.MutableSha256State();

    // Scalar members of the context.
    sha->SetNl(Context.Nl);
    sha->SetNh(Context.Nh);
    sha->SetNum(Context.num);
    sha->SetMdLen(Context.md_len);

    // Array members; element order is preserved.
    for (const ui32 word : Context.h) {
        sha->AddH(word);
    }
    for (const ui32 word : Context.data) {
        sha->AddData(word);
    }

    return snapshot;
}

void Continue(const TChecksumState& state) override {
const auto& sha256State = state.GetSha256State();
SHA256_Init(&Context);
FillArrayFromProto(Context.h, sha256State.GetH());
Context.Nh = sha256State.GetNh();
Context.Nl = sha256State.GetNl();
FillArrayFromProto(Context.data, sha256State.GetData());
Context.num = sha256State.GetNum();
Context.md_len = sha256State.GetMdLen();
}

private:
SHA256_CTX Context;
};

TString ComputeChecksum(TStringBuf data) {
IChecksum::TPtr checksum(CreateChecksum());
checksum->AddData(data);
return checksum->Serialize();
return checksum->Finalize();
}

IChecksum* CreateChecksum() {
Expand All @@ -40,4 +82,4 @@ TString ChecksumKey(const TString& objKey) {
return objKey + ".sha256";
}

} // NKikimr::NDataShard
} // NKikimr::NBackup
9 changes: 8 additions & 1 deletion ydb/core/backup/common/checksum.h
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
#pragma once

#include <ydb/core/protos/checksum.pb.h>

#include <util/generic/string.h>

namespace NKikimr::NBackup {

using NKikimrBackup::TChecksumState;

// Interface of an incremental checksum calculator whose intermediate state
// can be obtained with GetState() and restored with Continue().
class IChecksum {
public:
using TPtr = std::unique_ptr<IChecksum>;

virtual ~IChecksum() = default;

// Feeds the next chunk of data into the checksum calculation.
virtual void AddData(TStringBuf data) = 0;
virtual TString Serialize() = 0;
// Completes the calculation and returns the checksum as a string.
virtual TString Finalize() = 0;

// Returns the current intermediate state of the calculation.
virtual TChecksumState GetState() const = 0;
// Resumes the calculation from a previously obtained state.
virtual void Continue(const TChecksumState& state) = 0;
};

IChecksum* CreateChecksum();
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/backup/common/metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ ui64 TMetadata::GetVersion() const {

TString TMetadata::Serialize() const {
NJson::TJsonMap m;
m["version"] = *Version;
if (Version.Defined()) {
m["version"] = *Version;
}

NJson::TJsonArray fullBackups;
for (auto &[tp, b] : FullBackups) {
Expand Down
19 changes: 19 additions & 0 deletions ydb/core/protos/checksum.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package NKikimrBackup;
option java_package = "ru.yandex.kikimr.proto";

// Corresponds to the OpenSSL SHA256_CTX structure.
// Each field stores the SHA256_CTX member named in its comment.
message TSha256State {
// SHA256_CTX::h, element by element.
repeated uint32 H = 1;
// SHA256_CTX::Nh.
optional uint32 Nh = 2;
// SHA256_CTX::Nl.
optional uint32 Nl = 3;
// SHA256_CTX::data, element by element.
repeated uint32 Data = 4;
// SHA256_CTX::num.
optional uint32 Num = 5;
// SHA256_CTX::md_len.
optional uint32 MdLen = 6;
}

// Used to serialize the intermediate state of a checksum.
// The state is a oneof keyed by algorithm; SHA-256 is the only alternative
// defined here.
message TChecksumState {
oneof state {
TSha256State Sha256State = 1;
}
}
2 changes: 1 addition & 1 deletion ydb/core/protos/feature_flags.proto
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ message TFeatureFlags {
optional bool EnableDataShardInMemoryStateMigration = 159 [default = true];
optional bool EnableDataShardInMemoryStateMigrationAcrossGenerations = 160 [default = false];
optional bool DisableLocalDBEraseCache = 161 [default = false];
optional bool EnableExportChecksums = 162 [default = false];
optional bool EnableChecksumsExport = 162 [default = false];
optional bool EnableTopicTransfer = 163 [default = false];
optional bool EnableViewExport = 164 [default = false];
optional bool EnableColumnStore = 165 [default = false];
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ SRCS(
bootstrapper.proto
change_exchange.proto
channel_purpose.proto
checksum.proto
cms.proto
compaction.proto
compile_service_config.proto
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/testlib/basics/feature_flags.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class TTestFeatureFlagsHolder {
FEATURE_FLAG_SETTER(EnableParameterizedDecimal)
FEATURE_FLAG_SETTER(EnableTopicAutopartitioningForCDC)
FEATURE_FLAG_SETTER(EnableFollowerStats)
FEATURE_FLAG_SETTER(EnableExportChecksums)
FEATURE_FLAG_SETTER(EnableChecksumsExport)
FEATURE_FLAG_SETTER(EnableTopicTransfer)
FEATURE_FLAG_SETTER(EnableStrictUserManagement)
FEATURE_FLAG_SETTER(EnableDatabaseAdmin)
Expand Down
12 changes: 11 additions & 1 deletion ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
#include <ydb/core/protos/tx.pb.h>
#include <ydb/core/protos/tx_datashard.pb.h>
#include <ydb/core/protos/subdomains.pb.h>
#include <ydb/core/protos/checksum.pb.h>
#include <ydb/core/protos/counters_datashard.pb.h>
#include <ydb/core/protos/table_stats.pb.h>

Expand Down Expand Up @@ -789,9 +790,18 @@ class TDataShard
struct ProcessedBytes : Column<4, NScheme::NTypeIds::Uint64> {};
struct WrittenBytes : Column<5, NScheme::NTypeIds::Uint64> {};
struct WrittenRows : Column<6, NScheme::NTypeIds::Uint64> {};
struct ChecksumState : Column<7, NScheme::NTypeIds::String> { using Type = NKikimrBackup::TChecksumState; };

using TKey = TableKey<TxId>;
using TColumns = TableColumns<TxId, SchemeETag, DataETag, ProcessedBytes, WrittenBytes, WrittenRows>;
using TColumns = TableColumns<
TxId,
SchemeETag,
DataETag,
ProcessedBytes,
WrittenBytes,
WrittenRows,
ChecksumState
>;
};

struct ChangeRecords : Table<17> {
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/datashard/datashard_s3_download.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <ydb/core/protos/checksum.pb.h>

#include <util/generic/maybe.h>

namespace NKikimr {
Expand All @@ -10,13 +12,15 @@ struct TS3Download {
ui64 ProcessedBytes = 0;
ui64 WrittenBytes = 0;
ui64 WrittenRows = 0;
NKikimrBackup::TChecksumState ChecksumState;

// Writes a single-line, brace-delimited dump of the download progress
// fields (including the checksum state in short debug form) to `out`.
void Out(IOutputStream& out) const {
    out << "{";
    out << " DataETag: " << DataETag;
    out << " ProcessedBytes: " << ProcessedBytes;
    out << " WrittenBytes: " << WrittenBytes;
    out << " WrittenRows: " << WrittenRows;
    out << " ChecksumState: " << ChecksumState.ShortDebugString();
    out << " }";
}
};
Expand Down
8 changes: 7 additions & 1 deletion ydb/core/tx/datashard/datashard_s3_downloads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ bool TS3DownloadsManager::Load(NIceDb::TNiceDb& db) {
info.WrittenBytes = rowset.GetValueOrDefault<Schema::S3Downloads::WrittenBytes>(0);
info.WrittenRows = rowset.GetValueOrDefault<Schema::S3Downloads::WrittenRows>(0);

if (rowset.HaveValue<Schema::S3Downloads::ChecksumState>()) {
info.ChecksumState = rowset.GetValue<Schema::S3Downloads::ChecksumState>();
}

if (!rowset.Next()) {
ready = false;
break;
Expand Down Expand Up @@ -56,7 +60,9 @@ const TS3Download& TS3DownloadsManager::Store(NIceDb::TNiceDb& db, ui64 txId, co
NIceDb::TUpdate<Schema::S3Downloads::DataETag>(*newInfo.DataETag),
NIceDb::TUpdate<Schema::S3Downloads::ProcessedBytes>(newInfo.ProcessedBytes),
NIceDb::TUpdate<Schema::S3Downloads::WrittenBytes>(newInfo.WrittenBytes),
NIceDb::TUpdate<Schema::S3Downloads::WrittenRows>(newInfo.WrittenRows));
NIceDb::TUpdate<Schema::S3Downloads::WrittenRows>(newInfo.WrittenRows),
NIceDb::TUpdate<Schema::S3Downloads::ChecksumState>(newInfo.ChecksumState)
);

return info;
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/export_s3_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ IEventBase* TS3Buffer::PrepareEvent(bool last, NExportScan::IBuffer::TStats& sta
stats.BytesSent = buffer->Size();

if (Checksum && last) {
return new TEvExportScan::TEvBuffer<TBuffer>(std::move(*buffer), last, Checksum->Serialize());
return new TEvExportScan::TEvBuffer<TBuffer>(std::move(*buffer), last, Checksum->Finalize());
} else {
return new TEvExportScan::TEvBuffer<TBuffer>(std::move(*buffer), last);
}
Expand Down
29 changes: 21 additions & 8 deletions ydb/core/tx/datashard/import_s3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -421,13 +421,21 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
}
}

// Returns the intermediate state of the checksum calculation, or an empty
// (default-constructed) state when no checksum object is present.
TChecksumState GetChecksumState() const {
    if (!Checksum) {
        return TChecksumState();
    }
    return Checksum->GetState();
}

void Handle(TEvDataShard::TEvS3DownloadInfo::TPtr& ev) {
IMPORT_LOG_D("Handle " << ev->Get()->ToString());

const auto& info = ev->Get()->Info;
if (!info.DataETag) {
Send(DataShard, new TEvDataShard::TEvStoreS3DownloadInfo(TxId, {
ETag, ProcessedBytes, WrittenBytes, WrittenRows
ETag, ProcessedBytes, WrittenBytes, WrittenRows, GetChecksumState()
}));
return;
}
Expand All @@ -447,11 +455,15 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
ProcessedBytes = info.ProcessedBytes;
WrittenBytes = info.WrittenBytes;
WrittenRows = info.WrittenRows;
if (Checksum) {
Checksum->Continue(info.ChecksumState);
}

if (!ContentLength || ProcessedBytes >= ContentLength) {
if (CheckChecksum()) {
return Finish();
if (!CheckChecksum()) {
return;
}
return Finish();
}

Process();
Expand Down Expand Up @@ -491,7 +503,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
}

const auto contentLength = result.GetResult().GetContentLength();
const auto checksumKey = ChecksumKey(Settings.GetDataKey(DataFormat, CompressionCodec));
const auto checksumKey = ChecksumKey(Settings.GetDataKey(DataFormat, ECompressionCodec::None));
GetObject(checksumKey, std::make_pair(0, contentLength - 1));
}

Expand Down Expand Up @@ -603,7 +615,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
<< ", size# " << record->ByteSizeLong());

Send(DataShard, new TEvDataShard::TEvS3UploadRowsRequest(TxId, record, {
ETag, ProcessedBytes, WrittenBytes, WrittenRows
ETag, ProcessedBytes, WrittenBytes, WrittenRows, GetChecksumState()
}));
}

Expand Down Expand Up @@ -703,13 +715,14 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
return true;
}

TString gotChecksum = Checksum->Serialize();
TString gotChecksum = Checksum->Finalize();
if (gotChecksum == ExpectedChecksum) {
return true;
}

const TString error = TStringBuilder() << "Checksum mismatch:"
<< ": expected# " << ExpectedChecksum
const TString error = TStringBuilder() << "Checksum mismatch for "
<< Settings.GetDataKey(DataFormat, ECompressionCodec::None)
<< " expected# " << ExpectedChecksum
<< ", got# " << gotChecksum;

IMPORT_LOG_E(error);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/schemeshard/schemeshard_export__create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase {
}

exportInfo = new TExportInfo(id, uid, TExportInfo::EKind::S3, settings, domainPath.Base()->PathId, request.GetPeerName());
exportInfo->EnableChecksums = AppData()->FeatureFlags.GetEnableExportChecksums();
exportInfo->EnableChecksums = AppData()->FeatureFlags.GetEnableChecksumsExport();
exportInfo->EnablePermissions = AppData()->FeatureFlags.GetEnablePermissionsExport();
TString explain;
if (!FillItems(exportInfo, settings, explain)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> RestorePropose(
restoreSettings.SetRegion(region);
}

if (item.Metadata.HasVersion()) {
task.SetValidateChecksums(item.Metadata.GetVersion() > 0 && !importInfo->Settings.skip_checksum_validation());
if (!item.Metadata.HasVersion() || item.Metadata.GetVersion() > 0) {
task.SetValidateChecksums(!importInfo->Settings.skip_checksum_validation());
}
}
break;
Expand Down
Loading
Loading