Skip to content

Commit d650718

Browse files
authored
25-1: Add saving of the intermediate state of the checksum calculation in import from s3 (#15205)
1 parent 14dd0b0 commit d650718

File tree

21 files changed

+528
-207
lines changed

21 files changed

+528
-207
lines changed

ydb/core/backup/common/checksum.cpp

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,19 @@
44

55
#include <util/string/hex.h>
66

7+
namespace {
8+
9+
template <typename T, size_t N>
10+
void FillArrayFromProto(T (&array)[N], const NProtoBuf::RepeatedField<T>& proto) {
11+
for (int i = 0; i < proto.size(); ++i) {
12+
if (static_cast<size_t>(i) < std::size(array)) {
13+
array[i] = proto.Get(i);
14+
}
15+
}
16+
}
17+
18+
} // anonymous
19+
720
namespace NKikimr::NBackup {
821

922
class TSHA256 : public IChecksum {
@@ -16,20 +29,49 @@ class TSHA256 : public IChecksum {
1629
SHA256_Update(&Context, data.data(), data.size());
1730
}
1831

19-
TString Serialize() override {
32+
TString Finalize() override {
2033
unsigned char hash[SHA256_DIGEST_LENGTH];
2134
SHA256_Final(hash, &Context);
2235
return to_lower(HexEncode(hash, SHA256_DIGEST_LENGTH));
2336
}
2437

38+
TChecksumState GetState() const override {
39+
TChecksumState state;
40+
auto& sha256State = *state.MutableSha256State();
41+
42+
for (ui32 h : Context.h) {
43+
sha256State.AddH(h);
44+
}
45+
sha256State.SetNh(Context.Nh);
46+
sha256State.SetNl(Context.Nl);
47+
for (ui32 data : Context.data) {
48+
sha256State.AddData(data);
49+
}
50+
sha256State.SetNum(Context.num);
51+
sha256State.SetMdLen(Context.md_len);
52+
53+
return state;
54+
}
55+
56+
void Continue(const TChecksumState& state) override {
57+
const auto& sha256State = state.GetSha256State();
58+
SHA256_Init(&Context);
59+
FillArrayFromProto(Context.h, sha256State.GetH());
60+
Context.Nh = sha256State.GetNh();
61+
Context.Nl = sha256State.GetNl();
62+
FillArrayFromProto(Context.data, sha256State.GetData());
63+
Context.num = sha256State.GetNum();
64+
Context.md_len = sha256State.GetMdLen();
65+
}
66+
2567
private:
2668
SHA256_CTX Context;
2769
};
2870

2971
TString ComputeChecksum(TStringBuf data) {
3072
IChecksum::TPtr checksum(CreateChecksum());
3173
checksum->AddData(data);
32-
return checksum->Serialize();
74+
return checksum->Finalize();
3375
}
3476

3577
IChecksum* CreateChecksum() {
@@ -40,4 +82,4 @@ TString ChecksumKey(const TString& objKey) {
4082
return objKey + ".sha256";
4183
}
4284

43-
} // NKikimr::NDataShard
85+
} // NKikimr::NBackup

ydb/core/backup/common/checksum.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,24 @@
11
#pragma once
22

3+
#include <ydb/core/protos/checksum.pb.h>
4+
35
#include <util/generic/string.h>
46

57
namespace NKikimr::NBackup {
68

9+
using NKikimrBackup::TChecksumState;
10+
711
class IChecksum {
812
public:
913
using TPtr = std::unique_ptr<IChecksum>;
1014

1115
virtual ~IChecksum() = default;
1216

1317
virtual void AddData(TStringBuf data) = 0;
14-
virtual TString Serialize() = 0;
18+
virtual TString Finalize() = 0;
19+
20+
virtual TChecksumState GetState() const = 0;
21+
virtual void Continue(const TChecksumState& state) = 0;
1522
};
1623

1724
IChecksum* CreateChecksum();

ydb/core/backup/common/metadata.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ ui64 TMetadata::GetVersion() const {
2323

2424
TString TMetadata::Serialize() const {
2525
NJson::TJsonMap m;
26-
m["version"] = *Version;
26+
if (Version.Defined()) {
27+
m["version"] = *Version;
28+
}
2729

2830
NJson::TJsonArray fullBackups;
2931
for (auto &[tp, b] : FullBackups) {

ydb/core/protos/checksum.proto

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package NKikimrBackup;
2+
option java_package = "ru.yandex.kikimr.proto";
3+
4+
// Corresponds to the OpenSSL SHA256_CTX structure.
5+
message TSha256State {
6+
repeated uint32 H = 1;
7+
optional uint32 Nh = 2;
8+
optional uint32 Nl = 3;
9+
repeated uint32 Data = 4;
10+
optional uint32 Num = 5;
11+
optional uint32 MdLen = 6;
12+
}
13+
14+
// Used to serialize the intermediate state of a checksum.
15+
message TChecksumState {
16+
oneof state {
17+
TSha256State Sha256State = 1;
18+
}
19+
}

ydb/core/protos/feature_flags.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ message TFeatureFlags {
184184
optional bool EnableDataShardInMemoryStateMigration = 159 [default = true];
185185
optional bool EnableDataShardInMemoryStateMigrationAcrossGenerations = 160 [default = false];
186186
optional bool DisableLocalDBEraseCache = 161 [default = false];
187-
optional bool EnableExportChecksums = 162 [default = false];
187+
optional bool EnableChecksumsExport = 162 [default = false];
188188
optional bool EnableTopicTransfer = 163 [default = false];
189189
optional bool EnableViewExport = 164 [default = false];
190190
optional bool EnableColumnStore = 165 [default = false];

ydb/core/protos/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ SRCS(
3131
bootstrapper.proto
3232
change_exchange.proto
3333
channel_purpose.proto
34+
checksum.proto
3435
cms.proto
3536
compaction.proto
3637
compile_service_config.proto

ydb/core/testlib/basics/feature_flags.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ class TTestFeatureFlagsHolder {
7171
FEATURE_FLAG_SETTER(EnableParameterizedDecimal)
7272
FEATURE_FLAG_SETTER(EnableTopicAutopartitioningForCDC)
7373
FEATURE_FLAG_SETTER(EnableFollowerStats)
74-
FEATURE_FLAG_SETTER(EnableExportChecksums)
74+
FEATURE_FLAG_SETTER(EnableChecksumsExport)
7575
FEATURE_FLAG_SETTER(EnableTopicTransfer)
7676
FEATURE_FLAG_SETTER(EnableStrictUserManagement)
7777
FEATURE_FLAG_SETTER(EnableDatabaseAdmin)

ydb/core/tx/datashard/datashard_impl.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
#include <ydb/core/protos/tx.pb.h>
5353
#include <ydb/core/protos/tx_datashard.pb.h>
5454
#include <ydb/core/protos/subdomains.pb.h>
55+
#include <ydb/core/protos/checksum.pb.h>
5556
#include <ydb/core/protos/counters_datashard.pb.h>
5657
#include <ydb/core/protos/table_stats.pb.h>
5758

@@ -789,9 +790,18 @@ class TDataShard
789790
struct ProcessedBytes : Column<4, NScheme::NTypeIds::Uint64> {};
790791
struct WrittenBytes : Column<5, NScheme::NTypeIds::Uint64> {};
791792
struct WrittenRows : Column<6, NScheme::NTypeIds::Uint64> {};
793+
struct ChecksumState : Column<7, NScheme::NTypeIds::String> { using Type = NKikimrBackup::TChecksumState; };
792794

793795
using TKey = TableKey<TxId>;
794-
using TColumns = TableColumns<TxId, SchemeETag, DataETag, ProcessedBytes, WrittenBytes, WrittenRows>;
796+
using TColumns = TableColumns<
797+
TxId,
798+
SchemeETag,
799+
DataETag,
800+
ProcessedBytes,
801+
WrittenBytes,
802+
WrittenRows,
803+
ChecksumState
804+
>;
795805
};
796806

797807
struct ChangeRecords : Table<17> {

ydb/core/tx/datashard/datashard_s3_download.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#pragma once
22

3+
#include <ydb/core/protos/checksum.pb.h>
4+
35
#include <util/generic/maybe.h>
46

57
namespace NKikimr {
@@ -10,13 +12,15 @@ struct TS3Download {
1012
ui64 ProcessedBytes = 0;
1113
ui64 WrittenBytes = 0;
1214
ui64 WrittenRows = 0;
15+
NKikimrBackup::TChecksumState ChecksumState;
1316

1417
void Out(IOutputStream& out) const {
1518
out << "{"
1619
<< " DataETag: " << DataETag
1720
<< " ProcessedBytes: " << ProcessedBytes
1821
<< " WrittenBytes: " << WrittenBytes
1922
<< " WrittenRows: " << WrittenRows
23+
<< " ChecksumState: " << ChecksumState.ShortDebugString()
2024
<< " }";
2125
}
2226
};

ydb/core/tx/datashard/datashard_s3_downloads.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ bool TS3DownloadsManager::Load(NIceDb::TNiceDb& db) {
2424
info.WrittenBytes = rowset.GetValueOrDefault<Schema::S3Downloads::WrittenBytes>(0);
2525
info.WrittenRows = rowset.GetValueOrDefault<Schema::S3Downloads::WrittenRows>(0);
2626

27+
if (rowset.HaveValue<Schema::S3Downloads::ChecksumState>()) {
28+
info.ChecksumState = rowset.GetValue<Schema::S3Downloads::ChecksumState>();
29+
}
30+
2731
if (!rowset.Next()) {
2832
ready = false;
2933
break;
@@ -56,7 +60,9 @@ const TS3Download& TS3DownloadsManager::Store(NIceDb::TNiceDb& db, ui64 txId, co
5660
NIceDb::TUpdate<Schema::S3Downloads::DataETag>(*newInfo.DataETag),
5761
NIceDb::TUpdate<Schema::S3Downloads::ProcessedBytes>(newInfo.ProcessedBytes),
5862
NIceDb::TUpdate<Schema::S3Downloads::WrittenBytes>(newInfo.WrittenBytes),
59-
NIceDb::TUpdate<Schema::S3Downloads::WrittenRows>(newInfo.WrittenRows));
63+
NIceDb::TUpdate<Schema::S3Downloads::WrittenRows>(newInfo.WrittenRows),
64+
NIceDb::TUpdate<Schema::S3Downloads::ChecksumState>(newInfo.ChecksumState)
65+
);
6066

6167
return info;
6268
}

ydb/core/tx/datashard/export_s3_buffer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ IEventBase* TS3Buffer::PrepareEvent(bool last, NExportScan::IBuffer::TStats& sta
297297
stats.BytesSent = buffer->Size();
298298

299299
if (Checksum && last) {
300-
return new TEvExportScan::TEvBuffer<TBuffer>(std::move(*buffer), last, Checksum->Serialize());
300+
return new TEvExportScan::TEvBuffer<TBuffer>(std::move(*buffer), last, Checksum->Finalize());
301301
} else {
302302
return new TEvExportScan::TEvBuffer<TBuffer>(std::move(*buffer), last);
303303
}

ydb/core/tx/datashard/import_s3.cpp

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -421,13 +421,21 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
421421
}
422422
}
423423

424+
TChecksumState GetChecksumState() const {
425+
TChecksumState checksumState;
426+
if (Checksum) {
427+
checksumState = Checksum->GetState();
428+
}
429+
return checksumState;
430+
}
431+
424432
void Handle(TEvDataShard::TEvS3DownloadInfo::TPtr& ev) {
425433
IMPORT_LOG_D("Handle " << ev->Get()->ToString());
426434

427435
const auto& info = ev->Get()->Info;
428436
if (!info.DataETag) {
429437
Send(DataShard, new TEvDataShard::TEvStoreS3DownloadInfo(TxId, {
430-
ETag, ProcessedBytes, WrittenBytes, WrittenRows
438+
ETag, ProcessedBytes, WrittenBytes, WrittenRows, GetChecksumState()
431439
}));
432440
return;
433441
}
@@ -447,11 +455,15 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
447455
ProcessedBytes = info.ProcessedBytes;
448456
WrittenBytes = info.WrittenBytes;
449457
WrittenRows = info.WrittenRows;
458+
if (Checksum) {
459+
Checksum->Continue(info.ChecksumState);
460+
}
450461

451462
if (!ContentLength || ProcessedBytes >= ContentLength) {
452-
if (CheckChecksum()) {
453-
return Finish();
463+
if (!CheckChecksum()) {
464+
return;
454465
}
466+
return Finish();
455467
}
456468

457469
Process();
@@ -491,7 +503,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
491503
}
492504

493505
const auto contentLength = result.GetResult().GetContentLength();
494-
const auto checksumKey = ChecksumKey(Settings.GetDataKey(DataFormat, CompressionCodec));
506+
const auto checksumKey = ChecksumKey(Settings.GetDataKey(DataFormat, ECompressionCodec::None));
495507
GetObject(checksumKey, std::make_pair(0, contentLength - 1));
496508
}
497509

@@ -603,7 +615,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
603615
<< ", size# " << record->ByteSizeLong());
604616

605617
Send(DataShard, new TEvDataShard::TEvS3UploadRowsRequest(TxId, record, {
606-
ETag, ProcessedBytes, WrittenBytes, WrittenRows
618+
ETag, ProcessedBytes, WrittenBytes, WrittenRows, GetChecksumState()
607619
}));
608620
}
609621

@@ -703,13 +715,14 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
703715
return true;
704716
}
705717

706-
TString gotChecksum = Checksum->Serialize();
718+
TString gotChecksum = Checksum->Finalize();
707719
if (gotChecksum == ExpectedChecksum) {
708720
return true;
709721
}
710722

711-
const TString error = TStringBuilder() << "Checksum mismatch:"
712-
<< ": expected# " << ExpectedChecksum
723+
const TString error = TStringBuilder() << "Checksum mismatch for "
724+
<< Settings.GetDataKey(DataFormat, ECompressionCodec::None)
725+
<< " expected# " << ExpectedChecksum
713726
<< ", got# " << gotChecksum;
714727

715728
IMPORT_LOG_E(error);

ydb/core/tx/schemeshard/schemeshard_export__create.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase {
129129
}
130130

131131
exportInfo = new TExportInfo(id, uid, TExportInfo::EKind::S3, settings, domainPath.Base()->PathId, request.GetPeerName());
132-
exportInfo->EnableChecksums = AppData()->FeatureFlags.GetEnableExportChecksums();
132+
exportInfo->EnableChecksums = AppData()->FeatureFlags.GetEnableChecksumsExport();
133133
exportInfo->EnablePermissions = AppData()->FeatureFlags.GetEnablePermissionsExport();
134134
TString explain;
135135
if (!FillItems(exportInfo, settings, explain)) {

ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,8 +170,8 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> RestorePropose(
170170
restoreSettings.SetRegion(region);
171171
}
172172

173-
if (item.Metadata.HasVersion()) {
174-
task.SetValidateChecksums(item.Metadata.GetVersion() > 0 && !importInfo->Settings.skip_checksum_validation());
173+
if (!item.Metadata.HasVersion() || item.Metadata.GetVersion() > 0) {
174+
task.SetValidateChecksums(!importInfo->Settings.skip_checksum_validation());
175175
}
176176
}
177177
break;

0 commit comments

Comments
 (0)