Skip to content

Commit 67fc041

Browse files
authored
allow parsing out-of-range doubles in bulk-upsert (#15376)
1 parent 7c72123 commit 67fc041

File tree

9 files changed

+99
-25
lines changed

9 files changed

+99
-25
lines changed

ydb/core/formats/arrow/converter.cpp

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
namespace NKikimr::NArrow {
1919

20-
static bool ConvertData(TCell& cell, const NScheme::TTypeInfo& colType, TMemoryPool& memPool, TString& errorMessage) {
20+
static bool ConvertData(TCell& cell, const NScheme::TTypeInfo& colType, TMemoryPool& memPool, TString& errorMessage, const bool allowInfDouble) {
2121
if (!cell.AsBuf()) {
2222
cell = TCell();
2323
return true;
@@ -34,7 +34,7 @@ static bool ConvertData(TCell& cell, const NScheme::TTypeInfo& colType, TMemoryP
3434
break;
3535
}
3636
case NScheme::NTypeIds::JsonDocument: {
37-
const auto binaryJson = NBinaryJson::SerializeToBinaryJson(cell.AsBuf());
37+
const auto binaryJson = NBinaryJson::SerializeToBinaryJson(cell.AsBuf(), allowInfDouble);
3838
if (std::holds_alternative<TString>(binaryJson)) {
3939
errorMessage = "Invalid JSON for JsonDocument provided: " + std::get<TString>(binaryJson);
4040
return false;
@@ -50,7 +50,8 @@ static bool ConvertData(TCell& cell, const NScheme::TTypeInfo& colType, TMemoryP
5050
return true;
5151
}
5252

53-
static arrow::Status ConvertColumn(const NScheme::TTypeInfo colType, std::shared_ptr<arrow::Array>& column, std::shared_ptr<arrow::Field>& field) {
53+
static arrow::Status ConvertColumn(
54+
const NScheme::TTypeInfo colType, std::shared_ptr<arrow::Array>& column, std::shared_ptr<arrow::Field>& field, const bool allowInfDouble) {
5455
switch (colType.GetTypeId()) {
5556
case NScheme::NTypeIds::Decimal:
5657
return arrow::Status::OK();
@@ -102,7 +103,7 @@ static arrow::Status ConvertColumn(const NScheme::TTypeInfo colType, std::shared
102103
return appendResult;
103104
}
104105
} else {
105-
const auto maybeBinaryJson = NBinaryJson::SerializeToBinaryJson(valueBuf);
106+
const auto maybeBinaryJson = NBinaryJson::SerializeToBinaryJson(valueBuf, allowInfDouble);
106107
if (std::holds_alternative<TString>(maybeBinaryJson)) {
107108
return arrow::Status::SerializationError("Cannot serialize json (", std::get<TString>(maybeBinaryJson),
108109
"): ", valueBuf.SubStr(0, Min(valueBuf.Size(), size_t{1024})));
@@ -134,17 +135,16 @@ static arrow::Status ConvertColumn(const NScheme::TTypeInfo colType, std::shared
134135
return arrow::Status::OK();
135136
}
136137

137-
arrow::Result<std::shared_ptr<arrow::RecordBatch>> ConvertColumns(const std::shared_ptr<arrow::RecordBatch>& batch,
138-
const THashMap<TString, NScheme::TTypeInfo>& columnsToConvert)
139-
{
138+
arrow::Result<std::shared_ptr<arrow::RecordBatch>> ConvertColumns(
139+
const std::shared_ptr<arrow::RecordBatch>& batch, const THashMap<TString, NScheme::TTypeInfo>& columnsToConvert, const bool allowInfDouble) {
140140
std::vector<std::shared_ptr<arrow::Array>> columns = batch->columns();
141141
std::vector<std::shared_ptr<arrow::Field>> fields = batch->schema()->fields();
142142
Y_ABORT_UNLESS(columns.size() == fields.size());
143143
for (i32 i = 0; i < batch->num_columns(); ++i) {
144144
auto& colName = batch->column_name(i);
145145
auto it = columnsToConvert.find(TString(colName.data(), colName.size()));
146146
if (it != columnsToConvert.end()) {
147-
auto convertResult = ConvertColumn(it->second, columns[i], fields[i]);
147+
auto convertResult = ConvertColumn(it->second, columns[i], fields[i], allowInfDouble);
148148
if (!convertResult.ok()) {
149149
return arrow::Status::FromArgs(convertResult.code(), "column ", colName, ": ", convertResult.ToString());
150150
}
@@ -326,7 +326,7 @@ bool TArrowToYdbConverter::Process(const arrow::RecordBatch& batch, TString& err
326326

327327
if (NeedDataConversion(colType)) {
328328
for (i32 i = 0; i < unroll; ++i) {
329-
if (!ConvertData(cells[i][col], colType, memPool, errorMessage)) {
329+
if (!ConvertData(cells[i][col], colType, memPool, errorMessage, AllowInfDouble_)) {
330330
return false;
331331
}
332332
}
@@ -371,7 +371,7 @@ bool TArrowToYdbConverter::Process(const arrow::RecordBatch& batch, TString& err
371371
return false;
372372
}
373373

374-
if (!ConvertData(curCell, colType, memPool, errorMessage)) {
374+
if (!ConvertData(curCell, colType, memPool, errorMessage, AllowInfDouble_)) {
375375
return false;
376376
}
377377
++col;

ydb/core/formats/arrow/converter.h

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ class TArrowToYdbConverter {
2222
private:
2323
std::vector<std::pair<TString, NScheme::TTypeInfo>> YdbSchema_; // Destination schema (allow shrink and reorder)
2424
IRowWriter& RowWriter_;
25+
bool AllowInfDouble_;
2526

2627
template <typename TArray>
2728
TCell MakeCellFromValue(const std::shared_ptr<arrow::Array>& column, i64 row) {
@@ -63,17 +64,19 @@ class TArrowToYdbConverter {
6364

6465
static bool NeedConversion(const NScheme::TTypeInfo& typeInRequest, const NScheme::TTypeInfo& expectedType);
6566

66-
TArrowToYdbConverter(const std::vector<std::pair<TString, NScheme::TTypeInfo>>& ydbSchema, IRowWriter& rowWriter)
67+
TArrowToYdbConverter(
68+
const std::vector<std::pair<TString, NScheme::TTypeInfo>>& ydbSchema, IRowWriter& rowWriter, const bool allowInfDouble = false)
6769
: YdbSchema_(ydbSchema)
6870
, RowWriter_(rowWriter)
69-
{}
71+
, AllowInfDouble_(allowInfDouble) {
72+
}
7073

7174
bool Process(const arrow::RecordBatch& batch, TString& errorMessage);
7275
};
7376

7477
arrow::Result<std::shared_ptr<arrow::RecordBatch>> ConvertColumns(const std::shared_ptr<arrow::RecordBatch>& batch,
75-
const THashMap<TString, NScheme::TTypeInfo>& columnsToConvert);
78+
const THashMap<TString, NScheme::TTypeInfo>& columnsToConvert, const bool allowInfDouble = false);
7679
arrow::Result<std::shared_ptr<arrow::RecordBatch>> InplaceConvertColumns(const std::shared_ptr<arrow::RecordBatch>& batch,
77-
const THashMap<TString, NScheme::TTypeInfo>& columnsToConvert);
80+
const THashMap<TString, NScheme::TTypeInfo>& columnsToConvert);
7881

7982
} // namespace NKikimr::NArrow

ydb/core/grpc_services/rpc_load_rows.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ class TUploadRowsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices::T
252252
}
253253

254254
// Fill rest of cells with non-key column members
255-
if (!FillCellsFromProto(valueCells, ValueColumnPositions, r, errorMessage, valueDataPool)) {
255+
if (!FillCellsFromProto(valueCells, ValueColumnPositions, r, errorMessage, valueDataPool, IsInfinityInJsonAllowed())) {
256256
return false;
257257
}
258258

ydb/core/kqp/ut/common/kqp_ut_common.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,10 @@ struct TKikimrSettings: public TTestFeatureFlagsHolder<TKikimrSettings> {
126126
AppConfig.MutableColumnShardConfig()->SetAlterObjectEnabled(enable);
127127
return *this;
128128
}
129+
TKikimrSettings& SetColumnShardDoubleOutOfRangeHandling(const NKikimrConfig::TColumnShardConfig_EJsonDoubleOutOfRangeHandlingPolicy value) {
130+
AppConfig.MutableColumnShardConfig()->SetDoubleOutOfRangeHandling(value);
131+
return *this;
132+
}
129133
};
130134

131135
class TKikimrRunner {

ydb/core/kqp/ut/olap/kqp_olap_ut.cpp

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3206,6 +3206,54 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
32063206
}
32073207
}
32083208

3209-
}
3209+
Y_UNIT_TEST(DoubleOutOfRangeInJson) {
3210+
auto settings = TKikimrSettings().SetWithSampleTables(false).SetColumnShardDoubleOutOfRangeHandling(
3211+
NKikimrConfig::TColumnShardConfig_EJsonDoubleOutOfRangeHandlingPolicy_CAST_TO_INFINITY);
3212+
TKikimrRunner kikimr(settings);
3213+
TLocalHelper(kikimr).CreateTestOlapTable();
3214+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000, 2);
3215+
auto client = kikimr.GetTableClient();
3216+
Tests::NCommon::TLoggerInit(kikimr).Initialize();
32103217

3218+
{
3219+
auto result = kikimr.GetQueryClient()
3220+
.ExecuteQuery(R"(
3221+
CREATE TABLE olapTable (
3222+
k Uint32 NOT NULL,
3223+
v JsonDocument NOT NULL,
3224+
PRIMARY KEY (k)
3225+
)
3226+
WITH (
3227+
STORE = COLUMN,
3228+
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 4
3229+
)
3230+
)",
3231+
NQuery::TTxControl::NoTx())
3232+
.ExtractValueSync();
3233+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToOneLineString());
3234+
}
3235+
3236+
{
3237+
TValueBuilder rowsBuilder;
3238+
rowsBuilder.BeginList();
3239+
for (ui32 i = 0; i < 10; ++i) {
3240+
rowsBuilder.AddListItem().BeginStruct().AddMember("k").Uint32(i * 4 + 0).AddMember("v").JsonDocument("-1.797693135e+308").EndStruct();
3241+
rowsBuilder.AddListItem().BeginStruct().AddMember("k").Uint32(i * 4 + 1).AddMember("v").JsonDocument("1.797693135e+308").EndStruct();
3242+
rowsBuilder.AddListItem().BeginStruct().AddMember("k").Uint32(i * 4 + 2).AddMember("v").JsonDocument("1e1000000000000").EndStruct();
3243+
rowsBuilder.AddListItem().BeginStruct().AddMember("k").Uint32(i * 4 + 3).AddMember("v").JsonDocument("-1e1000000000000").EndStruct();
3244+
}
3245+
rowsBuilder.EndList();
3246+
auto result = client.BulkUpsert("/Root/olapTable", rowsBuilder.Build()).ExtractValueSync();
3247+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToOneLineString());
3248+
}
3249+
3250+
{
3251+
auto it = client.StreamExecuteScanQuery("SELECT * FROM olapTable WHERE k < 4 ORDER BY k").ExtractValueSync();
3252+
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
3253+
TString result = StreamResultToYson(it);
3254+
Cout << result << Endl;
3255+
CompareYson(result, R"([[0u;"\"-inf\""];[1u;"\"inf\""];[2u;"\"inf\""];[3u;"\"-inf\""]])");
3256+
}
3257+
}
3258+
}
32113259
}

ydb/core/protos/config.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1822,6 +1822,11 @@ message TColumnShardConfig {
18221822
}
18231823
repeated TRepairInfo Repairs = 15;
18241824

1825+
enum EJsonDoubleOutOfRangeHandlingPolicy {
1826+
REJECT = 0;
1827+
CAST_TO_INFINITY = 1;
1828+
}
1829+
18251830
optional uint32 MaxInFlightIntervalsOnRequest = 16;
18261831
optional uint32 MaxReadStaleness_ms = 18 [default = 300000];
18271832
optional uint32 GCIntervalMs = 19 [default = 30000];
@@ -1845,6 +1850,7 @@ message TColumnShardConfig {
18451850
optional int32 DefaultCompressionLevel = 37;
18461851
optional uint64 MemoryLimitMergeOnCompactionRawData = 38 [default = 512000000];
18471852
optional bool AlterObjectEnabled = 39 [default = false];
1853+
optional EJsonDoubleOutOfRangeHandlingPolicy DoubleOutOfRangeHandling = 40 [default = REJECT];
18481854
}
18491855

18501856
message TSchemeShardConfig {

ydb/core/tx/tx_proxy/upload_rows_common_impl.h

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <ydb/core/base/tablet_pipecache.h>
1111
#include <ydb/core/base/path.h>
1212
#include <ydb/core/base/feature_flags.h>
13+
#include <ydb/core/protos/config.pb.h>
1314
#include <ydb/core/scheme/scheme_tablecell.h>
1415
#include <ydb/core/scheme/scheme_type_info.h>
1516
#include <ydb/core/scheme/scheme_types_proto.h>
@@ -267,7 +268,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
267268

268269
ui32 keySize = KeyColumnPositions.size(); // YdbSchema contains keys first
269270
TRowWriter writer(out, keySize);
270-
NArrow::TArrowToYdbConverter batchConverter(YdbSchema, writer);
271+
NArrow::TArrowToYdbConverter batchConverter(YdbSchema, writer, IsInfinityInJsonAllowed());
271272
if (!batchConverter.Process(*batch, errorMessage)) {
272273
return {};
273274
}
@@ -276,6 +277,18 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
276277
return out;
277278
}
278279

280+
bool IsInfinityInJsonAllowed() const {
281+
if (TableKind != NSchemeCache::TSchemeCacheNavigate::KindColumnTable) {
282+
return false;
283+
}
284+
switch (AppDataVerified().ColumnShardConfig.GetDoubleOutOfRangeHandling()) {
285+
case NKikimrConfig::TColumnShardConfig_EJsonDoubleOutOfRangeHandlingPolicy_REJECT:
286+
return false;
287+
case NKikimrConfig::TColumnShardConfig_EJsonDoubleOutOfRangeHandlingPolicy_CAST_TO_INFINITY:
288+
return true;
289+
}
290+
}
291+
279292
private:
280293
virtual void OnBeforeStart(const TActorContext&) {
281294
// nothing by default
@@ -658,7 +671,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
658671
}
659672
// Explicit types conversion
660673
if (!ColumnsToConvert.empty()) {
661-
auto convertResult = NArrow::ConvertColumns(Batch, ColumnsToConvert);
674+
auto convertResult = NArrow::ConvertColumns(Batch, ColumnsToConvert, IsInfinityInJsonAllowed());
662675
if (!convertResult.ok()) {
663676
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST,
664677
TStringBuilder() << "Cannot convert arrow batch:" << convertResult.status().ToString(), ctx);
@@ -1238,9 +1251,8 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
12381251
using TFieldDescription = NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ>::TFieldDescription;
12391252

12401253
template <class TProto>
1241-
inline bool FillCellsFromProto(TVector<TCell>& cells, const TVector<TFieldDescription>& descr, const TProto& proto,
1242-
TString& err, TMemoryPool& valueDataPool)
1243-
{
1254+
inline bool FillCellsFromProto(TVector<TCell>& cells, const TVector<TFieldDescription>& descr, const TProto& proto, TString& err,
1255+
TMemoryPool& valueDataPool, const bool allowInfDouble = false) {
12441256
cells.clear();
12451257
cells.reserve(descr.size());
12461258

@@ -1250,7 +1262,8 @@ inline bool FillCellsFromProto(TVector<TCell>& cells, const TVector<TFieldDescri
12501262
return false;
12511263
}
12521264
cells.push_back({});
1253-
if (!CellFromProtoVal(fd.Type, fd.Typmod, &proto.Getitems(fd.PositionInStruct), false, cells.back(), err, valueDataPool)) {
1265+
if (!CellFromProtoVal(
1266+
fd.Type, fd.Typmod, &proto.Getitems(fd.PositionInStruct), false, cells.back(), err, valueDataPool, allowInfDouble)) {
12541267
return false;
12551268
}
12561269

ydb/core/ydb_convert/ydb_convert.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1192,7 +1192,7 @@ bool CheckValueData(NScheme::TTypeInfo type, const TCell& cell, TString& err) {
11921192
}
11931193

11941194
bool CellFromProtoVal(const NScheme::TTypeInfo& type, i32 typmod, const Ydb::Value* vp, bool allowCastFromString,
1195-
TCell& c, TString& err, TMemoryPool& valueDataPool)
1195+
TCell& c, TString& err, TMemoryPool& valueDataPool, bool allowInfDouble)
11961196
{
11971197
if (vp->Hasnull_flag_value()) {
11981198
c = TCell();
@@ -1256,7 +1256,7 @@ bool CellFromProtoVal(const NScheme::TTypeInfo& type, i32 typmod, const Ydb::Val
12561256
break;
12571257
}
12581258
case NScheme::NTypeIds::JsonDocument : {
1259-
const auto binaryJson = NBinaryJson::SerializeToBinaryJson(val.Gettext_value());
1259+
const auto binaryJson = NBinaryJson::SerializeToBinaryJson(val.Gettext_value(), allowInfDouble);
12601260
if (std::holds_alternative<TString>(binaryJson)) {
12611261
err = "Invalid JSON for JsonDocument provided: " + std::get<TString>(binaryJson);
12621262
return false;

ydb/core/ydb_convert/ydb_convert.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ void ConvertDirectoryEntry(const NKikimrSchemeOp::TDirEntry& from, Ydb::Scheme::
5454
void ConvertDirectoryEntry(const NKikimrSchemeOp::TPathDescription& from, Ydb::Scheme::Entry* to, bool processAcl);
5555

5656
bool CellFromProtoVal(const NScheme::TTypeInfo& type, i32 typmod, const Ydb::Value* vp, bool allowCastFromString,
57-
TCell& c, TString& err, TMemoryPool& valueDataPool);
57+
TCell& c, TString& err, TMemoryPool& valueDataPool, bool allowInfDouble = false);
5858

5959
void ProtoValueFromCell(NYdb::TValueBuilder& vb, const NScheme::TTypeInfo& typeInfo, const TCell& cell);
6060

0 commit comments

Comments
 (0)