Skip to content

ut_read_iterator + EvWrite #742

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 67 additions & 25 deletions ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/tx/tx_proxy/read_table.h>

#include <ydb/core/tx/data_events/events.h>
#include <ydb/core/tx/data_events/payload_helper.h>

#include <ydb/public/sdk/cpp/client/ydb_result/result.h>

#include <algorithm>
Expand All @@ -25,46 +28,52 @@ namespace {

using TCellVec = std::vector<TCell>;

void CreateTable(Tests::TServer::TPtr server,
TVector<TShardedTableOptions::TColumn> GetColumns() {
TVector<TShardedTableOptions::TColumn> columns = {
{"key1", "Uint32", true, false},
{"key2", "Uint32", true, false},
{"key3", "Uint32", true, false},
{"value", "Uint32", false, false}};

return columns;
}

TVector<TShardedTableOptions::TColumn> GetMoviesColumns() {
TVector<TShardedTableOptions::TColumn> columns = {
{"id", "Uint32", true, false},
{"title", "String", false, false},
{"rating", "Uint32", false, false}};

return columns;
}

std::tuple<TVector<ui64>, ui64> CreateTable(Tests::TServer::TPtr server,
TActorId sender,
const TString &root,
const TString &name,
bool withFollower = false,
ui64 shardCount = 1)
{
TVector<TShardedTableOptions::TColumn> columns = {
{"key1", "Uint32", true, false},
{"key2", "Uint32", true, false},
{"key3", "Uint32", true, false},
{"value", "Uint32", false, false}
};

auto opts = TShardedTableOptions()
.Shards(shardCount)
.Columns(columns);
.Columns(GetColumns());

if (withFollower)
opts.Followers(1);

CreateShardedTable(server, sender, root, name, opts);
return CreateShardedTable(server, sender, root, name, opts);
}

void CreateMoviesTable(Tests::TServer::TPtr server,
std::tuple<TVector<ui64>, ui64> CreateMoviesTable(Tests::TServer::TPtr server,
TActorId sender,
const TString &root,
const TString &name)
{
TVector<TShardedTableOptions::TColumn> columns = {
{"id", "Uint32", true, false},
{"title", "String", false, false},
{"rating", "Uint32", false, false}
};

auto opts = TShardedTableOptions()
.Shards(1)
.Columns(columns);
.Columns(GetMoviesColumns());

CreateShardedTable(server, sender, root, name, opts);
return CreateShardedTable(server, sender, root, name, opts);
}

struct TRowWriter : public NArrow::IRowWriter {
Expand Down Expand Up @@ -308,11 +317,14 @@ void AddRangeQuery(
struct TTableInfo {
TString Name;

ui64 TableId;
ui64 TabletId;
ui64 OwnerId;
NKikimrTxDataShard::TEvGetInfoResponse::TUserTable UserTable;

TActorId ClientId;

TVector<TShardedTableOptions::TColumn> Columns;
};

struct TTestHelper {
Expand Down Expand Up @@ -345,7 +357,7 @@ struct TTestHelper {
{
auto& table1 = Tables["table-1"];
table1.Name = "table-1";
CreateTable(Server, Sender, "/Root", "table-1", WithFollower, ShardCount);
auto [shards, tableId] = CreateTable(Server, Sender, "/Root", "table-1", WithFollower, ShardCount);
ExecSQL(Server, Sender, R"(
UPSERT INTO `/Root/table-1`
(key1, key2, key3, value)
Expand All @@ -360,20 +372,22 @@ struct TTestHelper {
(11, 11, 11, 1111);
)");

auto shards = GetTableShards(Server, Sender, "/Root/table-1");
table1.TableId = tableId;
table1.TabletId = shards.at(0);

auto [tables, ownerId] = GetTables(Server, table1.TabletId);
table1.OwnerId = ownerId;
table1.UserTable = tables["table-1"];

table1.ClientId = runtime.ConnectToPipe(table1.TabletId, Sender, 0, GetTestPipeConfig());

table1.Columns = GetColumns();
}

{
auto& table2 = Tables["movies"];
table2.Name = "movies";
CreateMoviesTable(Server, Sender, "/Root", "movies");
auto [shards, tableId] = CreateMoviesTable(Server, Sender, "/Root", "movies");
ExecSQL(Server, Sender, R"(
UPSERT INTO `/Root/movies`
(id, title, rating)
Expand All @@ -383,29 +397,33 @@ struct TTestHelper {
(3, "Hard die", 8);
)");

auto shards = GetTableShards(Server, Sender, "/Root/movies");
table2.TableId = tableId;
table2.TabletId = shards.at(0);

auto [tables, ownerId] = GetTables(Server, table2.TabletId);
table2.OwnerId = ownerId;
table2.UserTable = tables["movies"];

table2.ClientId = runtime.ConnectToPipe(table2.TabletId, Sender, 0, GetTestPipeConfig());

table2.Columns = GetMoviesColumns();
}

{
auto& table3 = Tables["table-1-many"];
table3.Name = "table-1-many";
CreateTable(Server, Sender, "/Root", "table-1-many", WithFollower, ShardCount);
auto [shards, tableId] = CreateTable(Server, Sender, "/Root", "table-1-many", WithFollower, ShardCount);

auto shards = GetTableShards(Server, Sender, "/Root/table-1-many");
table3.TableId = tableId;
table3.TabletId = shards.at(0);

auto [tables, ownerId] = GetTables(Server, table3.TabletId);
table3.OwnerId = ownerId;
table3.UserTable = tables["table-1-many"];

table3.ClientId = runtime.ConnectToPipe(table3.TabletId, Sender, 0, GetTestPipeConfig());

table3.Columns = GetColumns();
}
}

Expand Down Expand Up @@ -717,6 +735,30 @@ struct TTestHelper {
UNIT_ASSERT_VALUES_EQUAL(rowsRead, Min(rowCount, limit));
}

NKikimrDataEvents::TEvWriteResult WriteRow(const TString& tableName, ui64 txId, const TVector<ui32>& values, NKikimrDataEvents::TEvWrite::ETxMode txMode = NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE) {
const auto& table = Tables[tableName];

auto opts = TShardedTableOptions().Columns(table.Columns);
size_t columnCount = table.Columns.size();

std::vector<ui32> columnIds(columnCount);
std::iota(columnIds.begin(), columnIds.end(), 1);

Y_ABORT_UNLESS(values.size() == columnCount);

TVector<TCell> cells;
for (ui32 col = 0; col < columnCount; ++col)
cells.emplace_back(TCell((const char*)&values[col], sizeof(ui32)));

TSerializedCellMatrix matrix(cells, 1, columnCount);

auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, txMode);
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(matrix.ReleaseBuffer());
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, table.TableId, 1, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);

return Write(*Server->GetRuntime(), Sender, table.TabletId, std::move(evWrite));
}

struct THangedReturn {
ui64 LastPlanStep = 0;
TVector<THolder<IEventHandle>> ReadSets;
Expand Down
11 changes: 4 additions & 7 deletions ydb/core/tx/datashard/datashard_ut_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {

const ui32 rowCount = 3;
ui64 txId = 100;
Write(runtime, shards[0], tableId, opts.Columns_, rowCount, sender, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
Write(runtime, sender, shards[0], tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);

auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All();

Expand All @@ -55,7 +55,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {

const ui32 rowCount = 3;
ui64 txId = 100;
Write(runtime, shards[0], tableId, opts.Columns_, rowCount, sender, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
Write(runtime, sender, shards[0], tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);

auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All();

Expand All @@ -77,11 +77,8 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(matrix.ReleaseBuffer());
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, 1, {1}, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);

runtime.SendToPipe(shards[0], sender, evWrite.release(), 0, GetPipeConfigWithRetries());
auto ev = runtime.GrabEdgeEventRethrow<NEvents::TDataEvents::TEvWriteResult>(sender);
const auto& record = Write(runtime, sender, shards[0], std::move(evWrite), NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);

const auto& record = ev->Get()->Record;
UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
UNIT_ASSERT_VALUES_EQUAL(record.GetIssues().size(), 1);
UNIT_ASSERT(record.GetIssues(0).message().Contains("Operation [0:100] writes key of 1049601 bytes which exceeds limit 1049600 bytes"));
}
Expand All @@ -94,7 +91,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {

const ui32 rowCount = 3;
ui64 txId = 100;
Write(runtime, shards[0], tableId, opts.Columns_, rowCount, sender, txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
Write(runtime, sender, shards[0], tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);

auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All();

Expand Down
16 changes: 11 additions & 5 deletions ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1843,13 +1843,13 @@ std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequest(ui64 txId, NKik
return evWrite;
}

NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, TActorId sender, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus)
NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, std::unique_ptr<NEvents::TDataEvents::TEvWrite>&& request, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus)
{
auto request = MakeWriteRequest(txId, txMode, tableId, columns, rowCount);
auto txMode = request->Record.GetTxMode();
runtime.SendToPipe(shardId, sender, request.release(), 0, GetPipeConfigWithRetries());

auto ev = runtime.GrabEdgeEventRethrow<NEvents::TDataEvents::TEvWriteResult>(sender);
auto status = ev->Get()->Record.GetStatus();
auto resultRecord = ev->Get()->Record;

if (expectedStatus == NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED) {
switch (txMode) {
Expand All @@ -1866,9 +1866,15 @@ NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, ui64 shardId
break;
}
}
UNIT_ASSERT_C(status == expectedStatus, "Status: " << ev->Get()->Record.GetStatus() << " Issues: " << ev->Get()->Record.GetIssues());
UNIT_ASSERT_C(resultRecord.GetStatus() == expectedStatus, "Status: " << resultRecord.GetStatus() << " Issues: " << resultRecord.GetIssues());

return ev->Get()->Record;
return resultRecord;
}

NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus)
{
auto request = MakeWriteRequest(txId, txMode, tableId, columns, rowCount);
return Write(runtime, sender, shardId, std::move(request), expectedStatus);
}

void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values)
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/datashard/ut_common/datashard_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,8 @@ void ExecSQL(Tests::TServer::TPtr server,
bool dml = true,
Ydb::StatusIds::StatusCode code = Ydb::StatusIds::SUCCESS);

NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, TActorId sender, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED);
NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, std::unique_ptr<NEvents::TDataEvents::TEvWrite>&& request, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED);
NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED);

void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values);

Expand Down