@@ -789,7 +789,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
789
789
static THolder<TDataStreamsClient> MakeClient (const NYdb::TDriver& driver, const TString& database) {
790
790
return MakeHolder<TDataStreamsClient>(driver, NYdb::TCommonClientSettings ().Database (database));
791
791
}
792
- };
792
+ };
793
793
794
794
class TTestTopicEnv : public TTestEnv <TTestTopicEnv, NYdb::NTopic::TTopicClient> {
795
795
public:
@@ -798,7 +798,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
798
798
static THolder<NYdb::NTopic::TTopicClient> MakeClient (const NYdb::TDriver& driver, const TString& database) {
799
799
return MakeHolder<NYdb::NTopic::TTopicClient>(driver, NYdb::NTopic::TTopicClientSettings ().Database (database));
800
800
}
801
- };
801
+ };
802
802
803
803
TShardedTableOptions SimpleTable () {
804
804
return TShardedTableOptions ()
@@ -1344,7 +1344,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
1344
1344
(3, 30);
1345
1345
)" , R"(
1346
1346
DELETE FROM `/Root/Table` WHERE key = 1;
1347
- )" }, {
1347
+ )" }, {
1348
1348
R"( {"update":{},"key":[1]})" ,
1349
1349
R"( {"update":{},"key":[2]})" ,
1350
1350
R"( {"update":{},"key":[3]})" ,
@@ -1360,7 +1360,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
1360
1360
(3, 30);
1361
1361
)" , R"(
1362
1362
DELETE FROM `/Root/Table` WHERE key = 1;
1363
- )" }, {
1363
+ )" }, {
1364
1364
{DebeziumBody (" u" , nullptr , nullptr ), {{" __key" , R"( {"payload":{"key":1}})" }}},
1365
1365
{DebeziumBody (" u" , nullptr , nullptr ), {{" __key" , R"( {"payload":{"key":2}})" }}},
1366
1366
{DebeziumBody (" u" , nullptr , nullptr ), {{" __key" , R"( {"payload":{"key":3}})" }}},
@@ -1376,7 +1376,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
1376
1376
(3, 30);
1377
1377
)" , R"(
1378
1378
DELETE FROM `/Root/Table` WHERE key = 1;
1379
- )" }, {
1379
+ )" }, {
1380
1380
R"( {"update":{"value":10},"key":[1]})" ,
1381
1381
R"( {"update":{"value":20},"key":[2]})" ,
1382
1382
R"( {"update":{"value":30},"key":[3]})" ,
@@ -1397,7 +1397,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
1397
1397
(3, 300);
1398
1398
)" , R"(
1399
1399
DELETE FROM `/Root/Table` WHERE key = 1;
1400
- )" }, {
1400
+ )" }, {
1401
1401
R"( {"update":{},"newImage":{"value":10},"key":[1]})" ,
1402
1402
R"( {"update":{},"newImage":{"value":20},"key":[2]})" ,
1403
1403
R"( {"update":{},"newImage":{"value":30},"key":[3]})" ,
@@ -1421,7 +1421,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
1421
1421
(3, 300);
1422
1422
)" , R"(
1423
1423
DELETE FROM `/Root/Table` WHERE key = 1;
1424
- )" }, {
1424
+ )" }, {
1425
1425
{DebeziumBody (" c" , nullptr , R"( {"key":1,"value":10})" ), {{" __key" , R"( {"payload":{"key":1}})" }}},
1426
1426
{DebeziumBody (" c" , nullptr , R"( {"key":2,"value":20})" ), {{" __key" , R"( {"payload":{"key":2}})" }}},
1427
1427
{DebeziumBody (" c" , nullptr , R"( {"key":3,"value":30})" ), {{" __key" , R"( {"payload":{"key":3}})" }}},
@@ -1445,7 +1445,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
1445
1445
(3, 300);
1446
1446
)" , R"(
1447
1447
DELETE FROM `/Root/Table` WHERE key = 1;
1448
- )" }, {
1448
+ )" }, {
1449
1449
{DebeziumBody (" u" , nullptr , nullptr ), {{" __key" , R"( {"payload":{"key":1}})" }}},
1450
1450
{DebeziumBody (" u" , nullptr , nullptr ), {{" __key" , R"( {"payload":{"key":2}})" }}},
1451
1451
{DebeziumBody (" u" , nullptr , nullptr ), {{" __key" , R"( {"payload":{"key":3}})" }}},
@@ -1456,7 +1456,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
1456
1456
});
1457
1457
}
1458
1458
1459
- Y_UNIT_TEST (NewImageLogDebezium) {
1459
+ Y_UNIT_TEST (NewImageLogDebezium) {
1460
1460
TopicRunner::Read (SimpleTable (), NewImage (NKikimrSchemeOp::ECdcStreamFormatDebeziumJson), {R"(
1461
1461
UPSERT INTO `/Root/Table` (key, value) VALUES
1462
1462
(1, 10),
@@ -1469,7 +1469,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
1469
1469
(3, 300);
1470
1470
)" , R"(
1471
1471
DELETE FROM `/Root/Table` WHERE key = 1;
1472
- )" }, {
1472
+ )" }, {
1473
1473
{DebeziumBody (" u" , nullptr , R"( {"key":1,"value":10})" ), {{" __key" , R"( {"payload":{"key":1}})" }}},
1474
1474
{DebeziumBody (" u" , nullptr , R"( {"key":2,"value":20})" ), {{" __key" , R"( {"payload":{"key":2}})" }}},
1475
1475
{DebeziumBody (" u" , nullptr , R"( {"key":3,"value":30})" ), {{" __key" , R"( {"payload":{"key":3}})" }}},
@@ -1486,7 +1486,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
1486
1486
(1, 10),
1487
1487
(2, 20),
1488
1488
(3, 30);
1489
- )" }, {
1489
+ )" }, {
1490
1490
R"( {"update":{},"key":[1],"ts":"***"})" ,
1491
1491
R"( {"update":{},"key":[2],"ts":"***"})" ,
1492
1492
R"( {"update":{},"key":[3],"ts":"***"})" ,
@@ -1512,7 +1512,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
1512
1512
UPSERT INTO `/Root/Table` (__Hash, id_shard, id_sort, __RowData) VALUES (
1513
1513
1, "10", "100", JsonDocument('{"M":{"color":{"S":"pink"},"weight":{"N":"4.5"}}}')
1514
1514
);
1515
- )" }, {
1515
+ )" }, {
1516
1516
WriteJson (NJson::TJsonMap ({
1517
1517
{" awsRegion" , " " },
1518
1518
{" dynamodb" , NJson::TJsonMap ({
@@ -1541,7 +1541,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
1541
1541
);
1542
1542
)" , R"(
1543
1543
DELETE FROM `/Root/Table` WHERE __Hash = 1;
1544
- )" }, {
1544
+ )" }, {
1545
1545
WriteJson (NJson::TJsonMap ({
1546
1546
{" awsRegion" , " " },
1547
1547
{" dynamodb" , NJson::TJsonMap ({
@@ -1639,7 +1639,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
1639
1639
(1, 0.0%s/0.0%s),
1640
1640
(2, 1.0%s/0.0%s),
1641
1641
(3, -1.0%s/0.0%s);
1642
- )" , s, s, s, s, s, s)}, {
1642
+ )" , s, s, s, s, s, s)}, {
1643
1643
R"( {"update":{"value":"nan"},"key":[1]})" ,
1644
1644
R"( {"update":{"value":"inf"},"key":[2]})" ,
1645
1645
R"( {"update":{"value":"-inf"},"key":[3]})" ,
@@ -1674,7 +1674,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
1674
1674
TopicRunner::Read (table, KeysOnly (NKikimrSchemeOp::ECdcStreamFormatDebeziumJson), {Sprintf (R"(
1675
1675
UPSERT INTO `/Root/Table` (key, value) VALUES
1676
1676
("%s", 1);
1677
- )" , key.c_str ())}, {
1677
+ )" , key.c_str ())}, {
1678
1678
{DebeziumBody (" u" , nullptr , nullptr ), {{" __key" , Sprintf (R"( {"payload":{"key":"%s"}})" , key.c_str ())}}},
1679
1679
});
1680
1680
}
@@ -2043,7 +2043,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
2043
2043
ExecSQL (env.GetServer (), env.GetEdgeActor (), R"(
2044
2044
UPSERT INTO `/Root/TableAux` (key, value)
2045
2045
VALUES (1, 10);
2046
- )" );
2046
+ )" );
2047
2047
2048
2048
SetSplitMergePartCountLimit (&runtime, -1 );
2049
2049
const auto tabletIds = GetTableShards (env.GetServer (), env.GetEdgeActor (), " /Root/Table" );
@@ -2292,7 +2292,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
2292
2292
auto tabletIds = GetTableShards (env.GetServer (), env.GetEdgeActor (), " /Root/Table" );
2293
2293
UNIT_ASSERT_VALUES_EQUAL (tabletIds.size (), 1 );
2294
2294
2295
- WaitTxNotification (env.GetServer (), env.GetEdgeActor (),
2295
+ WaitTxNotification (env.GetServer (), env.GetEdgeActor (),
2296
2296
AsyncSplitTable (env.GetServer (), env.GetEdgeActor (), " /Root/Table" , tabletIds.at (0 ), 4 ));
2297
2297
2298
2298
// execute on old partitions
@@ -2376,7 +2376,8 @@ Y_UNIT_TEST_SUITE(Cdc) {
2376
2376
2377
2377
case TSchemeBoardEvents::EvUpdate:
2378
2378
if (auto * msg = ev->Get <TSchemeBoardEvents::TEvUpdate>()) {
2379
- const auto desc = msg->GetRecord ().GetDescribeSchemeResult ();
2379
+ NKikimrScheme::TEvDescribeSchemeResult desc;
2380
+ Y_ABORT_UNLESS (ParseFromStringNoSizeLimit (desc, *msg->GetRecord ().GetDescribeSchemeResultSerialized ().begin ()));
2380
2381
if (desc.GetPath () == " /Root/Table/Stream" && desc.GetPathDescription ().GetSelf ().GetCreateFinished ()) {
2381
2382
delayed.emplace_back (ev.Release ());
2382
2383
return TTestActorRuntime::EEventAction::DROP;
@@ -2446,7 +2447,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
2446
2447
ExecSQL (env.GetServer (), env.GetEdgeActor (), R"(
2447
2448
UPSERT INTO `/Root/Table` (key, value)
2448
2449
VALUES (1, 10);
2449
- )" );
2450
+ )" );
2450
2451
2451
2452
SetSplitMergePartCountLimit (&runtime, -1 );
2452
2453
const auto tabletIds = GetTableShards (env.GetServer (), env.GetEdgeActor (), " /Root/Table" );
@@ -3266,7 +3267,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
3266
3267
auto tabletIds = GetTableShards (env.GetServer (), env.GetEdgeActor (), " /Root/Table" );
3267
3268
UNIT_ASSERT_VALUES_EQUAL (tabletIds.size (), 1 );
3268
3269
3269
- WaitTxNotification (env.GetServer (), env.GetEdgeActor (),
3270
+ WaitTxNotification (env.GetServer (), env.GetEdgeActor (),
3270
3271
AsyncSplitTable (env.GetServer (), env.GetEdgeActor (), " /Root/Table" , tabletIds.at (0 ), 4 ));
3271
3272
3272
3273
// merge
@@ -3298,7 +3299,7 @@ template <>
3298
3299
void Out<std::pair<TString, TString>>(IOutputStream& output, const std::pair<TString, TString>& x) {
3299
3300
output << x.first << " :" << x.second ;
3300
3301
}
3301
-
3302
+
3302
3303
void AppendToString (TString& dst, const std::pair<TString, TString>& x) {
3303
3304
TStringOutput output (dst);
3304
3305
output << x;
0 commit comments