Skip to content

Forgotten OwnerId in EvWrite #945

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1564,7 +1564,9 @@ Y_UNIT_TEST_SUITE(EvWrite) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);

const ui64 ownerId = 0;
const ui64 tableId = 1;
const ui64 schemaVersion = 1;
const std::vector<std::pair<TString, TTypeInfo>> schema = {
{"key", TTypeInfo(NTypeIds::Uint64) },
{"field", TTypeInfo(NTypeIds::Utf8) }
Expand All @@ -1583,7 +1585,7 @@ Y_UNIT_TEST_SUITE(EvWrite) {

auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
ui64 payloadIndex = NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, tableId, 1, columnsIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW);
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, {ownerId, tableId, schemaVersion}, columnsIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW);

TActorId sender = runtime.AllocateEdgeActor();
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evWrite.release());
Expand Down Expand Up @@ -1612,7 +1614,9 @@ Y_UNIT_TEST_SUITE(EvWrite) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);

const ui64 ownerId = 0;
const ui64 tableId = 1;
const ui64 schemaVersion = 1;
const std::vector<std::pair<TString, TTypeInfo>> schema = {
{"key", TTypeInfo(NTypeIds::Uint64) },
{"field", TTypeInfo(NTypeIds::Utf8) }
Expand All @@ -1631,7 +1635,7 @@ Y_UNIT_TEST_SUITE(EvWrite) {

auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
ui64 payloadIndex = NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, tableId, 1, columnsIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW);
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, {ownerId, tableId, schemaVersion}, columnsIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW);

TActorId sender = runtime.AllocateEdgeActor();
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evWrite.release());
Expand All @@ -1657,7 +1661,9 @@ Y_UNIT_TEST_SUITE(EvWrite) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);

const ui64 ownerId = 0;
const ui64 tableId = 1;
const ui64 schemaVersion = 1;
const std::vector<std::pair<TString, TTypeInfo>> schema = {
{"key", TTypeInfo(NTypeIds::Uint64) },
{"field", TTypeInfo(NTypeIds::Utf8) }
Expand All @@ -1676,7 +1682,7 @@ Y_UNIT_TEST_SUITE(EvWrite) {

auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
ui64 payloadIndex = NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, tableId, 1, columnsIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW);
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, {ownerId, tableId, schemaVersion}, columnsIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW);

TActorId sender = runtime.AllocateEdgeActor();
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evWrite.release());
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,9 @@ Y_UNIT_TEST_SUITE(Normalizers) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);

const ui64 ownerId = 0;
const ui64 tableId = 1;
const ui64 schemaVersion = 1;
const std::vector<std::pair<TString, TTypeInfo>> schema = {
{"key1", TTypeInfo(NTypeIds::Uint64) },
{"key2", TTypeInfo(NTypeIds::Uint64) },
Expand All @@ -245,7 +247,7 @@ Y_UNIT_TEST_SUITE(Normalizers) {

auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
ui64 payloadIndex = NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, tableId, 1, columnsIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW);
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, {ownerId, tableId, schemaVersion}, columnsIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW);

TActorId sender = runtime.AllocateEdgeActor();
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evWrite.release());
Expand Down
9 changes: 5 additions & 4 deletions ydb/core/tx/data_events/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <library/cpp/lwtrace/shuttle.h>

#include <ydb/core/scheme/scheme_tabledefs.h>
#include <ydb/core/protos/data_events.pb.h>
#include <ydb/core/base/events.h>

Expand Down Expand Up @@ -44,8 +45,7 @@ struct TDataEvents {
Record.SetTxMode(txMode);
}

void AddOperation(NKikimrDataEvents::TEvWrite_TOperation::EOperationType operationType, ui64 tableId,
ui64 schemaVersion, const std::vector<ui32>& columnIds,
void AddOperation(NKikimrDataEvents::TEvWrite_TOperation::EOperationType operationType, const TTableId& tableId, const std::vector<ui32>& columnIds,
ui64 payloadIndex, NKikimrDataEvents::EDataFormat payloadFormat) {
Y_ABORT_UNLESS(operationType != NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UNSPECIFIED);
Y_ABORT_UNLESS(payloadFormat != NKikimrDataEvents::FORMAT_UNSPECIFIED);
Expand All @@ -54,8 +54,9 @@ struct TDataEvents {
operation->SetType(operationType);
operation->SetPayloadFormat(payloadFormat);
operation->SetPayloadIndex(payloadIndex);
operation->MutableTableId()->SetTableId(tableId);
operation->MutableTableId()->SetSchemaVersion(schemaVersion);
operation->MutableTableId()->SetOwnerId(tableId.PathId.OwnerId);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Вот эта строка была забыта.
Для удобства в самом методе AddOperation проброшен TTableId целиком.

operation->MutableTableId()->SetTableId(tableId.PathId.LocalPathId);
operation->MutableTableId()->SetSchemaVersion(tableId.SchemaVersion);
operation->MutableColumnIds()->Assign(columnIds.begin(), columnIds.end());
}

Expand Down
19 changes: 7 additions & 12 deletions ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ TVector<TShardedTableOptions::TColumn> GetMoviesColumns() {
return columns;
}

std::tuple<TVector<ui64>, ui64> CreateTable(Tests::TServer::TPtr server,
std::tuple<TVector<ui64>, TTableId> CreateTable(Tests::TServer::TPtr server,
TActorId sender,
const TString &root,
const TString &name,
Expand All @@ -64,7 +64,7 @@ std::tuple<TVector<ui64>, ui64> CreateTable(Tests::TServer::TPtr server,
return CreateShardedTable(server, sender, root, name, opts);
}

std::tuple<TVector<ui64>, ui64> CreateMoviesTable(Tests::TServer::TPtr server,
std::tuple<TVector<ui64>, TTableId> CreateMoviesTable(Tests::TServer::TPtr server,
TActorId sender,
const TString &root,
const TString &name)
Expand Down Expand Up @@ -317,9 +317,8 @@ void AddRangeQuery(
struct TTableInfo {
TString Name;

ui64 TableId;
TTableId TableId;
ui64 TabletId;
ui64 OwnerId;
NKikimrTxDataShard::TEvGetInfoResponse::TUserTable UserTable;

TActorId ClientId;
Expand Down Expand Up @@ -376,7 +375,6 @@ struct TTestHelper {
table1.TabletId = shards.at(0);

auto [tables, ownerId] = GetTables(Server, table1.TabletId);
table1.OwnerId = ownerId;
table1.UserTable = tables["table-1"];

table1.ClientId = runtime.ConnectToPipe(table1.TabletId, Sender, 0, GetTestPipeConfig());
Expand All @@ -401,7 +399,6 @@ struct TTestHelper {
table2.TabletId = shards.at(0);

auto [tables, ownerId] = GetTables(Server, table2.TabletId);
table2.OwnerId = ownerId;
table2.UserTable = tables["movies"];

table2.ClientId = runtime.ConnectToPipe(table2.TabletId, Sender, 0, GetTestPipeConfig());
Expand All @@ -418,7 +415,6 @@ struct TTestHelper {
table3.TabletId = shards.at(0);

auto [tables, ownerId] = GetTables(Server, table3.TabletId);
table3.OwnerId = ownerId;
table3.UserTable = tables["table-1-many"];

table3.ClientId = runtime.ConnectToPipe(table3.TabletId, Sender, 0, GetTestPipeConfig());
Expand Down Expand Up @@ -508,7 +504,7 @@ struct TTestHelper {
auto& record = request->Record;

record.SetReadId(readId);
record.MutableTableId()->SetOwnerId(table.OwnerId);
record.MutableTableId()->SetOwnerId(table.TableId.PathId.OwnerId);
record.MutableTableId()->SetTableId(table.UserTable.GetPathId());

const auto& description = table.UserTable.GetDescription();
Expand Down Expand Up @@ -764,7 +760,7 @@ struct TTestHelper {

auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, txMode);
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(matrix.ReleaseBuffer());
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, table.TableId, 1, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, table.TableId, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);

return Write(*Server->GetRuntime(), Sender, table.TabletId, std::move(evWrite));
}
Expand Down Expand Up @@ -3618,13 +3614,12 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorPageFaults) {
.Columns({
{"key", "Uint32", true, false},
{"value", "Uint32", false, false}});
CreateShardedTable(server, sender, "/Root", "table-1", opts);
auto [shards, tableId1] = CreateShardedTable(server, sender, "/Root", "table-1", opts);

ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6)"));
SimulateSleep(runtime, TDuration::Seconds(1));

const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0);
const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1");
const auto shard1 = shards.at(0);
CompactTable(runtime, shard1, tableId1, false);
RebootTablet(runtime, shard1, sender);
SimulateSleep(runtime, TDuration::Seconds(1));
Expand Down
7 changes: 3 additions & 4 deletions ydb/core/tx/datashard/datashard_ut_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,8 @@ Y_UNIT_TEST_SUITE(DataShardStats) {

InitRoot(server, sender);

CreateShardedTable(server, sender, "/Root", "table-1", 1);
const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0);
const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1");
auto [shards, tableId1] = CreateShardedTable(server, sender, "/Root", "table-1", 1);
ui64 shard1 = shards.at(0);

ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2), (3, 3)");

Expand Down Expand Up @@ -113,7 +112,7 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetChannels()[0].GetIndexSize(), 54u);
}

Write(runtime, sender, shard1, tableId1.PathId.LocalPathId, TShardedTableOptions().Columns_, 1, 100, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
Write(runtime, sender, shard1, tableId1, TShardedTableOptions().Columns_, 1, 100, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);

{
Cerr << "... waiting for stats after write" << Endl;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard_ut_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {

auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(100, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(matrix.ReleaseBuffer());
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, 1, {1}, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, {1}, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);

const auto& record = Write(runtime, sender, shards[0], std::move(evWrite), NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard_write_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ bool TValidatedWriteTx::ParseRecord(const TDataShard::TTableInfos& tableInfos) {
}
}

TableId = TTableId(tableIdRecord.ownerid(), tableIdRecord.GetTableId(), tableIdRecord.GetSchemaVersion());
TableId = TTableId(tableIdRecord.GetOwnerId(), tableIdRecord.GetTableId(), tableIdRecord.GetSchemaVersion());
return true;
}

Expand Down
12 changes: 6 additions & 6 deletions ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1103,7 +1103,7 @@ static ui64 RunSchemeTx(
return ev->Get()->Record.GetTxId();
}

std::tuple<TVector<ui64>, ui64> CreateShardedTable(
std::tuple<TVector<ui64>, TTableId> CreateShardedTable(
Tests::TServer::TPtr server,
TActorId sender,
const TString &root,
Expand Down Expand Up @@ -1224,12 +1224,12 @@ std::tuple<TVector<ui64>, ui64> CreateShardedTable(

TString path = TStringBuilder() << root << "/" << name;
const auto& shards = GetTableShards(server, sender, path);
const ui64 tableId = ResolveTableId(server, sender, path).PathId.LocalPathId;
TTableId tableId = ResolveTableId(server, sender, path);

return {shards, tableId};
}

std::tuple<TVector<ui64>, ui64> CreateShardedTable(
std::tuple<TVector<ui64>, TTableId> CreateShardedTable(
Tests::TServer::TPtr server,
TActorId sender,
const TString &root,
Expand Down Expand Up @@ -1808,7 +1808,7 @@ void ExecSQL(Tests::TServer::TPtr server,
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetRef().GetYdbStatus(), code);
}

std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequest(ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount) {
std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequest(ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, const TTableId& tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount) {
std::vector<ui32> columnIds(columns.size());
std::iota(columnIds.begin(), columnIds.end(), 1);

Expand Down Expand Up @@ -1839,7 +1839,7 @@ std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequest(ui64 txId, NKik

auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, txMode);
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, 1, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);

return evWrite;
}
Expand Down Expand Up @@ -1872,7 +1872,7 @@ NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sen
return resultRecord;
}

NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus, NWilson::TTraceId traceId)
NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, const TTableId& tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus, NWilson::TTraceId traceId)
{
auto request = MakeWriteRequest(txId, txMode, tableId, columns, rowCount);
return Write(runtime, sender, shardId, std::move(request), expectedStatus, std::move(traceId));
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/datashard/ut_common/datashard_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -501,14 +501,14 @@ struct TShardedTableOptions {
void N(NUnitTest::TTestContext&)

// Create table, returns shards & tableId
std::tuple<TVector<ui64>, ui64> CreateShardedTable(Tests::TServer::TPtr server,
std::tuple<TVector<ui64>, TTableId> CreateShardedTable(Tests::TServer::TPtr server,
TActorId sender,
const TString &root,
const TString &name,
const TShardedTableOptions &opts = TShardedTableOptions());

// Create table, returns shards & tableId
std::tuple<TVector<ui64>, ui64> CreateShardedTable(Tests::TServer::TPtr server,
std::tuple<TVector<ui64>, TTableId> CreateShardedTable(Tests::TServer::TPtr server,
TActorId sender,
const TString &root,
const TString &name,
Expand Down Expand Up @@ -712,7 +712,7 @@ void ExecSQL(Tests::TServer::TPtr server,
NWilson::TTraceId traceId = {});

NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, std::unique_ptr<NEvents::TDataEvents::TEvWrite>&& request, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED, NWilson::TTraceId traceId = {});
NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED, NWilson::TTraceId traceId = {});
NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, const TTableId& tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED, NWilson::TTraceId traceId = {});

void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values);

Expand Down
Loading