Skip to content

Commit 75470f7

Browse files
authored
ut_read_iterator + EvWrite (#742)
ut_read_iterator + EvWrite
1 parent f59afad commit 75470f7

File tree

4 files changed

+84
-38
lines changed

4 files changed

+84
-38
lines changed

ydb/core/tx/datashard/datashard_ut_read_iterator.cpp

+67-25
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010
#include <ydb/core/tx/tx_proxy/proxy.h>
1111
#include <ydb/core/tx/tx_proxy/read_table.h>
1212

13+
#include <ydb/core/tx/data_events/events.h>
14+
#include <ydb/core/tx/data_events/payload_helper.h>
15+
1316
#include <ydb/public/sdk/cpp/client/ydb_result/result.h>
1417

1518
#include <algorithm>
@@ -25,46 +28,52 @@ namespace {
2528

2629
using TCellVec = std::vector<TCell>;
2730

28-
void CreateTable(Tests::TServer::TPtr server,
31+
TVector<TShardedTableOptions::TColumn> GetColumns() {
32+
TVector<TShardedTableOptions::TColumn> columns = {
33+
{"key1", "Uint32", true, false},
34+
{"key2", "Uint32", true, false},
35+
{"key3", "Uint32", true, false},
36+
{"value", "Uint32", false, false}};
37+
38+
return columns;
39+
}
40+
41+
TVector<TShardedTableOptions::TColumn> GetMoviesColumns() {
42+
TVector<TShardedTableOptions::TColumn> columns = {
43+
{"id", "Uint32", true, false},
44+
{"title", "String", false, false},
45+
{"rating", "Uint32", false, false}};
46+
47+
return columns;
48+
}
49+
50+
std::tuple<TVector<ui64>, ui64> CreateTable(Tests::TServer::TPtr server,
2951
TActorId sender,
3052
const TString &root,
3153
const TString &name,
3254
bool withFollower = false,
3355
ui64 shardCount = 1)
3456
{
35-
TVector<TShardedTableOptions::TColumn> columns = {
36-
{"key1", "Uint32", true, false},
37-
{"key2", "Uint32", true, false},
38-
{"key3", "Uint32", true, false},
39-
{"value", "Uint32", false, false}
40-
};
41-
4257
auto opts = TShardedTableOptions()
4358
.Shards(shardCount)
44-
.Columns(columns);
59+
.Columns(GetColumns());
4560

4661
if (withFollower)
4762
opts.Followers(1);
4863

49-
CreateShardedTable(server, sender, root, name, opts);
64+
return CreateShardedTable(server, sender, root, name, opts);
5065
}
5166

52-
void CreateMoviesTable(Tests::TServer::TPtr server,
67+
std::tuple<TVector<ui64>, ui64> CreateMoviesTable(Tests::TServer::TPtr server,
5368
TActorId sender,
5469
const TString &root,
5570
const TString &name)
5671
{
57-
TVector<TShardedTableOptions::TColumn> columns = {
58-
{"id", "Uint32", true, false},
59-
{"title", "String", false, false},
60-
{"rating", "Uint32", false, false}
61-
};
62-
6372
auto opts = TShardedTableOptions()
6473
.Shards(1)
65-
.Columns(columns);
74+
.Columns(GetMoviesColumns());
6675

67-
CreateShardedTable(server, sender, root, name, opts);
76+
return CreateShardedTable(server, sender, root, name, opts);
6877
}
6978

7079
struct TRowWriter : public NArrow::IRowWriter {
@@ -308,11 +317,14 @@ void AddRangeQuery(
308317
struct TTableInfo {
309318
TString Name;
310319

320+
ui64 TableId;
311321
ui64 TabletId;
312322
ui64 OwnerId;
313323
NKikimrTxDataShard::TEvGetInfoResponse::TUserTable UserTable;
314324

315325
TActorId ClientId;
326+
327+
TVector<TShardedTableOptions::TColumn> Columns;
316328
};
317329

318330
struct TTestHelper {
@@ -345,7 +357,7 @@ struct TTestHelper {
345357
{
346358
auto& table1 = Tables["table-1"];
347359
table1.Name = "table-1";
348-
CreateTable(Server, Sender, "/Root", "table-1", WithFollower, ShardCount);
360+
auto [shards, tableId] = CreateTable(Server, Sender, "/Root", "table-1", WithFollower, ShardCount);
349361
ExecSQL(Server, Sender, R"(
350362
UPSERT INTO `/Root/table-1`
351363
(key1, key2, key3, value)
@@ -360,20 +372,22 @@ struct TTestHelper {
360372
(11, 11, 11, 1111);
361373
)");
362374

363-
auto shards = GetTableShards(Server, Sender, "/Root/table-1");
375+
table1.TableId = tableId;
364376
table1.TabletId = shards.at(0);
365377

366378
auto [tables, ownerId] = GetTables(Server, table1.TabletId);
367379
table1.OwnerId = ownerId;
368380
table1.UserTable = tables["table-1"];
369381

370382
table1.ClientId = runtime.ConnectToPipe(table1.TabletId, Sender, 0, GetTestPipeConfig());
383+
384+
table1.Columns = GetColumns();
371385
}
372386

373387
{
374388
auto& table2 = Tables["movies"];
375389
table2.Name = "movies";
376-
CreateMoviesTable(Server, Sender, "/Root", "movies");
390+
auto [shards, tableId] = CreateMoviesTable(Server, Sender, "/Root", "movies");
377391
ExecSQL(Server, Sender, R"(
378392
UPSERT INTO `/Root/movies`
379393
(id, title, rating)
@@ -383,29 +397,33 @@ struct TTestHelper {
383397
(3, "Hard die", 8);
384398
)");
385399

386-
auto shards = GetTableShards(Server, Sender, "/Root/movies");
400+
table2.TableId = tableId;
387401
table2.TabletId = shards.at(0);
388402

389403
auto [tables, ownerId] = GetTables(Server, table2.TabletId);
390404
table2.OwnerId = ownerId;
391405
table2.UserTable = tables["movies"];
392406

393407
table2.ClientId = runtime.ConnectToPipe(table2.TabletId, Sender, 0, GetTestPipeConfig());
408+
409+
table2.Columns = GetMoviesColumns();
394410
}
395411

396412
{
397413
auto& table3 = Tables["table-1-many"];
398414
table3.Name = "table-1-many";
399-
CreateTable(Server, Sender, "/Root", "table-1-many", WithFollower, ShardCount);
415+
auto [shards, tableId] = CreateTable(Server, Sender, "/Root", "table-1-many", WithFollower, ShardCount);
400416

401-
auto shards = GetTableShards(Server, Sender, "/Root/table-1-many");
417+
table3.TableId = tableId;
402418
table3.TabletId = shards.at(0);
403419

404420
auto [tables, ownerId] = GetTables(Server, table3.TabletId);
405421
table3.OwnerId = ownerId;
406422
table3.UserTable = tables["table-1-many"];
407423

408424
table3.ClientId = runtime.ConnectToPipe(table3.TabletId, Sender, 0, GetTestPipeConfig());
425+
426+
table3.Columns = GetColumns();
409427
}
410428
}
411429

@@ -717,6 +735,30 @@ struct TTestHelper {
717735
UNIT_ASSERT_VALUES_EQUAL(rowsRead, Min(rowCount, limit));
718736
}
719737

738+
NKikimrDataEvents::TEvWriteResult WriteRow(const TString& tableName, ui64 txId, const TVector<ui32>& values, NKikimrDataEvents::TEvWrite::ETxMode txMode = NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE) {
739+
const auto& table = Tables[tableName];
740+
741+
auto opts = TShardedTableOptions().Columns(table.Columns);
742+
size_t columnCount = table.Columns.size();
743+
744+
std::vector<ui32> columnIds(columnCount);
745+
std::iota(columnIds.begin(), columnIds.end(), 1);
746+
747+
Y_ABORT_UNLESS(values.size() == columnCount);
748+
749+
TVector<TCell> cells;
750+
for (ui32 col = 0; col < columnCount; ++col)
751+
cells.emplace_back(TCell((const char*)&values[col], sizeof(ui32)));
752+
753+
TSerializedCellMatrix matrix(cells, 1, columnCount);
754+
755+
auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, txMode);
756+
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(matrix.ReleaseBuffer());
757+
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, table.TableId, 1, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);
758+
759+
return Write(*Server->GetRuntime(), Sender, table.TabletId, std::move(evWrite));
760+
}
761+
720762
struct THangedReturn {
721763
ui64 LastPlanStep = 0;
722764
TVector<THolder<IEventHandle>> ReadSets;

ydb/core/tx/datashard/datashard_ut_write.cpp

+4-7
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
3737

3838
const ui32 rowCount = 3;
3939
ui64 txId = 100;
40-
Write(runtime, shards[0], tableId, opts.Columns_, rowCount, sender, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
40+
Write(runtime, sender, shards[0], tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
4141

4242
auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All();
4343

@@ -55,7 +55,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
5555

5656
const ui32 rowCount = 3;
5757
ui64 txId = 100;
58-
Write(runtime, shards[0], tableId, opts.Columns_, rowCount, sender, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
58+
Write(runtime, sender, shards[0], tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
5959

6060
auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All();
6161

@@ -77,11 +77,8 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
7777
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(matrix.ReleaseBuffer());
7878
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, 1, {1}, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);
7979

80-
runtime.SendToPipe(shards[0], sender, evWrite.release(), 0, GetPipeConfigWithRetries());
81-
auto ev = runtime.GrabEdgeEventRethrow<NEvents::TDataEvents::TEvWriteResult>(sender);
80+
const auto& record = Write(runtime, sender, shards[0], std::move(evWrite), NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
8281

83-
const auto& record = ev->Get()->Record;
84-
UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
8582
UNIT_ASSERT_VALUES_EQUAL(record.GetIssues().size(), 1);
8683
UNIT_ASSERT(record.GetIssues(0).message().Contains("Operation [0:100] writes key of 1049601 bytes which exceeds limit 1049600 bytes"));
8784
}
@@ -94,7 +91,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
9491

9592
const ui32 rowCount = 3;
9693
ui64 txId = 100;
97-
Write(runtime, shards[0], tableId, opts.Columns_, rowCount, sender, txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
94+
Write(runtime, sender, shards[0], tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
9895

9996
auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All();
10097

ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp

+11-5
Original file line numberDiff line numberDiff line change
@@ -1843,13 +1843,13 @@ std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequest(ui64 txId, NKik
18431843
return evWrite;
18441844
}
18451845

1846-
NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, TActorId sender, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus)
1846+
NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, std::unique_ptr<NEvents::TDataEvents::TEvWrite>&& request, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus)
18471847
{
1848-
auto request = MakeWriteRequest(txId, txMode, tableId, columns, rowCount);
1848+
auto txMode = request->Record.GetTxMode();
18491849
runtime.SendToPipe(shardId, sender, request.release(), 0, GetPipeConfigWithRetries());
18501850

18511851
auto ev = runtime.GrabEdgeEventRethrow<NEvents::TDataEvents::TEvWriteResult>(sender);
1852-
auto status = ev->Get()->Record.GetStatus();
1852+
auto resultRecord = ev->Get()->Record;
18531853

18541854
if (expectedStatus == NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED) {
18551855
switch (txMode) {
@@ -1866,9 +1866,15 @@ NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, ui64 shardId
18661866
break;
18671867
}
18681868
}
1869-
UNIT_ASSERT_C(status == expectedStatus, "Status: " << ev->Get()->Record.GetStatus() << " Issues: " << ev->Get()->Record.GetIssues());
1869+
UNIT_ASSERT_C(resultRecord.GetStatus() == expectedStatus, "Status: " << resultRecord.GetStatus() << " Issues: " << resultRecord.GetIssues());
18701870

1871-
return ev->Get()->Record;
1871+
return resultRecord;
1872+
}
1873+
1874+
NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus)
1875+
{
1876+
auto request = MakeWriteRequest(txId, txMode, tableId, columns, rowCount);
1877+
return Write(runtime, sender, shardId, std::move(request), expectedStatus);
18721878
}
18731879

18741880
void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values)

ydb/core/tx/datashard/ut_common/datashard_ut_common.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -710,7 +710,8 @@ void ExecSQL(Tests::TServer::TPtr server,
710710
bool dml = true,
711711
Ydb::StatusIds::StatusCode code = Ydb::StatusIds::SUCCESS);
712712

713-
NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, TActorId sender, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED);
713+
NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, std::unique_ptr<NEvents::TDataEvents::TEvWrite>&& request, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED);
714+
NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED);
714715

715716
void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values);
716717

0 commit comments

Comments
 (0)