Skip to content

Commit 789f39c

Browse files
authored
table I/O for big tz dates (#5846)
1 parent 1fb6b33 commit 789f39c

File tree

27 files changed

+603
-15
lines changed

27 files changed

+603
-15
lines changed

ydb/library/yql/minikql/mkql_type_ops.cpp

+12-12
Original file line numberDiff line numberDiff line change
@@ -2606,23 +2606,23 @@ bool DeserializeTzTimestamp(TStringBuf buf, ui64& timestamp, ui16& tzId) {
26062606
}
26072607

26082608
void SerializeTzDate32(i32 date, ui16 tzId, IOutputStream& out) {
2609-
date = SwapBytes(date);
2609+
auto value = 0x80 ^ SwapBytes((ui32)date);
26102610
tzId = SwapBytes(tzId);
2611-
out.Write(&date, sizeof(date));
2611+
out.Write(&value, sizeof(value));
26122612
out.Write(&tzId, sizeof(tzId));
26132613
}
26142614

26152615
void SerializeTzDatetime64(i64 datetime, ui16 tzId, IOutputStream& out) {
2616-
datetime = SwapBytes(datetime);
2616+
auto value = 0x80 ^ SwapBytes((ui64)datetime);
26172617
tzId = SwapBytes(tzId);
2618-
out.Write(&datetime, sizeof(datetime));
2618+
out.Write(&value, sizeof(value));
26192619
out.Write(&tzId, sizeof(tzId));
26202620
}
26212621

26222622
void SerializeTzTimestamp64(i64 timestamp, ui16 tzId, IOutputStream& out) {
2623-
timestamp = SwapBytes(timestamp);
2623+
auto value = 0x80 ^ SwapBytes((ui64)timestamp);
26242624
tzId = SwapBytes(tzId);
2625-
out.Write(&timestamp, sizeof(timestamp));
2625+
out.Write(&value, sizeof(value));
26262626
out.Write(&tzId, sizeof(tzId));
26272627
}
26282628

@@ -2631,8 +2631,8 @@ bool DeserializeTzDate32(TStringBuf buf, i32& date, ui16& tzId) {
26312631
return false;
26322632
}
26332633

2634-
date = ReadUnaligned<i32>(buf.data());
2635-
date = SwapBytes(date);
2634+
auto value = ReadUnaligned<ui32>(buf.data());
2635+
date = (i32)(SwapBytes(value ^ 0x80));
26362636
if (date < NUdf::MIN_DATE32 || date > NUdf::MAX_DATE32) {
26372637
return false;
26382638
}
@@ -2651,8 +2651,8 @@ bool DeserializeTzDatetime64(TStringBuf buf, i64& datetime, ui16& tzId) {
26512651
return false;
26522652
}
26532653

2654-
datetime = ReadUnaligned<i64>(buf.data());
2655-
datetime = SwapBytes(datetime);
2654+
auto value = ReadUnaligned<ui64>(buf.data());
2655+
datetime = (i64)(SwapBytes(0x80 ^ value));
26562656
if (datetime < NUdf::MIN_DATETIME64 || datetime > NUdf::MAX_DATETIME64) {
26572657
return false;
26582658
}
@@ -2671,8 +2671,8 @@ bool DeserializeTzTimestamp64(TStringBuf buf, i64& timestamp, ui16& tzId) {
26712671
return false;
26722672
}
26732673

2674-
timestamp = ReadUnaligned<i64>(buf.data());
2675-
timestamp = SwapBytes(timestamp);
2674+
auto value = ReadUnaligned<ui64>(buf.data());
2675+
timestamp = (i64)(SwapBytes(0x80 ^ value));
26762676
if (timestamp < NUdf::MIN_TIMESTAMP64 || timestamp > NUdf::MAX_TIMESTAMP64) {
26772677
return false;
26782678
}

ydb/library/yql/providers/common/codec/yql_codec.cpp

+129
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,21 @@ NYT::TNode DataValueToNode(const NKikimr::NUdf::TUnboxedValuePod& value, NKikimr
386386
NUdf::TUnboxedValue json = ValueToString(EDataSlot::JsonDocument, value);
387387
return NYT::TNode(ToString(TStringBuf(value.AsStringRef())));
388388
}
389+
case NUdf::TDataType<NUdf::TTzDate32>::Id: {
390+
TStringStream out;
391+
out << value.Get<i32>() << "," << NKikimr::NMiniKQL::GetTimezoneIANAName(value.GetTimezoneId());
392+
return NYT::TNode(out.Str());
393+
}
394+
case NUdf::TDataType<NUdf::TTzDatetime64>::Id: {
395+
TStringStream out;
396+
out << value.Get<i64>() << "," << NKikimr::NMiniKQL::GetTimezoneIANAName(value.GetTimezoneId());
397+
return NYT::TNode(out.Str());
398+
}
399+
case NUdf::TDataType<NUdf::TTzTimestamp64>::Id: {
400+
TStringStream out;
401+
out << value.Get<i64>() << "," << NKikimr::NMiniKQL::GetTimezoneIANAName(value.GetTimezoneId());
402+
return NYT::TNode(out.Str());
403+
}
389404
}
390405
YQL_ENSURE(false, "Unsupported type: " << static_cast<int>(dataType->GetSchemeType()));
391406
}
@@ -1084,6 +1099,57 @@ NUdf::TUnboxedValue ReadYsonValue(TType* type, ui64 nativeYtTypeFlags,
10841099
return ValueFromString(EDataSlot::JsonDocument, json.AsStringRef());
10851100
}
10861101

1102+
case NUdf::TDataType<NUdf::TTzDate32>::Id: {
1103+
auto nextString = ReadNextString(cmd, buf);
1104+
NUdf::TUnboxedValuePod data;
1105+
if (isTableFormat) {
1106+
i32 value;
1107+
ui16 tzId = 0;
1108+
YQL_ENSURE(DeserializeTzDate32(nextString, value, tzId));
1109+
data = NUdf::TUnboxedValuePod(value);
1110+
data.SetTimezoneId(tzId);
1111+
} else {
1112+
data = ValueFromString(NUdf::EDataSlot::TzDate32, nextString);
1113+
YQL_ENSURE(data, "incorrect tz date format for value " << nextString);
1114+
}
1115+
1116+
return data;
1117+
}
1118+
1119+
case NUdf::TDataType<NUdf::TTzDatetime64>::Id: {
1120+
auto nextString = ReadNextString(cmd, buf);
1121+
NUdf::TUnboxedValuePod data;
1122+
if (isTableFormat) {
1123+
i64 value;
1124+
ui16 tzId = 0;
1125+
YQL_ENSURE(DeserializeTzDatetime64(nextString, value, tzId));
1126+
data = NUdf::TUnboxedValuePod(value);
1127+
data.SetTimezoneId(tzId);
1128+
} else {
1129+
data = ValueFromString(NUdf::EDataSlot::TzDatetime64, nextString);
1130+
YQL_ENSURE(data, "incorrect tz datetime format for value " << nextString);
1131+
}
1132+
1133+
return data;
1134+
}
1135+
1136+
case NUdf::TDataType<NUdf::TTzTimestamp64>::Id: {
1137+
auto nextString = ReadNextString(cmd, buf);
1138+
NUdf::TUnboxedValuePod data;
1139+
if (isTableFormat) {
1140+
i64 value;
1141+
ui16 tzId = 0;
1142+
YQL_ENSURE(DeserializeTzTimestamp64(nextString, value, tzId));
1143+
data = NUdf::TUnboxedValuePod(value);
1144+
data.SetTimezoneId(tzId);
1145+
} else {
1146+
data = ValueFromString(NUdf::EDataSlot::TzTimestamp64, nextString);
1147+
YQL_ENSURE(data, "incorrect tz timestamp format for value " << nextString);
1148+
}
1149+
1150+
return data;
1151+
}
1152+
10871153
default:
10881154
YQL_ENSURE(false, "Unsupported data type: " << schemeType);
10891155
}
@@ -2198,6 +2264,39 @@ void WriteYsonValueInTableFormat(TOutputBuf& buf, TType* type, ui64 nativeYtType
21982264
break;
21992265
}
22002266

2267+
case NUdf::TDataType<NUdf::TTzDate32>::Id: {
2268+
ui16 tzId = SwapBytes(value.GetTimezoneId());
2269+
ui32 data = 0x80 ^ SwapBytes((ui32)value.Get<i32>());
2270+
ui32 size = sizeof(data) + sizeof(tzId);
2271+
buf.Write(StringMarker);
2272+
buf.WriteVarI32(size);
2273+
buf.WriteMany((const char*)&data, sizeof(data));
2274+
buf.WriteMany((const char*)&tzId, sizeof(tzId));
2275+
break;
2276+
}
2277+
2278+
case NUdf::TDataType<NUdf::TTzDatetime64>::Id: {
2279+
ui16 tzId = SwapBytes(value.GetTimezoneId());
2280+
ui64 data = 0x80 ^ SwapBytes((ui64)value.Get<i64>());
2281+
ui32 size = sizeof(data) + sizeof(tzId);
2282+
buf.Write(StringMarker);
2283+
buf.WriteVarI32(size);
2284+
buf.WriteMany((const char*)&data, sizeof(data));
2285+
buf.WriteMany((const char*)&tzId, sizeof(tzId));
2286+
break;
2287+
}
2288+
2289+
case NUdf::TDataType<NUdf::TTzTimestamp64>::Id: {
2290+
ui16 tzId = SwapBytes(value.GetTimezoneId());
2291+
ui64 data = 0x80 ^ SwapBytes((ui64)value.Get<i64>());
2292+
ui32 size = sizeof(data) + sizeof(tzId);
2293+
buf.Write(StringMarker);
2294+
buf.WriteVarI32(size);
2295+
buf.WriteMany((const char*)&data, sizeof(data));
2296+
buf.WriteMany((const char*)&tzId, sizeof(tzId));
2297+
break;
2298+
}
2299+
22012300
case NUdf::TDataType<NUdf::TJsonDocument>::Id: {
22022301
buf.Write(StringMarker);
22032302
NUdf::TUnboxedValue json = ValueToString(EDataSlot::JsonDocument, value);
@@ -2480,6 +2579,36 @@ void WriteSkiffData(NKikimr::NMiniKQL::TType* type, ui64 nativeYtTypeFlags, cons
24802579
break;
24812580
}
24822581

2582+
case NUdf::TDataType<NUdf::TTzDate32>::Id: {
2583+
ui16 tzId = SwapBytes(value.GetTimezoneId());
2584+
ui32 data = 0x80 ^ SwapBytes((ui32)value.Get<i32>());
2585+
ui32 size = sizeof(data) + sizeof(tzId);
2586+
buf.WriteMany((const char*)&size, sizeof(size));
2587+
buf.WriteMany((const char*)&data, sizeof(data));
2588+
buf.WriteMany((const char*)&tzId, sizeof(tzId));
2589+
break;
2590+
}
2591+
2592+
case NUdf::TDataType<NUdf::TTzDatetime64>::Id: {
2593+
ui16 tzId = SwapBytes(value.GetTimezoneId());
2594+
ui64 data = 0x80 ^ SwapBytes((ui64)value.Get<i64>());
2595+
ui32 size = sizeof(data) + sizeof(tzId);
2596+
buf.WriteMany((const char*)&size, sizeof(size));
2597+
buf.WriteMany((const char*)&data, sizeof(data));
2598+
buf.WriteMany((const char*)&tzId, sizeof(tzId));
2599+
break;
2600+
}
2601+
2602+
case NUdf::TDataType<NUdf::TTzTimestamp64>::Id: {
2603+
ui16 tzId = SwapBytes(value.GetTimezoneId());
2604+
ui64 data = 0x80 ^ SwapBytes((ui64)value.Get<i64>());
2605+
ui32 size = sizeof(data) + sizeof(tzId);
2606+
buf.WriteMany((const char*)&size, sizeof(size));
2607+
buf.WriteMany((const char*)&data, sizeof(data));
2608+
buf.WriteMany((const char*)&tzId, sizeof(tzId));
2609+
break;
2610+
}
2611+
24832612
case NUdf::TDataType<NUdf::TJsonDocument>::Id: {
24842613
NUdf::TUnboxedValue json = ValueToString(EDataSlot::JsonDocument, value);
24852614
auto str = json.AsStringRef();

ydb/library/yql/providers/yt/codec/codegen/ut/yt_codec_cg_ut.cpp

+63
Original file line numberDiff line numberDiff line change
@@ -862,6 +862,42 @@ Y_UNIT_TEST_SUITE(TYtCodegenCodec) {
862862
UNIT_ASSERT_VALUES_EQUAL(val.GetTimezoneId(), 1);
863863
}
864864

865+
Y_UNIT_TEST(TestReadTzDate32) {
866+
// full size = 4 + 2 = 6
867+
TStringBuf buf = "\6\0\0\0\x7F\xFF\xFE\xFE\0\1"sv;
868+
TReadSetup setup("ReadTzDate32", buf);
869+
typedef void(*TFunc)(TInputBuf&, void*);
870+
auto funcPtr = (TFunc)setup.Codegen_->GetPointerToFunction(setup.Func_);
871+
NUdf::TUnboxedValue val;
872+
funcPtr(setup.Buf_, &val);
873+
UNIT_ASSERT_VALUES_EQUAL(val.Get<i32>(), -258);
874+
UNIT_ASSERT_VALUES_EQUAL(val.GetTimezoneId(), 1);
875+
}
876+
877+
Y_UNIT_TEST(TestReadTzDatetime64) {
878+
// full size = 8 + 2 = 10
879+
TStringBuf buf = "\n\0\0\0\x7F\xFF\xFF\xFF\xFF\xFF\xFE\xFD\0\1"sv;
880+
TReadSetup setup("ReadTzDatetime64", buf);
881+
typedef void(*TFunc)(TInputBuf&, void*);
882+
auto funcPtr = (TFunc)setup.Codegen_->GetPointerToFunction(setup.Func_);
883+
NUdf::TUnboxedValue val;
884+
funcPtr(setup.Buf_, &val);
885+
UNIT_ASSERT_VALUES_EQUAL(val.Get<i64>(), -259);
886+
UNIT_ASSERT_VALUES_EQUAL(val.GetTimezoneId(), 1);
887+
}
888+
889+
Y_UNIT_TEST(TestReadTzTimestamp64) {
890+
// full size = 8 + 2 = 10
891+
TStringBuf buf = "\n\0\0\0\x7F\xFF\xFF\xFF\xFF\xFF\xFE\xFC\0\1"sv;
892+
TReadSetup setup("ReadTzTimestamp64", buf);
893+
typedef void(*TFunc)(TInputBuf&, void*);
894+
auto funcPtr = (TFunc)setup.Codegen_->GetPointerToFunction(setup.Func_);
895+
NUdf::TUnboxedValue val;
896+
funcPtr(setup.Buf_, &val);
897+
UNIT_ASSERT_VALUES_EQUAL(val.Get<i64>(), -260);
898+
UNIT_ASSERT_VALUES_EQUAL(val.GetTimezoneId(), 1);
899+
}
900+
865901
Y_UNIT_TEST(TestWriteTzDate) {
866902
TWriteSetup setup("WriteTzDate");
867903
typedef void(*TFunc)(TOutputBuf&, ui16, ui16);
@@ -888,6 +924,33 @@ Y_UNIT_TEST_SUITE(TYtCodegenCodec) {
888924
setup.Buf_.Finish();
889925
UNIT_ASSERT_STRINGS_EQUAL(setup.TestWriter_.Str().Quote(), TString("\x0a\0\0\0\0\0\0\0\0\0\1\4\0\1"sv).Quote());
890926
}
927+
928+
Y_UNIT_TEST(TestWriteTzDate32) {
929+
TWriteSetup setup("WriteTzDate32");
930+
typedef void(*TFunc)(TOutputBuf&, i32, ui16);
931+
auto funcPtr = (TFunc)setup.Codegen_->GetPointerToFunction(setup.Func_);
932+
funcPtr(setup.Buf_, -258, 1);
933+
setup.Buf_.Finish();
934+
UNIT_ASSERT_STRINGS_EQUAL(setup.TestWriter_.Str().Quote(), TString("\6\0\0\0\x7F\xFF\xFE\xFE\0\1"sv).Quote());
935+
}
936+
937+
Y_UNIT_TEST(TestWriteTzDatetime64) {
938+
TWriteSetup setup("WriteTzDatetime64");
939+
typedef void(*TFunc)(TOutputBuf&, i64, ui16);
940+
auto funcPtr = (TFunc)setup.Codegen_->GetPointerToFunction(setup.Func_);
941+
funcPtr(setup.Buf_, -259, 1);
942+
setup.Buf_.Finish();
943+
UNIT_ASSERT_STRINGS_EQUAL(setup.TestWriter_.Str().Quote(), TString("\n\0\0\0\x7F\xFF\xFF\xFF\xFF\xFF\xFE\xFD\0\1"sv).Quote());
944+
}
945+
946+
Y_UNIT_TEST(TestWriteTzTimestamp64) {
947+
TWriteSetup setup("WriteTzTimestamp64");
948+
typedef void(*TFunc)(TOutputBuf&, i64, ui16);
949+
auto funcPtr = (TFunc)setup.Codegen_->GetPointerToFunction(setup.Func_);
950+
funcPtr(setup.Buf_, -260, 1);
951+
setup.Buf_.Finish();
952+
UNIT_ASSERT_STRINGS_EQUAL(setup.TestWriter_.Str().Quote(), TString("\n\0\0\0\x7F\xFF\xFF\xFF\xFF\xFF\xFE\xFC\0\1"sv).Quote());
953+
}
891954
#endif
892955
}
893956

ydb/library/yql/providers/yt/codec/codegen/ya.make

+6
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,15 @@ IF (NOT MKQL_DISABLE_CODEGEN)
5959
ReadTzDate
6060
ReadTzDatetime
6161
ReadTzTimestamp
62+
ReadTzDate32
63+
ReadTzDatetime64
64+
ReadTzTimestamp64
6265
WriteTzDate
6366
WriteTzDatetime
6467
WriteTzTimestamp
68+
WriteTzDate32
69+
WriteTzDatetime64
70+
WriteTzTimestamp64
6571
GetWrittenBytes
6672
FillZero
6773
)

ydb/library/yql/providers/yt/codec/codegen/yt_codec_bc.cpp

+69
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,45 @@ extern "C" void ReadTzTimestamp(void* vbuf, void* vpod) {
267267
(new (vpod) NUdf::TUnboxedValuePod(data))->SetTimezoneId(tzId);
268268
}
269269

270+
extern "C" void ReadTzDate32(void* vbuf, void* vpod) {
271+
NCommon::TInputBuf& buf = *(NCommon::TInputBuf*)vbuf;
272+
ui32 size;
273+
buf.ReadMany((char*)&size, sizeof(size));
274+
ui32 data;
275+
buf.ReadMany((char*)&data, sizeof(data));
276+
ui16 tzId;
277+
buf.ReadMany((char*)&tzId, sizeof(tzId));
278+
i32 value = SwapBytes(0x80 ^ data);
279+
tzId = SwapBytes(tzId);
280+
(new (vpod) NUdf::TUnboxedValuePod(value))->SetTimezoneId(tzId);
281+
}
282+
283+
extern "C" void ReadTzDatetime64(void* vbuf, void* vpod) {
284+
NCommon::TInputBuf& buf = *(NCommon::TInputBuf*)vbuf;
285+
ui32 size;
286+
buf.ReadMany((char*)&size, sizeof(size));
287+
ui64 data;
288+
buf.ReadMany((char*)&data, sizeof(data));
289+
ui16 tzId;
290+
buf.ReadMany((char*)&tzId, sizeof(tzId));
291+
i64 value = SwapBytes(0x80 ^ data);
292+
tzId = SwapBytes(tzId);
293+
(new (vpod) NUdf::TUnboxedValuePod(value))->SetTimezoneId(tzId);
294+
}
295+
296+
extern "C" void ReadTzTimestamp64(void* vbuf, void* vpod) {
297+
NCommon::TInputBuf& buf = *(NCommon::TInputBuf*)vbuf;
298+
ui32 size;
299+
buf.ReadMany((char*)&size, sizeof(size));
300+
ui64 data;
301+
buf.ReadMany((char*)&data, sizeof(data));
302+
ui16 tzId;
303+
buf.ReadMany((char*)&tzId, sizeof(tzId));
304+
i64 value = SwapBytes(0x80 ^ data);
305+
tzId = SwapBytes(tzId);
306+
(new (vpod) NUdf::TUnboxedValuePod(value))->SetTimezoneId(tzId);
307+
}
308+
270309
extern "C" void WriteTzDate(void* vbuf, ui16 value, ui16 tzId) {
271310
value = SwapBytes(value);
272311
tzId = SwapBytes(tzId);
@@ -297,6 +336,36 @@ extern "C" void WriteTzTimestamp(void* vbuf, ui64 value, ui16 tzId) {
297336
buf.WriteMany((const char*)&tzId, sizeof(tzId));
298337
}
299338

339+
extern "C" void WriteTzDate32(void* vbuf, i32 value, ui16 tzId) {
340+
ui32 data = 0x80 ^ SwapBytes((ui32)value);
341+
tzId = SwapBytes(tzId);
342+
NCommon::TOutputBuf& buf = *(NCommon::TOutputBuf*)vbuf;
343+
const ui32 size = sizeof(data) + sizeof(tzId);
344+
buf.WriteMany((const char*)&size, sizeof(size));
345+
buf.WriteMany((const char*)&data, sizeof(data));
346+
buf.WriteMany((const char*)&tzId, sizeof(tzId));
347+
}
348+
349+
extern "C" void WriteTzDatetime64(void* vbuf, i64 value, ui16 tzId) {
350+
ui64 data = 0x80 ^ SwapBytes((ui64)value);
351+
tzId = SwapBytes(tzId);
352+
NCommon::TOutputBuf& buf = *(NCommon::TOutputBuf*)vbuf;
353+
const ui32 size = sizeof(data) + sizeof(tzId);
354+
buf.WriteMany((const char*)&size, sizeof(size));
355+
buf.WriteMany((const char*)&data, sizeof(data));
356+
buf.WriteMany((const char*)&tzId, sizeof(tzId));
357+
}
358+
359+
extern "C" void WriteTzTimestamp64(void* vbuf, i64 value, ui16 tzId) {
360+
ui64 data = 0x80 ^ SwapBytes((ui64)value);
361+
tzId = SwapBytes(tzId);
362+
NCommon::TOutputBuf& buf = *(NCommon::TOutputBuf*)vbuf;
363+
const ui32 size = sizeof(data) + sizeof(tzId);
364+
buf.WriteMany((const char*)&size, sizeof(size));
365+
buf.WriteMany((const char*)&data, sizeof(data));
366+
buf.WriteMany((const char*)&tzId, sizeof(tzId));
367+
}
368+
300369
extern "C" ui64 GetWrittenBytes(void* vbuf) {
301370
NCommon::TOutputBuf& buf = *(NCommon::TOutputBuf*)vbuf;
302371
return buf.GetWrittenBytes();

0 commit comments

Comments
 (0)