Skip to content

Commit 7e96bc9

Browse files
author
azevaykin
committed
Don't abort when exporting corrupted data
1 parent f5da486 commit 7e96bc9

File tree

10 files changed

+139
-55
lines changed

10 files changed

+139
-55
lines changed

ydb/core/scheme/scheme_tablecell.h

+13-3
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,19 @@ struct TCell {
101101
return ReadUnaligned<T>(Data());
102102
}
103103

104-
template<typename T, typename = TStdLayout<T>>
105-
static inline TCell Make(const T &val) noexcept
106-
{
104+
template <typename T, typename = TStdLayout<T>>
105+
bool ToStream(IOutputStream& out, TStringBuilder& err) const noexcept {
106+
if(sizeof(T) != Size()) {
107+
err << "AsValue<T>() type doesn't match TCell";
108+
return false;
109+
}
110+
111+
out << ReadUnaligned<T>(Data());
112+
return true;
113+
}
114+
115+
template <typename T, typename = TStdLayout<T>>
116+
static inline TCell Make(const T& val) noexcept {
107117
auto *ptr = static_cast<const char*>(static_cast<const void*>(&val));
108118

109119
return TCell{ ptr, sizeof(val) };

ydb/core/tx/datashard/export_common.cpp

+26-2
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,33 @@ TString DecimalToString(const std::pair<ui64, i64>& loHi) {
7777
}
7878

7979
/// Converts a binary DyNumber into its textual form.
///
/// Aborts the process when DyNumberToStream() reports a failure for `data`;
/// callers that must survive such input should call DyNumberToStream()
/// directly and inspect its result.
///
/// @param data  Binary DyNumber representation.
/// @return The textual representation written by DyNumberToStream().
TString DyNumberToString(TStringBuf data) {
    TString result;
    TStringOutput out(result);
    TStringBuilder err;

    const bool success = DyNumberToStream(data, out, err);
    // Forward the collected diagnostic into the abort message; a bare
    // assertion would lose the reason for the failure.
    Y_ABORT_UNLESS(success, "%s", err.c_str());

    return result;
}
89+
90+
bool DecimalToStream(const std::pair<ui64, i64>& loHi, IOutputStream& out, TStringBuilder& err) {
91+
Y_UNUSED(err);
92+
using namespace NYql::NDecimal;
93+
94+
TInt128 val = FromHalfs(loHi.first, loHi.second);
95+
out << ToString(val, NScheme::DECIMAL_PRECISION, NScheme::DECIMAL_SCALE);
96+
return true;
97+
}
98+
99+
bool DyNumberToStream(TStringBuf data, IOutputStream& out, TStringBuilder& err) {
80100
auto result = NDyNumber::DyNumberToString(data);
81-
Y_ABORT_UNLESS(result.Defined(), "Invalid DyNumber binary representation");
82-
return *result;
101+
if (!result.Defined()) {
102+
err << "Invalid DyNumber binary representation";
103+
return false;
104+
}
105+
out << *result;
106+
return true;
83107
}
84108

85109
} // NDataShard

ydb/core/tx/datashard/export_common.h

+2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ TMaybe<Ydb::Table::CreateTableRequest> GenYdbScheme(
3636

3737
TString DecimalToString(const std::pair<ui64, i64>& loHi);
3838
TString DyNumberToString(TStringBuf data);
39+
bool DecimalToStream(const std::pair<ui64, i64>& loHi, IOutputStream& out, TStringBuilder& err);
40+
bool DyNumberToStream(TStringBuf data, IOutputStream& out, TStringBuilder& err);
3941

4042
} // NDataShard
4143
} // NKikimr

ydb/core/tx/datashard/export_s3_buffer_raw.cpp

+23-16
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ void TS3BufferRaw::ColumnsOrder(const TVector<ui32>& tags) {
3636
}
3737
}
3838

39-
void TS3BufferRaw::Collect(const NTable::IScan::TRow& row, IOutputStream& out) {
39+
bool TS3BufferRaw::Collect(const NTable::IScan::TRow& row, IOutputStream& out) {
4040
bool needsComma = false;
4141
for (const auto& [tag, column] : Columns) {
4242
auto it = Indices.find(tag);
@@ -57,18 +57,19 @@ void TS3BufferRaw::Collect(const NTable::IScan::TRow& row, IOutputStream& out) {
5757
continue;
5858
}
5959

60+
bool serialized = true;
6061
switch (column.Type.GetTypeId()) {
6162
case NScheme::NTypeIds::Int32:
62-
out << cell.AsValue<i32>();
63+
serialized = cell.ToStream<i32>(out, ErrorString);
6364
break;
6465
case NScheme::NTypeIds::Uint32:
65-
out << cell.AsValue<ui32>();
66+
serialized = cell.ToStream<ui32>(out, ErrorString);
6667
break;
6768
case NScheme::NTypeIds::Int64:
68-
out << cell.AsValue<i64>();
69+
serialized = cell.ToStream<i64>(out, ErrorString);
6970
break;
7071
case NScheme::NTypeIds::Uint64:
71-
out << cell.AsValue<ui64>();
72+
serialized = cell.ToStream<ui64>(out, ErrorString);
7273
break;
7374
case NScheme::NTypeIds::Uint8:
7475
//case NScheme::NTypeIds::Byte:
@@ -78,19 +79,19 @@ void TS3BufferRaw::Collect(const NTable::IScan::TRow& row, IOutputStream& out) {
7879
out << static_cast<i32>(cell.AsValue<i8>());
7980
break;
8081
case NScheme::NTypeIds::Int16:
81-
out << cell.AsValue<i16>();
82+
serialized = cell.ToStream<i16>(out, ErrorString);
8283
break;
8384
case NScheme::NTypeIds::Uint16:
84-
out << cell.AsValue<ui16>();
85+
serialized = cell.ToStream<ui16>(out, ErrorString);
8586
break;
8687
case NScheme::NTypeIds::Bool:
87-
out << cell.AsValue<bool>();
88+
serialized = cell.ToStream<bool>(out, ErrorString);
8889
break;
8990
case NScheme::NTypeIds::Double:
90-
out << cell.AsValue<double>();
91+
serialized = cell.ToStream<double>(out, ErrorString);
9192
break;
9293
case NScheme::NTypeIds::Float:
93-
out << cell.AsValue<float>();
94+
serialized = cell.ToStream<float>(out, ErrorString);
9495
break;
9596
case NScheme::NTypeIds::Date:
9697
out << TInstant::Days(cell.AsValue<ui16>());
@@ -102,13 +103,13 @@ void TS3BufferRaw::Collect(const NTable::IScan::TRow& row, IOutputStream& out) {
102103
out << TInstant::MicroSeconds(cell.AsValue<ui64>());
103104
break;
104105
case NScheme::NTypeIds::Interval:
105-
out << cell.AsValue<i64>();
106+
serialized = cell.ToStream<i64>(out, ErrorString);
106107
break;
107108
case NScheme::NTypeIds::Decimal:
108-
out << DecimalToString(cell.AsValue<std::pair<ui64, i64>>());
109+
serialized = DecimalToStream(cell.AsValue<std::pair<ui64, i64>>(), out, ErrorString);
109110
break;
110111
case NScheme::NTypeIds::DyNumber:
111-
out << DyNumberToString(cell.AsBuf());
112+
serialized = DyNumberToStream(cell.AsBuf(), out, ErrorString);
112113
break;
113114
case NScheme::NTypeIds::String:
114115
case NScheme::NTypeIds::String4k:
@@ -127,16 +128,22 @@ void TS3BufferRaw::Collect(const NTable::IScan::TRow& row, IOutputStream& out) {
127128
default:
128129
Y_ABORT("Unsupported type");
129130
}
131+
132+
if (!serialized) {
133+
return false;
134+
}
130135
}
131136

132137
out << "\n";
133138
++Rows;
139+
140+
return true;
134141
}
135142

136143
bool TS3BufferRaw::Collect(const NTable::IScan::TRow& row) {
137144
TBufferOutput out(Buffer);
138-
Collect(row, out);
139-
return true;
145+
ErrorString.clear();
146+
return Collect(row, out);
140147
}
141148

142149
IEventBase* TS3BufferRaw::PrepareEvent(bool last, NExportScan::IBuffer::TStats& stats) {
@@ -161,7 +168,7 @@ bool TS3BufferRaw::IsFilled() const {
161168
}
162169

163170
TString TS3BufferRaw::GetError() const {
164-
Y_ABORT("unreachable");
171+
return ErrorString;
165172
}
166173

167174
TMaybe<TBuffer> TS3BufferRaw::Flush(bool) {

ydb/core/tx/datashard/export_s3_buffer_raw.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ class TS3BufferRaw: public NExportScan::IBuffer {
2727
inline ui64 GetRowsLimit() const { return RowsLimit; }
2828
inline ui64 GetBytesLimit() const { return BytesLimit; }
2929

30-
void Collect(const NTable::IScan::TRow& row, IOutputStream& out);
30+
bool Collect(const NTable::IScan::TRow& row, IOutputStream& out);
3131
virtual TMaybe<TBuffer> Flush(bool prepare);
3232

3333
private:
@@ -42,6 +42,7 @@ class TS3BufferRaw: public NExportScan::IBuffer {
4242
ui64 BytesRead;
4343
TBuffer Buffer;
4444

45+
TStringBuilder ErrorString;
4546
}; // TS3BufferRaw
4647

4748
} // NDataShard

ydb/core/tx/datashard/export_s3_buffer_zstd.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@ class TS3BufferZstd: public TS3BufferRaw {
6262
bool Collect(const NTable::IScan::TRow& row) override {
6363
BufferRaw.clear();
6464
TStringOutput out(BufferRaw);
65-
TS3BufferRaw::Collect(row, out);
65+
if (!TS3BufferRaw::Collect(row, out)) {
66+
return false;
67+
}
6668

6769
BytesRaw += BufferRaw.size();
6870

ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp

-32
Original file line numberDiff line numberDiff line change
@@ -570,38 +570,6 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
570570
}
571571
}
572572

573-
void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, int partitionIdx,
574-
const TVector<ui32>& keyTags, const TVector<ui32>& valueTags, const TVector<ui32>& recordIds)
575-
{
576-
auto tableDesc = DescribePath(runtime, tablePath, true, true);
577-
const auto& tablePartitions = tableDesc.GetPathDescription().GetTablePartitions();
578-
UNIT_ASSERT(partitionIdx < tablePartitions.size());
579-
580-
auto ev = MakeHolder<TEvDataShard::TEvUploadRowsRequest>();
581-
ev->Record.SetTableId(tableDesc.GetPathId());
582-
583-
auto& scheme = *ev->Record.MutableRowScheme();
584-
for (ui32 tag : keyTags) {
585-
scheme.AddKeyColumnIds(tag);
586-
}
587-
for (ui32 tag : valueTags) {
588-
scheme.AddValueColumnIds(tag);
589-
}
590-
591-
for (ui32 i : recordIds) {
592-
auto key = TVector<TCell>{TCell::Make(i)};
593-
auto value = TVector<TCell>{TCell::Make(i)};
594-
595-
auto& row = *ev->Record.AddRows();
596-
row.SetKeyColumns(TSerializedCellVec::Serialize(key));
597-
row.SetValueColumns(TSerializedCellVec::Serialize(value));
598-
}
599-
600-
const auto& sender = runtime.AllocateEdgeActor();
601-
ForwardToTablet(runtime, tablePartitions[partitionIdx].GetDatashardId(), sender, ev.Release());
602-
runtime.GrabEdgeEvent<TEvDataShard::TEvUploadRowsResponse>(sender);
603-
}
604-
605573
Y_UNIT_TEST(SplitTable) {
606574
TTestWithReboots t;
607575
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {

ydb/core/tx/schemeshard/ut_export/ut_export.cpp

+37
Original file line numberDiff line numberDiff line change
@@ -1372,4 +1372,41 @@ partitioning_settings {
13721372
env.TestWaitNotification(runtime, exportId);
13731373
TestGetExport(runtime, exportId, "/MyRoot");
13741374
}
1375+
1376+
// Checks that exporting a table whose DyNumber column holds a malformed
// payload ends with the export in CANCELLED status.
Y_UNIT_TEST(CorruptedBadDyNumber) {
1377+
TTestBasicRuntime runtime;
1378+
TTestEnv env(runtime, TTestEnvOptions().DisableStatsBatching(true));
1379+
ui64 txId = 100;
1380+
1381+
TestCreateTable(runtime, ++txId, "/MyRoot", R"(
1382+
Name: "Table"
1383+
Columns { Name: "key" Type: "Utf8" }
1384+
Columns { Name: "value" Type: "DyNumber" }
1385+
KeyColumnNames: ["key"]
1386+
)");
1387+
env.TestWaitNotification(runtime, txId);
1388+
1389+
// Write bad DyNumber
// UploadRows() stores each record id as a raw ui32 cell, so the value column
// (tag 2, presumably "value") receives bytes that were never encoded as a
// DyNumber.
1390+
UploadRows(runtime, "/MyRoot/Table", 0, {1}, {2}, {1});
1391+
1392+
TPortManager portManager;
1393+
const ui16 port = portManager.GetPort();
1394+
1395+
TS3Mock s3Mock({}, TS3Mock::TSettings(port));
1396+
UNIT_ASSERT(s3Mock.Start());
1397+
1398+
TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
1399+
ExportToS3Settings {
1400+
endpoint: "localhost:%d"
1401+
scheme: HTTP
1402+
items {
1403+
source_path: "/MyRoot/Table"
1404+
destination_prefix: ""
1405+
}
1406+
}
1407+
)", port));
1408+
env.TestWaitNotification(runtime, txId);
1409+
1410+
// The export's transaction id is passed as the export id here.
TestGetExport(runtime, txId, "/MyRoot", Ydb::StatusIds::CANCELLED);
1411+
}
13751412
}

ydb/core/tx/schemeshard/ut_helpers/helpers.cpp

+31
Original file line numberDiff line numberDiff line change
@@ -2258,4 +2258,35 @@ namespace NSchemeShardUT_Private {
22582258
data.push_back({1, message});
22592259
NKikimr::NPQ::CmdWrite(&runtime, tabletId, edge, partitionId, "sourceid0", msgSeqNo, data, false, {}, true, cookie, 0);
22602260
}
2261+
2262+
/// Test helper: sends a TEvUploadRowsRequest to one partition of a table and
/// blocks until the TEvUploadRowsResponse arrives.
///
/// @param runtime       Test actor runtime used to describe the table and deliver events.
/// @param tablePath     Path of the target table.
/// @param partitionIdx  Index into the table's partition list; must be within range.
/// @param keyTags       Column ids registered as key columns in the row scheme.
/// @param valueTags     Column ids registered as value columns in the row scheme.
/// @param recordIds     One row is uploaded per id; the id is used as both key and value.
void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, int partitionIdx, const TVector<ui32>& keyTags, const TVector<ui32>& valueTags, const TVector<ui32>& recordIds)
{
    auto tableDesc = DescribePath(runtime, tablePath, true, true);
    const auto& tablePartitions = tableDesc.GetPathDescription().GetTablePartitions();
    // partitionIdx is signed, so reject negative values as well as values
    // past the end before it is used as an index below.
    UNIT_ASSERT(partitionIdx >= 0 && partitionIdx < tablePartitions.size());

    auto ev = MakeHolder<TEvDataShard::TEvUploadRowsRequest>();
    ev->Record.SetTableId(tableDesc.GetPathId());

    auto& scheme = *ev->Record.MutableRowScheme();
    for (ui32 tag : keyTags) {
        scheme.AddKeyColumnIds(tag);
    }
    for (ui32 tag : valueTags) {
        scheme.AddValueColumnIds(tag);
    }

    for (ui32 i : recordIds) {
        // Each cell wraps the raw ui32 id, whatever type the target column
        // was declared with.
        auto key = TVector<TCell>{TCell::Make(i)};
        auto value = TVector<TCell>{TCell::Make(i)};

        auto& row = *ev->Record.AddRows();
        row.SetKeyColumns(TSerializedCellVec::Serialize(key));
        row.SetValueColumns(TSerializedCellVec::Serialize(value));
    }

    const auto& sender = runtime.AllocateEdgeActor();
    ForwardToTablet(runtime, tablePartitions[partitionIdx].GetDatashardId(), sender, ev.Release());
    // The response is awaited, but its contents are not inspected here.
    runtime.GrabEdgeEvent<TEvDataShard::TEvUploadRowsResponse>(sender);
}
22612292
}

ydb/core/tx/schemeshard/ut_helpers/helpers.h

+2
Original file line numberDiff line numberDiff line change
@@ -542,4 +542,6 @@ namespace NSchemeShardUT_Private {
542542
void SendTEvPeriodicTopicStats(TTestActorRuntime& runtime, ui64 topicId, ui64 generation, ui64 round, ui64 dataSize, ui64 usedReserveSize);
543543
void WriteToTopic(TTestActorRuntime& runtime, const TString& path, ui32& msgSeqNo, const TString& message);
544544

545+
void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, int partitionIdx, const TVector<ui32>& keyTags, const TVector<ui32>& valueTags, const TVector<ui32>& recordIds);
546+
545547
} //NSchemeShardUT_Private

0 commit comments

Comments
 (0)