Skip to content

Commit db4e909

Browse files
authored
Tests: Replace EvProposeTransaction with EvWrite (#932)
1 parent 3a4b1a7 commit db4e909

8 files changed

+193
-6
lines changed

ydb/core/tx/datashard/datashard__write.cpp

+23
Original file line numberDiff line numberDiff line change
@@ -254,4 +254,27 @@ ui64 EvWrite::Convertor::GetProposeFlags(NKikimrDataEvents::TEvWrite::ETxMode tx
254254
Y_FAIL_S("Unexpected tx mode " << txMode);
255255
}
256256
}
257+
258+
NKikimrDataEvents::TEvWrite::ETxMode EvWrite::Convertor::GetTxMode(ui64 flags) {
259+
if ((flags & TTxFlags::Immediate) && !(flags & TTxFlags::ForceOnline)) {
260+
return NKikimrDataEvents::TEvWrite::ETxMode::TEvWrite_ETxMode_MODE_IMMEDIATE;
261+
}
262+
else if (flags & TTxFlags::VolatilePrepare) {
263+
return NKikimrDataEvents::TEvWrite::ETxMode::TEvWrite_ETxMode_MODE_VOLATILE_PREPARE;
264+
}
265+
else {
266+
return NKikimrDataEvents::TEvWrite::ETxMode::TEvWrite_ETxMode_MODE_PREPARE;
267+
}
268+
}
269+
270+
NKikimrTxDataShard::TEvProposeTransactionResult::EStatus EvWrite::Convertor::GetStatus(NKikimrDataEvents::TEvWriteResult::EStatus status) {
271+
switch (status) {
272+
case NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED:
273+
return NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE;
274+
case NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED:
275+
return NKikimrTxDataShard::TEvProposeTransactionResult::PREPARED;
276+
default:
277+
return NKikimrTxDataShard::TEvProposeTransactionResult::ERROR;
278+
}
279+
}
257280
}

ydb/core/tx/datashard/datashard_pipeline.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -1570,7 +1570,7 @@ TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr&
15701570
Y_ABORT_UNLESS(writeTx);
15711571

15721572
auto badRequest = [&](const TString& error) {
1573-
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << error << "at tablet# " << Self->TabletID(), Self->TabletID());
1573+
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << error << " at tablet# " << Self->TabletID(), Self->TabletID());
15741574
LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, error);
15751575
};
15761576

ydb/core/tx/datashard/datashard_ut_write.cpp

+21
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,27 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
2929
return {runtime, server, sender};
3030
}
3131

32+
Y_UNIT_TEST_TWIN(Upsert, EvWrite) {
33+
auto [runtime, server, sender] = TestCreateServer();
34+
35+
auto opts = TShardedTableOptions();
36+
auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts);
37+
38+
auto rows = EvWrite ? TEvWriteRows{{{0, 1}}, {{2, 3}}, {{4, 5}}} : TEvWriteRows{};
39+
auto upsertObserver = ReplaceEvProposeTransactionWithEvWrite(runtime, rows);
40+
auto upsertResultObserver = ReplaceEvProposeTransactionResultWithEvWrite(runtime, rows);
41+
42+
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 1);"));
43+
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 3);"));
44+
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (4, 5);"));
45+
46+
auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All();
47+
48+
UNIT_ASSERT_VALUES_EQUAL(table1state, "key = 0, value = 1\n"
49+
"key = 2, value = 3\n"
50+
"key = 4, value = 5\n");
51+
}
52+
3253
Y_UNIT_TEST(WriteImmediateOnShard) {
3354
auto [runtime, server, sender] = TestCreateServer();
3455

ydb/core/tx/datashard/datashard_write.h

+3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <ydb/library/actors/core/event.h>
44
#include <ydb/core/protos/data_events.pb.h>
5+
#include <ydb/core/protos/tx_datashard.pb.h>
56

67
#include <util/generic/ptr.h>
78

@@ -13,5 +14,7 @@ class Convertor {
1314
public:
1415
static ui64 GetTxId(const TAutoPtr<IEventHandle>& ev);
1516
static ui64 GetProposeFlags(NKikimrDataEvents::TEvWrite::ETxMode txMode);
17+
static NKikimrDataEvents::TEvWrite::ETxMode GetTxMode(ui64 flags);
18+
static NKikimrTxDataShard::TEvProposeTransactionResult::EStatus GetStatus(NKikimrDataEvents::TEvWriteResult::EStatus status);
1619
};
1720
}

ydb/core/tx/datashard/datashard_write_operation.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ bool TValidatedWriteTx::ParseRecord(const TDataShard::TTableInfos& tableInfos) {
6767
auto tableInfoPtr = tableInfos.FindPtr(tableIdRecord.GetTableId());
6868
if (!tableInfoPtr) {
6969
ErrCode = NKikimrTxDataShard::TError::SCHEME_ERROR;
70-
ErrStr = TStringBuilder() << "Table '" << tableIdRecord.GetTableId() << "' doesn't exist";
70+
ErrStr = TStringBuilder() << "Table '" << tableIdRecord.GetTableId() << "' doesn't exist.";
7171
return false;
7272
}
7373
TableInfo = tableInfoPtr->Get();

ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp

+103-4
Original file line numberDiff line numberDiff line change
@@ -1875,6 +1875,103 @@ NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sen
18751875
return Write(runtime, sender, shardId, std::move(request), expectedStatus, std::move(traceId));
18761876
}
18771877

1878+
1879+
1880+
TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionWithEvWrite(TTestActorRuntime& runtime, TEvWriteRows& rows) {
1881+
if (rows.empty())
1882+
return {};
1883+
1884+
return runtime.AddObserver([&rows](TAutoPtr<IEventHandle>& event) {
1885+
if (event->GetTypeRewrite() != TEvDataShard::EvProposeTransaction)
1886+
return;
1887+
1888+
const auto& record = event->Get<TEvDataShard::TEvProposeTransaction>()->Record;
1889+
1890+
if (record.GetTxKind() != NKikimrTxDataShard::TX_KIND_DATA)
1891+
return;
1892+
1893+
// Parse original TEvProposeTransaction
1894+
const TString& txBody = record.GetTxBody();
1895+
NKikimrTxDataShard::TDataTransaction tx;
1896+
Y_VERIFY(tx.ParseFromArray(txBody.data(), txBody.size()));
1897+
1898+
// Construct new EvWrite
1899+
TVector<TCell> cells;
1900+
ui64 tableId = 0;
1901+
ui16 colCount = 0;
1902+
for (const auto& task : tx.GetKqpTransaction().GetTasks()) {
1903+
NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta;
1904+
Y_VERIFY(task.GetMeta().UnpackTo(&meta));
1905+
if (!meta.HasWrites())
1906+
continue;
1907+
1908+
const auto& tableMeta = meta.GetTable();
1909+
Y_VERIFY_S(tableId == 0 || tableId == tableMeta.GetTableId().GetTableId(), "Only writes to one table is supported now");
1910+
tableId = tableMeta.GetTableId().GetTableId();
1911+
const auto& writes = meta.GetWrites();
1912+
Y_VERIFY_S(colCount == 0 || colCount == writes.GetColumns().size(), "Only equal column count is supported now.");
1913+
colCount = writes.GetColumns().size();
1914+
1915+
const auto& row = rows.ProcessNextRow();
1916+
Y_VERIFY(row.Cells.size() == colCount);
1917+
std::copy(row.Cells.begin(), row.Cells.end(), std::back_inserter(cells));
1918+
}
1919+
1920+
if (cells.empty()) {
1921+
Cerr << "TEvProposeTransaction TX_KIND_DATA has no writes.\n";
1922+
return;
1923+
}
1924+
1925+
Cerr << "TEvProposeTransaction TX_KIND_DATA event is observed and will be replaced with EvWrite: " << record.ShortDebugString() << Endl;
1926+
1927+
TSerializedCellMatrix matrix(cells, cells.size() / colCount, colCount);
1928+
TString blobData = matrix.ReleaseBuffer();
1929+
1930+
UNIT_ASSERT(blobData.size() < 8_MB);
1931+
1932+
ui64 txId = record.GetTxId();
1933+
auto txMode = NKikimr::NDataShard::EvWrite::Convertor::GetTxMode(record.GetFlags());
1934+
std::vector<ui32> columnIds(colCount);
1935+
std::iota(columnIds.begin(), columnIds.end(), 1);
1936+
1937+
auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, txMode);
1938+
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
1939+
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, 1, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);
1940+
1941+
// Replace event
1942+
auto handle = new IEventHandle(event->Recipient, event->Sender, evWrite.release(), 0, event->Cookie);
1943+
handle->Rewrite(handle->GetTypeRewrite(), event->GetRecipientRewrite());
1944+
event.Reset(handle);
1945+
});
1946+
}
1947+
1948+
TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionResultWithEvWrite(TTestActorRuntime& runtime, TEvWriteRows& rows) {
1949+
if (rows.empty())
1950+
return {};
1951+
1952+
return runtime.AddObserver([&rows](TAutoPtr<IEventHandle>& event) {
1953+
if (event->GetTypeRewrite() != NEvents::TDataEvents::EvWriteResult)
1954+
return;
1955+
1956+
rows.CompleteNextRow();
1957+
1958+
const auto& record = event->Get<NEvents::TDataEvents::TEvWriteResult>()->Record;
1959+
Cerr << "EvWriteResult event is observed and will be replaced with EvProposeTransactionResult: " << record.ShortDebugString() << Endl;
1960+
1961+
// Construct new EvProposeTransactionResult
1962+
ui64 txId = record.GetTxId();
1963+
ui64 origin = record.GetOrigin();
1964+
auto status = NKikimr::NDataShard::EvWrite::Convertor::GetStatus(record.GetStatus());
1965+
1966+
auto evResult = std::make_unique<TEvDataShard::TEvProposeTransactionResult>(NKikimrTxDataShard::TX_KIND_DATA, origin, txId, status);
1967+
1968+
// Replace event
1969+
auto handle = new IEventHandle(event->Recipient, event->Sender, evResult.release(), 0, event->Cookie);
1970+
handle->Rewrite(handle->GetTypeRewrite(), event->GetRecipientRewrite());
1971+
event.Reset(handle);
1972+
});
1973+
}
1974+
18781975
void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values)
18791976
{
18801977
auto txTypes = std::make_shared<NTxProxy::TUploadTypes>();
@@ -1903,9 +2000,10 @@ void WaitTabletBecomesOffline(TServer::TPtr server, ui64 tabletId)
19032000
struct IsShardStateChange
19042001
{
19052002
IsShardStateChange(ui64 tabletId)
1906-
: TabletId(tabletId)
1907-
{
1908-
}
2003+
:
2004+
TabletId(tabletId)
2005+
{
2006+
}
19092007

19102008
bool operator()(IEventHandle& ev)
19112009
{
@@ -1959,7 +2057,8 @@ namespace {
19592057
, Snapshot(snapshot)
19602058
, Ordered(ordered)
19612059
, State(pause ? EState::PauseWait : EState::Normal)
1962-
{ }
2060+
{
2061+
}
19632062

19642063
void Bootstrap(const TActorContext& ctx) {
19652064
auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();

ydb/core/tx/datashard/ut_common/datashard_ut_common.h

+39
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,45 @@ void ExecSQL(Tests::TServer::TPtr server,
713713
NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, std::unique_ptr<NEvents::TDataEvents::TEvWrite>&& request, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED, NWilson::TTraceId traceId = {});
714714
NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, const TTableId& tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED, NWilson::TTraceId traceId = {});
715715

716+
struct TEvWriteRow {
717+
TEvWriteRow(std::initializer_list<ui32> init) {
718+
for (ui32 value : init) {
719+
Cells.emplace_back(TCell((const char*)&value, sizeof(ui32)));
720+
}
721+
}
722+
723+
std::vector<TCell> Cells;
724+
725+
enum EStatus {
726+
Init,
727+
Processing,
728+
Completed
729+
} Status = Init;
730+
};
731+
class TEvWriteRows : public std::vector<TEvWriteRow> {
732+
public:
733+
TEvWriteRows() = default;
734+
TEvWriteRows(std::initializer_list<TEvWriteRow> init) :
735+
std::vector<TEvWriteRow>(init) { }
736+
737+
const TEvWriteRow& ProcessNextRow() {
738+
auto processedRow = std::find_if(begin(), end(), [](const auto& row) { return row.Status == TEvWriteRow::EStatus::Init; });
739+
Y_VERIFY_S(processedRow != end(), "There should be at least one EvWrite row to process.");
740+
processedRow->Status = TEvWriteRow::EStatus::Processing;
741+
Cerr << "Processing next EvWrite row\n";
742+
return *processedRow;
743+
}
744+
void CompleteNextRow() {
745+
auto processedRow = std::find_if(begin(), end(), [](const auto& row) { return row.Status == TEvWriteRow::EStatus::Processing; });
746+
Y_VERIFY_S(processedRow != end(), "There should be at lest one EvWrite row processing.");
747+
processedRow->Status = TEvWriteRow::EStatus::Completed;
748+
Cerr << "Completed next EvWrite row\n";
749+
}
750+
};
751+
752+
TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionWithEvWrite(TTestActorRuntime& runtime, TEvWriteRows& rows);
753+
TTestActorRuntimeBase::TEventObserverHolder ReplaceEvProposeTransactionResultWithEvWrite(TTestActorRuntime& runtime, TEvWriteRows& rows);
754+
716755
void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values);
717756

718757
struct IsTxResultComplete {

ydb/library/actors/testlib/test_runtime.h

+2
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,8 @@ namespace NActors {
319319
TEventObserverHolder& operator=(TEventObserverHolder&& other) noexcept {
320320
if (this != &other)
321321
{
322+
Remove();
323+
322324
List = std::move(other.List);
323325
Iter = std::move(other.Iter);
324326

0 commit comments

Comments
 (0)