From ab840893106036abacdfe3276b3aff57dbae5e38 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 21 Feb 2025 16:12:10 +0000 Subject: [PATCH 1/5] add partition field --- .../backup/impl/local_partition_reader.cpp | 2 +- ydb/core/persqueue/purecalc/purecalc.cpp | 31 ++--- ydb/core/persqueue/purecalc/purecalc.h | 6 + .../replication/service/table_writer_ut.cpp | 24 ++-- .../tx/replication/service/topic_reader.cpp | 2 +- .../replication/service/transfer_writer.cpp | 27 +++- .../service/transfer_writer_ut.cpp | 2 +- ydb/core/tx/replication/service/worker.cpp | 14 +- ydb/core/tx/replication/service/worker.h | 5 +- ydb/tests/functional/transfer/main.cpp | 130 +++++++++++++++--- 10 files changed, 177 insertions(+), 66 deletions(-) diff --git a/ydb/core/backup/impl/local_partition_reader.cpp b/ydb/core/backup/impl/local_partition_reader.cpp index b7a476693ac9..28a1d794ca83 100644 --- a/ydb/core/backup/impl/local_partition_reader.cpp +++ b/ydb/core/backup/impl/local_partition_reader.cpp @@ -139,7 +139,7 @@ class TLocalPartitionReader } SentOffset = gotOffset + 1; - Send(Worker, new TEvWorker::TEvData(ToString(Partition), std::move(records))); + Send(Worker, new TEvWorker::TEvData(Partition, ToString(Partition), std::move(records))); } void Leave(TEvWorker::TEvGone::EStatus status) { diff --git a/ydb/core/persqueue/purecalc/purecalc.cpp b/ydb/core/persqueue/purecalc/purecalc.cpp index 018483c9e488..ca9cde4cac38 100644 --- a/ydb/core/persqueue/purecalc/purecalc.cpp +++ b/ydb/core/persqueue/purecalc/purecalc.cpp @@ -14,13 +14,9 @@ using namespace NKikimr::NMiniKQL; constexpr const char* DataFieldName = "_data"; constexpr const char* OffsetFieldName = "_offset"; +constexpr const char* PartitionFieldName = "_partition"; -constexpr const size_t FieldCount = 2; // Change it when change fields - -struct FieldPositions { - ui64 Data = 0; - ui64 Offset = 0; -}; +constexpr const size_t FieldCount = 3; // Change it when change fields NYT::TNode CreateTypeNode(const TString& fieldType) { @@ -42,6 +38,7 @@ NYT::TNode CreateMessageScheme() { auto structMembers = NYT::TNode::CreateList(); AddField(structMembers, DataFieldName, "String"); AddField(structMembers, OffsetFieldName, "Uint64"); + AddField(structMembers, PartitionFieldName, "Uint32"); return NYT::TNode::CreateList() .Add("StructType") @@ -60,29 +57,21 @@ struct TMessageWrapper { NYql::NUdf::TUnboxedValuePod GetOffset() const { return NYql::NUdf::TUnboxedValuePod(Message.Offset); } + + NYql::NUdf::TUnboxedValuePod GetPartition() const { + return NYql::NUdf::TUnboxedValuePod(Message.Partition); + } }; class TInputConverter { protected: IWorker* Worker_; TPlainContainerCache Cache_; - FieldPositions Position; public: explicit TInputConverter(IWorker* worker) : Worker_(worker) { - const TStructType* structType = worker->GetInputType(); - const ui64 count = structType->GetMembersCount(); - - for (ui64 i = 0; i < count; ++i) { - const auto name = structType->GetMemberName(i); - if (name == DataFieldName) { - Position.Data = i; - } else if (name == OffsetFieldName) { - Position.Offset = i; - } - } } public: @@ -92,8 +81,10 @@ class TInputConverter { result = Cache_.NewArray(holderFactory, static_cast(FieldCount), items); TMessageWrapper wrap {*message}; - items[Position.Data] = wrap.GetData(); - items[Position.Offset] = wrap.GetOffset(); + // lex order by field name + items[0] = wrap.GetData(); + items[1] = wrap.GetOffset(); + items[2] = wrap.GetPartition(); } void ClearCache() { diff --git a/ydb/core/persqueue/purecalc/purecalc.h b/ydb/core/persqueue/purecalc/purecalc.h index 90fe1a21e7cd..80383582d518 100644 --- a/ydb/core/persqueue/purecalc/purecalc.h +++ b/ydb/core/persqueue/purecalc/purecalc.h @@ -11,12 +11,18 @@ struct TMessage { : Data(data) { } + TMessage& WithPartition(ui64 partition) { + Partition = partition; + return *this; + } + TMessage& WithOffset(ui64 offset) { Offset = offset; return *this; } const TString& Data; + ui32 Partition = 0; ui64 Offset = 0; }; diff --git a/ydb/core/tx/replication/service/table_writer_ut.cpp b/ydb/core/tx/replication/service/table_writer_ut.cpp index ffac955bbeac..fb69a57e7d5c 100644 --- a/ydb/core/tx/replication/service/table_writer_ut.cpp +++ b/ydb/core/tx/replication/service/table_writer_ut.cpp @@ -35,7 +35,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { auto writer = env.GetRuntime().Register(CreateLocalTableWriter(env.GetPathId("/Root/Table"))); env.Send(writer, new TEvWorker::TEvHandshake()); - env.Send(writer, new TEvWorker::TEvData("TestSource", { + env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { TRecord(1, R"({"key":[1], "update":{"value":"10"}})"), TRecord(2, R"({"key":[2], "update":{"value":"20"}})"), TRecord(3, R"({"key":[3], "update":{"value":"30"}})"), @@ -92,7 +92,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { auto writer = env.GetRuntime().Register(CreateLocalTableWriter(env.GetPathId("/Root/Table"))); env.Send(writer, new TEvWorker::TEvHandshake()); - env.Send(writer, new TEvWorker::TEvData("TestSource", { + env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { TRecord(1, R"({"key":[1], "update":{"int32_value":-100500}})"), TRecord(2, R"({"key":[2], "update":{"uint32_value":100500}})"), TRecord(3, R"({"key":[3], "update":{"int64_value":-200500}})"), @@ -143,7 +143,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { auto writer = env.GetRuntime().Register(CreateLocalTableWriter(env.GetPathId("/Root/Table"))); env.Send(writer, new TEvWorker::TEvHandshake()); - env.Send(writer, new TEvWorker::TEvData("TestSource", { + env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { TRecord(1, R"({"key":["1.0"], "update":{"value":"155555555555555.321"}})"), TRecord(2, R"({"key":["2.0"], "update":{"value":"255555555555555.321"}})"), TRecord(3, R"({"key":["3.0"], "update":{"value":"355555555555555.321"}})"), @@ -184,7 +184,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { ui64 order = 1; { - auto ev = env.Send(writer, new TEvWorker::TEvData("TestSource", { + auto ev = env.Send(writer, new TEvWorker::TEvData(0,"TestSource", { TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"), TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[2,0]})"), TRecord(order++, R"({"key":[3], "update":{"value":"30"}, "ts":[3,0]})"), @@ -203,14 +203,14 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { })); } { - auto ev = env.Send(writer, new TEvWorker::TEvData("TestSource", { + auto ev = env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { TRecord(order++, R"({"resolved":[10,0]})"), })); UNIT_ASSERT_VALUES_EQUAL(TRowVersion::FromProto(ev->Get()->Record.GetVersion()), TRowVersion(10, 0)); env.GetRuntime().GrabEdgeEvent(env.GetSender()); } - env.Send(writer, new TEvWorker::TEvData("TestSource", { + env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[11,0]})"), TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[12,0]})"), TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[21,0]})"), @@ -222,12 +222,12 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { {TRowVersion(30, 0), 3}, })); - env.Send(writer, new TEvWorker::TEvData("TestSource", { + env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[13,0]})"), TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[23,0]})"), })); - env.Send(writer, new TEvWorker::TEvData("TestSource", { + env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { TRecord(order++, R"({"resolved":[30,0]})"), })); env.GetRuntime().GrabEdgeEvent(env.GetSender()); @@ -294,7 +294,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { auto worker = env.GetRuntime().Register(new TMockWorker(writer, env.GetSender())); env.Send(worker, new TEvWorker::TEvHandshake()); - env.Send(writer, new TEvWorker::TEvData("TestSource", { + env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { TRecord(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"), TRecord(2, R"({"key":[2], "update":{"value":"20"}, "ts":[11,0]})"), })); @@ -377,7 +377,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { auto worker = env.GetRuntime().Register(new TMockWorker(writer, env.GetSender())); env.Send(worker, new TEvWorker::TEvHandshake()); - env.Send(writer, new TEvWorker::TEvData("TestSource", { + env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { TRecord(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"), TRecord(2, R"({"resolved":[10,0]})"), })); @@ -407,14 +407,14 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { auto writer = env.GetRuntime().Register(CreateLocalTableWriter(env.GetPathId("/Root/Table"), EWriteMode::Consistent)); env.Send(writer, new TEvWorker::TEvHandshake()); - env.Send(writer, new TEvWorker::TEvData("TestSource", { + env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { TRecord(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"), })); env.Send(writer, MakeTxIdResult({ {TRowVersion(10, 0), 1}, })); - env.Send(writer, new TEvWorker::TEvData("TestSource", { + env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { TRecord(2, R"({"key":[3], "update":{"value":"30"}, "ts":[11,0]})"), TRecord(3, R"({"key":[2], "update":{"value":"20"}, "ts":[2,0]})"), TRecord(4, R"({"resolved":[20,0]})"), diff --git a/ydb/core/tx/replication/service/topic_reader.cpp b/ydb/core/tx/replication/service/topic_reader.cpp index 594c1033c617..ccff6eb2be3e 100644 --- a/ydb/core/tx/replication/service/topic_reader.cpp +++ b/ydb/core/tx/replication/service/topic_reader.cpp @@ -61,7 +61,7 @@ class TRemoteTopicReader: public TActor { records.emplace_back(msg.GetOffset(), std::move(msg.GetData()), msg.GetCreateTime()); } - Send(Worker, new TEvWorker::TEvData(ToString(result.PartitionId), std::move(records))); + Send(Worker, new TEvWorker::TEvData(result.PartitionId, ToString(result.PartitionId), std::move(records))); } void Handle(TEvYdbProxy::TEvTopicEndPartition::TPtr& ev) { diff --git a/ydb/core/tx/replication/service/transfer_writer.cpp b/ydb/core/tx/replication/service/transfer_writer.cpp index bb2b193e4fb7..3b7359f06434 100644 --- a/ydb/core/tx/replication/service/transfer_writer.cpp +++ b/ydb/core/tx/replication/service/transfer_writer.cpp @@ -551,7 +551,7 @@ class TTransferWriter } if (PendingRecords) { - ProcessData(*PendingRecords); + ProcessData(PendingPartitionId, *PendingRecords); PendingRecords.reset(); } } @@ -584,15 +584,16 @@ class TTransferWriter void HoldHandle(TEvWorker::TEvData::TPtr& ev) { Y_ABORT_UNLESS(!PendingRecords); + PendingPartitionId = ev->Get()->PartitionId; PendingRecords = std::move(ev->Get()->Records); } void Handle(TEvWorker::TEvData::TPtr& ev) { LOG_D("Handle TEvData " << ev->Get()->ToString()); - ProcessData(ev->Get()->Records); + ProcessData(ev->Get()->PartitionId, ev->Get()->Records); } - void ProcessData(const TVector& records) { + void ProcessData(const ui32 partitionId, const TVector& records) { if (!records) { Send(Worker, new TEvWorker::TEvGone(TEvWorker::TEvGone::DONE)); return; @@ -602,16 +603,24 @@ class TTransferWriter for (auto& message : records) { NYdb::NTopic::NPurecalc::TMessage input(message.Data); + input.WithPartition(partitionId); input.WithOffset(message.Offset); - auto result = ProgramHolder->GetProgram()->Apply(NYql::NPureCalc::StreamFromVector(TVector{input})); - while (auto* m = result->Fetch()) { - TableState->AddData(m->Data); + try { + auto result = ProgramHolder->GetProgram()->Apply(NYql::NPureCalc::StreamFromVector(TVector{input})); + while (auto* m = result->Fetch()) { + TableState->AddData(m->Data); + } + } catch (const yexception& e) { + ProcessingError = TStringBuilder() << "Error transform message: '" << message.Data << "': " << e.what(); + break; } } if (TableState->Flush()) { Become(&TThis::StateWrite); + } else if (ProcessingError) { + LogCritAndLeave(*ProcessingError); } } @@ -636,6 +645,10 @@ class TTransferWriter return LogCritAndLeave(error); } + if (ProcessingError) { + return LogCritAndLeave(*ProcessingError); + } + Send(Worker, new TEvWorker::TEvPoll()); return StartWork(); } @@ -709,8 +722,10 @@ class TTransferWriter TProgramHolder::TPtr ProgramHolder; mutable TMaybe LogPrefix; + mutable TMaybe ProcessingError; std::optional PendingWorker; + ui32 PendingPartitionId = 0; std::optional> PendingRecords; ui32 Attempt = 0; diff --git a/ydb/core/tx/replication/service/transfer_writer_ut.cpp b/ydb/core/tx/replication/service/transfer_writer_ut.cpp index d617bdc3d03a..edb12e26dd44 100644 --- a/ydb/core/tx/replication/service/transfer_writer_ut.cpp +++ b/ydb/core/tx/replication/service/transfer_writer_ut.cpp @@ -51,7 +51,7 @@ Y_UNIT_TEST_SUITE(TransferWriter) { auto writer = env.GetRuntime().Register(CreateTransferWriter(lambda, tablePathId, compiler)); env.Send(writer, new TEvWorker::TEvHandshake()); - env.Send(writer, new TEvWorker::TEvData("TestSource", { + env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { TRecord(1, R"({"key":[1], "update":{"value":"10"}})"), TRecord(2, R"({"key":[2], "update":{"value":"20"}})"), TRecord(3, R"({"key":[3], "update":{"value":"30"}})"), diff --git a/ydb/core/tx/replication/service/worker.cpp b/ydb/core/tx/replication/service/worker.cpp index 66ff8d1f838a..e7833c408abd 100644 --- a/ydb/core/tx/replication/service/worker.cpp +++ b/ydb/core/tx/replication/service/worker.cpp @@ -27,14 +27,16 @@ TEvWorker::TEvData::TRecord::TRecord(ui64 offset, TString&& data, TInstant creat { } -TEvWorker::TEvData::TEvData(const TString& source, const TVector& records) - : Source(source) +TEvWorker::TEvData::TEvData(ui32 partitionId, const TString& source, const TVector& records) + : PartitionId(partitionId) + , Source(source) , Records(records) { } -TEvWorker::TEvData::TEvData(const TString& source, TVector&& records) - : Source(source) +TEvWorker::TEvData::TEvData(ui32 partitionId, const TString& source, TVector&& records) + : PartitionId(partitionId) + , Source(source) , Records(std::move(records)) { } @@ -160,7 +162,7 @@ class TWorker: public TActorBootstrapped { Writer.Registered(); if (InFlightData) { - Send(Writer, new TEvWorker::TEvData(InFlightData->Source, InFlightData->Records)); + Send(Writer, new TEvWorker::TEvData(InFlightData->PartitionId, InFlightData->Source, InFlightData->Records)); } } else { LOG_W("Handshake from unknown actor" @@ -205,7 +207,7 @@ class TWorker: public TActorBootstrapped { } Y_ABORT_UNLESS(!InFlightData); - InFlightData = MakeHolder(ev->Get()->Source, ev->Get()->Records); + InFlightData = MakeHolder(ev->Get()->PartitionId, ev->Get()->Source, ev->Get()->Records); if (Writer) { Send(ev->Forward(Writer)); diff --git a/ydb/core/tx/replication/service/worker.h b/ydb/core/tx/replication/service/worker.h index 6246eb10994a..a3f71b7eb5e5 100644 --- a/ydb/core/tx/replication/service/worker.h +++ b/ydb/core/tx/replication/service/worker.h @@ -40,11 +40,12 @@ struct TEvWorker { void Out(IOutputStream& out) const; }; + ui32 PartitionId; TString Source; TVector Records; - explicit TEvData(const TString& source, const TVector& records); - explicit TEvData(const TString& source, TVector&& records); + explicit TEvData(ui32 partitionId, const TString& source, const TVector& records); + explicit TEvData(ui32 partitionId, const TString& source, TVector&& records); TString ToString() const override; }; diff --git a/ydb/tests/functional/transfer/main.cpp b/ydb/tests/functional/transfer/main.cpp index 9ea3b36bb79d..928331501ae6 100644 --- a/ydb/tests/functional/transfer/main.cpp +++ b/ydb/tests/functional/transfer/main.cpp @@ -42,6 +42,11 @@ bool Checker::Get(const ::Ydb::Value& value) { return value.bool_value(); } +template<> +ui32 Checker::Get(const ::Ydb::Value& value) { + return value.uint32_value(); +} + template<> ui64 Checker::Get(const ::Ydb::Value& value) { return value.uint64_value(); @@ -70,10 +75,15 @@ std::pair> _C(TString&& name, T&& expected) { }; } +struct TMessage { + const char* Message; + std::optional Partition = std::nullopt; +}; + struct TConfig { const char* TableDDL; const char* Lambda; - const char* Message; + const TVector Messages; TVector>> Expectations; }; @@ -106,7 +116,10 @@ struct MainTestCase { { auto res = session.ExecuteQuery(Sprintf(R"( - CREATE TOPIC `%s`; + CREATE TOPIC `%s` + WITH ( + min_active_partitions = 10 + ); )", TopicName.data()), TTxControl::NoTx()).GetValueSync(); UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); } @@ -126,20 +139,25 @@ struct MainTestCase { } { - TWriteSessionSettings writeSettings; - writeSettings.Path(TopicName); - writeSettings.DeduplicationEnabled(false); - auto writeSession = topicClient.CreateSimpleBlockingWriteSession(writeSettings); + for (const auto& m : Config.Messages) { + TWriteSessionSettings writeSettings; + writeSettings.Path(TopicName); + writeSettings.DeduplicationEnabled(false); + if (m.Partition) { + writeSettings.PartitionId(m.Partition); + } + auto writeSession = topicClient.CreateSimpleBlockingWriteSession(writeSettings); - UNIT_ASSERT(writeSession->Write(Config.Message)); - writeSession->Close(TDuration::Seconds(1)); + UNIT_ASSERT(writeSession->Write(m.Message)); + writeSession->Close(TDuration::Seconds(1)); + } } { for (size_t attempt = 20; attempt--; ) { auto res = DoRead(session); Cerr << "Attempt=" << attempt << " count=" << res.first << Endl << Flush; - if (res.first == 1) { + if (res.first == Config.Messages.size()) { const Ydb::ResultSet& proto = res.second; for (size_t i = 0; i < Config.Expectations.size(); ++i) { auto& c = Config.Expectations[i]; @@ -214,7 +232,7 @@ Y_UNIT_TEST_SUITE(Transfer) }; )", - .Message = "Message-1", + .Messages = {{"Message-1"}}, .Expectations = { _C("Key", ui64(0)), @@ -247,7 +265,7 @@ Y_UNIT_TEST_SUITE(Transfer) }; )", - .Message = "Message-1", + .Messages = {{"Message-1"}}, .Expectations = { _C("Key", ui64(0)), @@ -256,6 +274,51 @@ Y_UNIT_TEST_SUITE(Transfer) }).Run(); } + Y_UNIT_TEST(Main_ColumnTable_ComplexKey) + { + MainTestCase({ + .TableDDL = R"( + CREATE TABLE `%s` ( + Key1 Uint64 NOT NULL, + Key3 Uint64 NOT NULL, + Value1 Utf8, + Key2 Uint64 NOT NULL, + Value2 Utf8, + Key4 Uint64 NOT NULL, + PRIMARY KEY (Key3, Key2, Key1, Key4) + ) WITH ( + STORE = COLUMN + ); + )", + + .Lambda = R"( + $l = ($x) -> { + return [ + <| + Key1:CAST(1 AS Uint64), + Key2:CAST(2 AS Uint64), + Value2:CAST("value-2" AS Utf8), + Key4:CAST(4 AS Uint64), + Key3:CAST(3 AS Uint64), + Value1:CAST("value-1" AS Utf8), + |> + ]; + }; + )", + + .Messages = {{"Message-1"}}, + + .Expectations = { + _C("Key1", ui64(1)), + _C("Key2", ui64(2)), + _C("Key3", ui64(3)), + _C("Key4", ui64(4)), + _C("Value1", TString("value-1")), + _C("Value2", TString("value-2")), + } + }).Run(); + } + Y_UNIT_TEST(Main_ColumnTable_JsonMessage) { MainTestCase({ @@ -286,12 +349,12 @@ Y_UNIT_TEST_SUITE(Transfer) }; )", - .Message = R"({ + .Messages = {{R"({ "id": 1, "first_name": "Vasya", "last_name": "Pupkin", "salary": "123" - })", + })"}}, .Expectations = { _C("Id", ui64(1)), @@ -326,7 +389,7 @@ Y_UNIT_TEST_SUITE(Transfer) }; )", - .Message = "Message-1", + .Messages = {{"Message-1"}}, .Expectations = { _C("Key", ui64(0)), @@ -359,7 +422,7 @@ Y_UNIT_TEST_SUITE(Transfer) }; )", - .Message = "2025-02-21", + .Messages = {{"2025-02-21"}}, .Expectations = { _C("Key", ui64(0)), @@ -392,7 +455,7 @@ Y_UNIT_TEST_SUITE(Transfer) }; )", - .Message = "1.23", + .Messages = {{"1.23"}}, .Expectations = { _C("Key", ui64(0)), @@ -425,7 +488,7 @@ Y_UNIT_TEST_SUITE(Transfer) }; )", - .Message = "Message-1 long value 0 1234567890 1 1234567890 2 1234567890 3 1234567890 4 1234567890 5 1234567890 6 1234567890", + .Messages = {{"Message-1 long value 0 1234567890 1 1234567890 2 1234567890 3 1234567890 4 1234567890 5 1234567890 6 1234567890"}}, .Expectations = { _C("Key", ui64(0)), @@ -434,5 +497,38 @@ Y_UNIT_TEST_SUITE(Transfer) }).Run(); } + Y_UNIT_TEST(Main_MessageField_Partition) + { + MainTestCase({ + .TableDDL = R"( + CREATE TABLE `%s` ( + Partition Uint32 NOT NULL, + Message Utf8, + PRIMARY KEY (Partition) + ) WITH ( + STORE = COLUMN + ); + )", + + .Lambda = R"( + $l = ($x) -> { + return [ + <| + Partition:CAST($x._partition AS Uint32), + Message:CAST($x._data AS Utf8) + |> + ]; + }; + )", + + .Messages = {{"Message-1", 7}}, + + .Expectations = { + _C("Partition", ui32(7)), + _C("Message", TString("Message-1")), + } + }).Run(); + } + } From 4387e71750a3c0a6768c25257b1f17ab9626e931 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Mon, 24 Feb 2025 06:39:15 +0000 Subject: [PATCH 2/5] SeqNo, MessaeGroupId and ProducerId --- .../backup/impl/local_partition_reader.cpp | 2 +- ydb/core/persqueue/purecalc/purecalc.cpp | 27 +++- ydb/core/persqueue/purecalc/purecalc.h | 21 +-- .../replication/service/base_table_writer.cpp | 5 +- .../tx/replication/service/topic_reader.cpp | 2 +- .../replication/service/transfer_writer.cpp | 10 +- ydb/core/tx/replication/service/worker.cpp | 10 +- ydb/core/tx/replication/service/worker.h | 7 +- ydb/core/tx/replication/ydb_proxy/ydb_proxy.h | 9 ++ ydb/tests/functional/transfer/main.cpp | 141 +++++++++++++++++- 10 files changed, 202 insertions(+), 32 deletions(-) diff --git a/ydb/core/backup/impl/local_partition_reader.cpp b/ydb/core/backup/impl/local_partition_reader.cpp index 28a1d794ca83..2a43bb8741e4 100644 --- a/ydb/core/backup/impl/local_partition_reader.cpp +++ b/ydb/core/backup/impl/local_partition_reader.cpp @@ -135,7 +135,7 @@ class TLocalPartitionReader for (auto& result : readResult.GetResult()) { gotOffset = std::max(gotOffset, result.GetOffset()); - records.emplace_back(result.GetOffset(), GetDeserializedData(result.GetData()).GetData()); + records.emplace_back(result.GetOffset(), GetDeserializedData(result.GetData()).GetData(), TInstant::MilliSeconds(result.GetCreateTimestampMS()), result.GetSourceId(), result.GetSourceId(), result.GetSeqNo()); } SentOffset = gotOffset + 1; diff --git a/ydb/core/persqueue/purecalc/purecalc.cpp b/ydb/core/persqueue/purecalc/purecalc.cpp index ca9cde4cac38..be27c6d34f4c 100644 --- a/ydb/core/persqueue/purecalc/purecalc.cpp +++ b/ydb/core/persqueue/purecalc/purecalc.cpp @@ -13,10 +13,13 @@ using namespace NYql::NUdf; using namespace NKikimr::NMiniKQL; constexpr const char* DataFieldName = "_data"; +constexpr const char* MessageGroupIdFieldName = "_message_group_id"; constexpr const char* OffsetFieldName = "_offset"; constexpr const char* PartitionFieldName = "_partition"; +constexpr const char* ProducerIdFieldName = "_producer_id"; +constexpr const char* SeqNoFieldName = "_seq_no"; -constexpr const size_t FieldCount = 3; // Change it when change fields +constexpr const size_t FieldCount = 6; // Change it when change fields NYT::TNode CreateTypeNode(const TString& fieldType) { @@ -37,8 +40,11 @@ void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldTy NYT::TNode CreateMessageScheme() { auto structMembers = NYT::TNode::CreateList(); AddField(structMembers, DataFieldName, "String"); + AddField(structMembers, MessageGroupIdFieldName, "String"); AddField(structMembers, OffsetFieldName, "Uint64"); AddField(structMembers, PartitionFieldName, "Uint32"); + AddField(structMembers, ProducerIdFieldName, "String"); + AddField(structMembers, SeqNoFieldName, "Uint64"); return NYT::TNode::CreateList() .Add("StructType") @@ -54,6 +60,10 @@ struct TMessageWrapper { return NKikimr::NMiniKQL::MakeString(Message.Data); } + NYql::NUdf::TUnboxedValuePod GetMessageGroupId() const { + return NKikimr::NMiniKQL::MakeString(Message.MessageGroupId); + } + NYql::NUdf::TUnboxedValuePod GetOffset() const { return NYql::NUdf::TUnboxedValuePod(Message.Offset); } @@ -61,6 +71,14 @@ struct TMessageWrapper { NYql::NUdf::TUnboxedValuePod GetPartition() const { return NYql::NUdf::TUnboxedValuePod(Message.Partition); } + + NYql::NUdf::TUnboxedValuePod GetProducerId() const { + return NKikimr::NMiniKQL::MakeString(Message.ProducerId); + } + + NYql::NUdf::TUnboxedValuePod GetSeqNo() const { + return NYql::NUdf::TUnboxedValuePod(Message.SeqNo); + } }; class TInputConverter { @@ -83,8 +101,11 @@ class TInputConverter { TMessageWrapper wrap {*message}; // lex order by field name items[0] = wrap.GetData(); - items[1] = wrap.GetOffset(); - items[2] = wrap.GetPartition(); + items[1] = wrap.GetMessageGroupId(); + items[2] = wrap.GetOffset(); + items[3] = wrap.GetPartition(); + items[4] = wrap.GetProducerId(); + items[5] = wrap.GetSeqNo(); } void ClearCache() { diff --git a/ydb/core/persqueue/purecalc/purecalc.h b/ydb/core/persqueue/purecalc/purecalc.h index 80383582d518..57fdff5e33f7 100644 --- a/ydb/core/persqueue/purecalc/purecalc.h +++ b/ydb/core/persqueue/purecalc/purecalc.h @@ -7,23 +7,12 @@ namespace NYdb::NTopic::NPurecalc { using namespace NYql::NPureCalc; struct TMessage { - TMessage(const TString& data) - : Data(data) { - } - - TMessage& WithPartition(ui64 partition) { - Partition = partition; - return *this; - } - - TMessage& WithOffset(ui64 offset) { - Offset = offset; - return *this; - } - - const TString& Data; - ui32 Partition = 0; + TString Data; + TString MessageGroupId; ui64 Offset = 0; + ui32 Partition = 0; + TString ProducerId; + ui64 SeqNo = 0; }; class TMessageInputSpec: public TInputSpecBase { diff --git a/ydb/core/tx/replication/service/base_table_writer.cpp b/ydb/core/tx/replication/service/base_table_writer.cpp index 7c6c57dfc6e4..b2bcd0ef5a5a 100644 --- a/ydb/core/tx/replication/service/base_table_writer.cpp +++ b/ydb/core/tx/replication/service/base_table_writer.cpp @@ -433,7 +433,10 @@ class TLocalTableWriter TVector records(::Reserve(ev->Get()->Records.size())); TSet versionsWithoutTxId; - for (auto& [offset, data, _] : ev->Get()->Records) { + for (auto& r : ev->Get()->Records) { + auto offset = r.Offset; + auto& data = r.Data; + auto record = Parser->Parse(ev->Get()->Source, offset, std::move(data)); if (Mode == EWriteMode::Consistent) { diff --git a/ydb/core/tx/replication/service/topic_reader.cpp b/ydb/core/tx/replication/service/topic_reader.cpp index ccff6eb2be3e..0e14f9324d5d 100644 --- a/ydb/core/tx/replication/service/topic_reader.cpp +++ b/ydb/core/tx/replication/service/topic_reader.cpp @@ -58,7 +58,7 @@ class TRemoteTopicReader: public TActor { for (auto& msg : result.Messages) { Y_ABORT_UNLESS(msg.GetCodec() == NYdb::NTopic::ECodec::RAW); - records.emplace_back(msg.GetOffset(), std::move(msg.GetData()), msg.GetCreateTime()); + records.emplace_back(msg.GetOffset(), std::move(msg.GetData()), msg.GetCreateTime(), std::move(msg.GetMessageGroupId()), std::move(msg.GetProducerId()), msg.GetSeqNo()); } Send(Worker, new TEvWorker::TEvData(result.PartitionId, ToString(result.PartitionId), std::move(records))); diff --git a/ydb/core/tx/replication/service/transfer_writer.cpp b/ydb/core/tx/replication/service/transfer_writer.cpp index 3b7359f06434..c6c97c42a494 100644 --- a/ydb/core/tx/replication/service/transfer_writer.cpp +++ b/ydb/core/tx/replication/service/transfer_writer.cpp @@ -602,9 +602,13 @@ class TTransferWriter TableState->EnshureDataBatch(); for (auto& message : records) { - NYdb::NTopic::NPurecalc::TMessage input(message.Data); - input.WithPartition(partitionId); - input.WithOffset(message.Offset); + NYdb::NTopic::NPurecalc::TMessage input; + input.Data = std::move(message.Data); + input.MessageGroupId = std::move(message.MessageGroupId); + input.Partition = partitionId; + input.ProducerId = std::move(message.ProducerId); + input.Offset = message.Offset; + input.SeqNo = message.SeqNo; try { auto result = ProgramHolder->GetProgram()->Apply(NYql::NPureCalc::StreamFromVector(TVector{input})); diff --git a/ydb/core/tx/replication/service/worker.cpp b/ydb/core/tx/replication/service/worker.cpp index e7833c408abd..a156b748f615 100644 --- a/ydb/core/tx/replication/service/worker.cpp +++ b/ydb/core/tx/replication/service/worker.cpp @@ -13,17 +13,23 @@ namespace NKikimr::NReplication::NService { -TEvWorker::TEvData::TRecord::TRecord(ui64 offset, const TString& data, TInstant createTime) +TEvWorker::TEvData::TRecord::TRecord(ui64 offset, const TString& data, TInstant createTime, const TString& messageGroupId, const TString& producerId, ui64 seqNo) : Offset(offset) , Data(data) , CreateTime(createTime) + , MessageGroupId(messageGroupId) + , ProducerId(producerId) + , SeqNo(seqNo) { } -TEvWorker::TEvData::TRecord::TRecord(ui64 offset, TString&& data, TInstant createTime) +TEvWorker::TEvData::TRecord::TRecord(ui64 offset, TString&& data, TInstant createTime, TString&& messageGroupId, TString&& producerId, ui64 seqNo) : Offset(offset) , Data(std::move(data)) , CreateTime(createTime) + , MessageGroupId(std::move(messageGroupId)) + , ProducerId(std::move(producerId)) + , SeqNo(seqNo) { } diff --git a/ydb/core/tx/replication/service/worker.h b/ydb/core/tx/replication/service/worker.h index a3f71b7eb5e5..e03f3b24dc19 100644 --- a/ydb/core/tx/replication/service/worker.h +++ b/ydb/core/tx/replication/service/worker.h @@ -34,9 +34,12 @@ struct TEvWorker { ui64 Offset; TString Data; TInstant CreateTime; + TString MessageGroupId; + TString ProducerId; + ui64 SeqNo; - explicit TRecord(ui64 offset, const TString& data, TInstant createTime = TInstant::Zero()); - explicit TRecord(ui64 offset, TString&& data, TInstant createTime = TInstant::Zero()); + explicit TRecord(ui64 offset, const TString& data, TInstant createTime, const TString& messageGroupId, const TString& producerId, ui64 seqNo); + explicit TRecord(ui64 offset, TString&& data, TInstant createTime, TString&& messageGroupId, TString&& producerId, ui64 seqNo); void Out(IOutputStream& out) const; }; diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h index 6a5801003c24..ec0d8bca62a2 100644 --- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h +++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h @@ -167,6 +167,9 @@ struct TEvYdbProxy { , Data(msg.GetData()) , CreateTime(msg.GetCreateTime()) , Codec(codec) + , MessageGroupId(msg.GetMessageGroupId()) + , ProducerId(msg.GetProducerId()) + , SeqNo(msg.GetSeqNo()) { } @@ -186,6 +189,9 @@ struct TEvYdbProxy { TString& GetData() { return Data; } TInstant GetCreateTime() const { return CreateTime; } ECodec GetCodec() const { return Codec; } + TString& GetMessageGroupId() { return MessageGroupId; } + TString& GetProducerId() { return ProducerId; } + ui64 GetSeqNo() { return SeqNo; } void Out(IOutputStream& out) const; private: @@ -193,6 +199,9 @@ struct TEvYdbProxy { TString Data; TInstant CreateTime; ECodec Codec; + TString MessageGroupId; + TString ProducerId; + ui64 SeqNo; }; explicit TReadTopicResult(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& event) { diff --git a/ydb/tests/functional/transfer/main.cpp b/ydb/tests/functional/transfer/main.cpp index 928331501ae6..6a96711a1acd 100644 --- a/ydb/tests/functional/transfer/main.cpp +++ b/ydb/tests/functional/transfer/main.cpp @@ -76,10 +76,43 @@ std::pair> _C(TString&& name, T&& expected) { } struct TMessage { - const char* Message; + TString Message; std::optional Partition = std::nullopt; + std::optional ProducerId = std::nullopt; + std::optional MessageGroupId = std::nullopt; + std::optional SeqNo = std::nullopt; }; +TMessage _withSeqNo(ui64 seqNo) { + return { + .Message = TStringBuilder() << "Message-" << seqNo, + .Partition = 0, + .ProducerId = std::nullopt, + .MessageGroupId = std::nullopt, + .SeqNo = seqNo + }; +} + +TMessage _withProducerId(const TString& producerId) { + return { + .Message = TStringBuilder() << "Message-" << producerId, + .Partition = 0, + .ProducerId = producerId, + .MessageGroupId = std::nullopt, + .SeqNo = std::nullopt + }; +} + +TMessage _withMessageGroupId(const TString& messageGroupId) { + return { + .Message = TStringBuilder() << "Message-" << messageGroupId, + .Partition = 0, + .ProducerId = messageGroupId, + .MessageGroupId = messageGroupId, + .SeqNo = std::nullopt + }; +} + struct TConfig { const char* TableDDL; const char* Lambda; @@ -142,13 +175,19 @@ struct MainTestCase { for (const auto& m : Config.Messages) { TWriteSessionSettings writeSettings; writeSettings.Path(TopicName); - writeSettings.DeduplicationEnabled(false); + writeSettings.DeduplicationEnabled(m.SeqNo); if (m.Partition) { writeSettings.PartitionId(m.Partition); } + if (m.ProducerId) { + writeSettings.ProducerId(*m.ProducerId); + } + if (m.MessageGroupId) { + writeSettings.MessageGroupId(*m.MessageGroupId); + } auto writeSession = topicClient.CreateSimpleBlockingWriteSession(writeSettings); - UNIT_ASSERT(writeSession->Write(m.Message)); + UNIT_ASSERT(writeSession->Write(m.Message, m.SeqNo)); writeSession->Close(TDuration::Seconds(1)); } } @@ -530,5 +569,101 @@ Y_UNIT_TEST_SUITE(Transfer) }).Run(); } + Y_UNIT_TEST(Main_MessageField_SeqNo) + { + MainTestCase({ + .TableDDL = R"( + CREATE TABLE `%s` ( + SeqNo Uint64 NOT NULL, + Message Utf8, + PRIMARY KEY (SeqNo) + ) WITH ( + STORE = COLUMN + ); + )", + + .Lambda = R"( + $l = ($x) -> { + return [ + <| + SeqNo:CAST($x._seq_no AS Uint32), + Message:CAST($x._data AS Utf8) + |> + ]; + }; + )", + + .Messages = {_withSeqNo(13)}, + + .Expectations = { + _C("SeqNo", ui64(13)), + } + }).Run(); + } + + Y_UNIT_TEST(Main_MessageField_ProducerId) + { + MainTestCase({ + .TableDDL = R"( + CREATE TABLE `%s` ( + Offset Uint64 NOT NULL, + ProducerId Utf8, + PRIMARY KEY (Offset) + ) WITH ( + STORE = COLUMN + ); + )", + + .Lambda = R"( + $l = ($x) -> { + return [ + <| + Offset:CAST($x._offset AS Uint64), + ProducerId:CAST($x._producer_id AS Utf8) + |> + ]; + }; + )", + + .Messages = {_withProducerId("Producer-13")}, + + .Expectations = { + _C("ProducerId", TString("Producer-13")), + } + }).Run(); + } + + Y_UNIT_TEST(Main_MessageField_MessageGroupId) + { + MainTestCase({ + .TableDDL = R"( + CREATE TABLE `%s` ( + Offset Uint64 NOT NULL, + MessageGroupId Utf8, + PRIMARY KEY (Offset) + ) WITH ( + STORE = COLUMN + ); + )", + + .Lambda = R"( + $l = ($x) -> { + return [ + <| + Offset:CAST($x._offset AS Uint64), + MessageGroupId:CAST($x._message_group_id AS Utf8) + |> + ]; + }; + )", + + .Messages = {_withMessageGroupId("MessageGroupId-13")}, + + .Expectations = { + _C("MessageGroupId", TString("MessageGroupId-13")), + } + }).Run(); + } + } From afbaa9752e14f2ed50a6dfb3340f8c10fa462937 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Mon, 24 Feb 2025 06:58:15 +0000 Subject: [PATCH 3/5] fix test --- .../replication/service/table_writer_ut.cpp | 116 +++++++++--------- .../service/transfer_writer_ut.cpp | 10 +- 2 files changed, 67 insertions(+), 59 deletions(-) diff --git a/ydb/core/tx/replication/service/table_writer_ut.cpp b/ydb/core/tx/replication/service/table_writer_ut.cpp index fb69a57e7d5c..60e8340488f2 100644 --- a/ydb/core/tx/replication/service/table_writer_ut.cpp +++ b/ydb/core/tx/replication/service/table_writer_ut.cpp @@ -18,6 +18,10 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { using namespace NTestHelpers; using TRecord = TEvWorker::TEvData::TRecord; + TRecord Record(ui64 offset, const TString& data) { + return TRecord(offset, data, TInstant::Zero(), "MessageGroupId", "ProducerId", 13 /* seqNo */); + } + Y_UNIT_TEST(WriteTable) { TEnv env; env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_SERVICE, NLog::PRI_DEBUG); @@ -36,9 +40,9 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { env.Send(writer, new TEvWorker::TEvHandshake()); env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { - TRecord(1, R"({"key":[1], "update":{"value":"10"}})"), - TRecord(2, R"({"key":[2], "update":{"value":"20"}})"), - TRecord(3, R"({"key":[3], "update":{"value":"30"}})"), + Record(1, R"({"key":[1], "update":{"value":"10"}})"), + Record(2, R"({"key":[2], "update":{"value":"20"}})"), + Record(3, R"({"key":[3], "update":{"value":"30"}})"), })); } @@ -93,37 +97,37 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { env.Send(writer, new TEvWorker::TEvHandshake()); env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { - TRecord(1, R"({"key":[1], "update":{"int32_value":-100500}})"), - TRecord(2, R"({"key":[2], "update":{"uint32_value":100500}})"), - TRecord(3, R"({"key":[3], "update":{"int64_value":-200500}})"), - TRecord(4, R"({"key":[4], "update":{"uint64_value":200500}})"), - TRecord(5, R"({"key":[5], "update":{"uint8_value":255}})"), - TRecord(6, R"({"key":[6], "update":{"bool_value":true}})"), - TRecord(7, R"({"key":[7], "update":{"double_value":1.1234}})"), - TRecord(8, R"({"key":[8], "update":{"float_value":-1.123}})"), - TRecord(9, R"({"key":[9], "update":{"date_value":"2020-08-12T00:00:00.000000Z"}})"), - TRecord(10, R"({"key":[10], "update":{"datetime_value":"2020-08-12T12:34:56.000000Z"}})"), - TRecord(11, R"({"key":[11], "update":{"timestamp_value":"2020-08-12T12:34:56.123456Z"}})"), - TRecord(12, R"({"key":[12], "update":{"interval_value":-300500}})"), - TRecord(13, R"({"key":[13], "update":{"decimal_value":"3.321"}})"), - TRecord(14, R"({"key":[14], "update":{"dynumber_value":".3321e1"}})"), - TRecord(15, Sprintf(R"({"key":[15], "update":{"string_value":"%s"}})", Base64Encode("lorem ipsum").c_str())), - TRecord(16, R"({"key":[16], "update":{"utf8_value":"lorem ipsum"}})"), - TRecord(17, R"({"key":[17], "update":{"json_value":{"key": "value"}}})"), - TRecord(18, R"({"key":[18], "update":{"jsondoc_value":{"key": "value"}}})"), - TRecord(19, R"({"key":[19], "update":{"uuid_value":"65df1ec1-a97d-47b2-ae56-3c023da6ee8c"}})"), - TRecord(20, R"({"key":[20], "update":{"date32_value":18486}})"), - TRecord(21, R"({"key":[21], "update":{"datetime64_value":1597235696}})"), - TRecord(22, R"({"key":[22], "update":{"timestamp64_value":1597235696123456}})"), - TRecord(23, R"({"key":[23], "update":{"interval64_value":-300500}})"), - TRecord(24, R"({"key":[24], "update":{"pgint2_value":"-42"}})"), - TRecord(25, R"({"key":[25], "update":{"pgint4_value":"-420"}})"), - TRecord(26, R"({"key":[26], "update":{"pgint8_value":"-4200"}})"), - TRecord(27, R"({"key":[27], "update":{"pgfloat4_value":"3.1415"}})"), - TRecord(28, R"({"key":[28], "update":{"pgfloat8_value":"2.718"}})"), - TRecord(29, R"({"key":[29], "update":{"pgbytea_value":"\\x6c6f72656d2022697073756d22"}})"), - TRecord(30, R"({"key":[30], "update":{"pgtext_value":"lorem \"ipsum\""}})"), - TRecord(31, R"({"key":[31], "update":{"decimal35_value":"355555555555555.321"}})"), + Record(1, R"({"key":[1], "update":{"int32_value":-100500}})"), + Record(2, R"({"key":[2], "update":{"uint32_value":100500}})"), + Record(3, R"({"key":[3], "update":{"int64_value":-200500}})"), + Record(4, R"({"key":[4], "update":{"uint64_value":200500}})"), + Record(5, R"({"key":[5], "update":{"uint8_value":255}})"), + Record(6, R"({"key":[6], "update":{"bool_value":true}})"), + Record(7, R"({"key":[7], "update":{"double_value":1.1234}})"), + Record(8, R"({"key":[8], "update":{"float_value":-1.123}})"), + Record(9, R"({"key":[9], "update":{"date_value":"2020-08-12T00:00:00.000000Z"}})"), + Record(10, R"({"key":[10], "update":{"datetime_value":"2020-08-12T12:34:56.000000Z"}})"), + Record(11, R"({"key":[11], "update":{"timestamp_value":"2020-08-12T12:34:56.123456Z"}})"), + Record(12, R"({"key":[12], "update":{"interval_value":-300500}})"), + Record(13, R"({"key":[13], "update":{"decimal_value":"3.321"}})"), + Record(14, R"({"key":[14], "update":{"dynumber_value":".3321e1"}})"), + Record(15, Sprintf(R"({"key":[15], "update":{"string_value":"%s"}})", Base64Encode("lorem ipsum").c_str())), + Record(16, R"({"key":[16], "update":{"utf8_value":"lorem ipsum"}})"), + Record(17, R"({"key":[17], "update":{"json_value":{"key": "value"}}})"), + Record(18, R"({"key":[18], "update":{"jsondoc_value":{"key": "value"}}})"), + Record(19, R"({"key":[19], "update":{"uuid_value":"65df1ec1-a97d-47b2-ae56-3c023da6ee8c"}})"), + Record(20, R"({"key":[20], "update":{"date32_value":18486}})"), + Record(21, R"({"key":[21], "update":{"datetime64_value":1597235696}})"), + Record(22, R"({"key":[22], "update":{"timestamp64_value":1597235696123456}})"), + Record(23, R"({"key":[23], "update":{"interval64_value":-300500}})"), + Record(24, R"({"key":[24], "update":{"pgint2_value":"-42"}})"), + Record(25, R"({"key":[25], "update":{"pgint4_value":"-420"}})"), + Record(26, R"({"key":[26], "update":{"pgint8_value":"-4200"}})"), + Record(27, R"({"key":[27], "update":{"pgfloat4_value":"3.1415"}})"), + Record(28, R"({"key":[28], "update":{"pgfloat8_value":"2.718"}})"), + Record(29, R"({"key":[29], "update":{"pgbytea_value":"\\x6c6f72656d2022697073756d22"}})"), + Record(30, R"({"key":[30], "update":{"pgtext_value":"lorem \"ipsum\""}})"), + Record(31, R"({"key":[31], "update":{"decimal35_value":"355555555555555.321"}})"), })); } @@ -144,9 +148,9 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { env.Send(writer, new TEvWorker::TEvHandshake()); env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { - TRecord(1, R"({"key":["1.0"], "update":{"value":"155555555555555.321"}})"), - TRecord(2, R"({"key":["2.0"], "update":{"value":"255555555555555.321"}})"), - TRecord(3, R"({"key":["3.0"], "update":{"value":"355555555555555.321"}})"), + Record(1, R"({"key":["1.0"], "update":{"value":"155555555555555.321"}})"), + Record(2, R"({"key":["2.0"], "update":{"value":"255555555555555.321"}})"), + Record(3, R"({"key":["3.0"], "update":{"value":"355555555555555.321"}})"), })); } @@ -185,9 +189,9 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { { auto ev = env.Send(writer, new TEvWorker::TEvData(0,"TestSource", { - TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"), - TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[2,0]})"), - TRecord(order++, R"({"key":[3], "update":{"value":"30"}, "ts":[3,0]})"), + Record(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"), + Record(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[2,0]})"), + Record(order++, R"({"key":[3], "update":{"value":"30"}, "ts":[3,0]})"), })); const auto& versions = ev->Get()->Record.GetVersions(); @@ -204,17 +208,17 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { } { auto ev = env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { - TRecord(order++, R"({"resolved":[10,0]})"), + Record(order++, R"({"resolved":[10,0]})"), })); UNIT_ASSERT_VALUES_EQUAL(TRowVersion::FromProto(ev->Get()->Record.GetVersion()), TRowVersion(10, 0)); env.GetRuntime().GrabEdgeEvent(env.GetSender()); } env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { - TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[11,0]})"), - TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[12,0]})"), - TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[21,0]})"), - TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[22,0]})"), + Record(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[11,0]})"), + Record(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[12,0]})"), + Record(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[21,0]})"), + Record(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[22,0]})"), })); env.Send(writer, MakeTxIdResult({ @@ -223,12 +227,12 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { })); env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { - TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[13,0]})"), - TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[23,0]})"), + Record(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[13,0]})"), + Record(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[23,0]})"), })); env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { - TRecord(order++, R"({"resolved":[30,0]})"), + Record(order++, R"({"resolved":[30,0]})"), })); env.GetRuntime().GrabEdgeEvent(env.GetSender()); } @@ -295,8 +299,8 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { env.Send(worker, new TEvWorker::TEvHandshake()); env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { - TRecord(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"), - TRecord(2, R"({"key":[2], "update":{"value":"20"}, "ts":[11,0]})"), + Record(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"), + Record(2, R"({"key":[2], "update":{"value":"20"}, "ts":[11,0]})"), })); env.SendAsync(writer, MakeTxIdResult({ {TRowVersion(10, 0), 1}, @@ -378,8 +382,8 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { env.Send(worker, new TEvWorker::TEvHandshake()); env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { - TRecord(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"), - TRecord(2, R"({"resolved":[10,0]})"), + Record(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"), + Record(2, R"({"resolved":[10,0]})"), })); env.Send(writer, MakeTxIdResult({ {TRowVersion(10, 0), 1}, @@ -408,16 +412,16 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { env.Send(writer, new TEvWorker::TEvHandshake()); env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { - TRecord(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"), + Record(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"), })); env.Send(writer, MakeTxIdResult({ {TRowVersion(10, 0), 1}, })); env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { - TRecord(2, R"({"key":[3], "update":{"value":"30"}, "ts":[11,0]})"), - TRecord(3, R"({"key":[2], "update":{"value":"20"}, "ts":[2,0]})"), - TRecord(4, R"({"resolved":[20,0]})"), + Record(2, R"({"key":[3], "update":{"value":"30"}, "ts":[11,0]})"), + Record(3, R"({"key":[2], "update":{"value":"20"}, "ts":[2,0]})"), + Record(4, R"({"resolved":[20,0]})"), })); env.Send(writer, MakeTxIdResult({ {TRowVersion(20, 0), 2}, diff --git a/ydb/core/tx/replication/service/transfer_writer_ut.cpp b/ydb/core/tx/replication/service/transfer_writer_ut.cpp index edb12e26dd44..b83859e3bfc9 100644 --- a/ydb/core/tx/replication/service/transfer_writer_ut.cpp +++ b/ydb/core/tx/replication/service/transfer_writer_ut.cpp @@ -19,6 +19,10 @@ Y_UNIT_TEST_SUITE(TransferWriter) { using namespace NTestHelpers; using TRecord = TEvWorker::TEvData::TRecord; + TRecord Record(ui64 offset, const TString& data) { + return TRecord(offset, data, TInstant::Zero(), "MessageGroupId", "ProducerId", 13 /* seqNo */); + } + Y_UNIT_TEST(Write_ColumnTable) { TEnv env; env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_SERVICE, NLog::PRI_DEBUG); @@ -52,9 +56,9 @@ Y_UNIT_TEST_SUITE(TransferWriter) { env.Send(writer, new TEvWorker::TEvHandshake()); env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { - TRecord(1, R"({"key":[1], "update":{"value":"10"}})"), - TRecord(2, R"({"key":[2], "update":{"value":"20"}})"), - TRecord(3, R"({"key":[3], "update":{"value":"30"}})"), + Record(1, R"({"key":[1], "update":{"value":"10"}})"), + Record(2, R"({"key":[2], "update":{"value":"20"}})"), + Record(3, R"({"key":[3], "update":{"value":"30"}})"), })); } From aaace33671a44480a9edff5ac83b129f43e88df9 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Mon, 24 Feb 2025 11:24:57 +0000 Subject: [PATCH 4/5] fix --- .../replication/service/table_writer_ut.cpp | 117 +++++++++--------- .../service/transfer_writer_ut.cpp | 11 +- 2 files changed, 63 insertions(+), 65 deletions(-) diff --git a/ydb/core/tx/replication/service/table_writer_ut.cpp b/ydb/core/tx/replication/service/table_writer_ut.cpp index 60e8340488f2..7c2e8711861a 100644 --- a/ydb/core/tx/replication/service/table_writer_ut.cpp +++ b/ydb/core/tx/replication/service/table_writer_ut.cpp @@ -16,10 +16,9 @@ namespace NKikimr::NReplication::NService { Y_UNIT_TEST_SUITE(LocalTableWriter) { using namespace NTestHelpers; - using TRecord = TEvWorker::TEvData::TRecord; - TRecord Record(ui64 offset, const TString& data) { - return TRecord(offset, data, TInstant::Zero(), "MessageGroupId", "ProducerId", 13 /* seqNo */); + TEvWorker::TEvData::TRecord TRecord(ui64 offset, const TString& data) { + return TEvWorker::TEvData::TRecord(offset, data, TInstant::Zero(), "MessageGroupId", "ProducerId", 13 /* seqNo */); } Y_UNIT_TEST(WriteTable) { @@ -40,9 +39,9 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { env.Send(writer, new TEvWorker::TEvHandshake()); env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { - Record(1, R"({"key":[1], "update":{"value":"10"}})"), - Record(2, R"({"key":[2], "update":{"value":"20"}})"), - Record(3, R"({"key":[3], "update":{"value":"30"}})"), + TRecord(1, R"({"key":[1], "update":{"value":"10"}})"), + TRecord(2, R"({"key":[2], "update":{"value":"20"}})"), + TRecord(3, R"({"key":[3], "update":{"value":"30"}})"), })); } @@ -97,37 +96,37 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { env.Send(writer, new TEvWorker::TEvHandshake()); env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { - Record(1, R"({"key":[1], "update":{"int32_value":-100500}})"), - Record(2, R"({"key":[2], "update":{"uint32_value":100500}})"), - Record(3, R"({"key":[3], "update":{"int64_value":-200500}})"), - Record(4, R"({"key":[4], "update":{"uint64_value":200500}})"), - Record(5, R"({"key":[5], "update":{"uint8_value":255}})"), - Record(6, R"({"key":[6], "update":{"bool_value":true}})"), - Record(7, R"({"key":[7], "update":{"double_value":1.1234}})"), - Record(8, R"({"key":[8], "update":{"float_value":-1.123}})"), - Record(9, R"({"key":[9], "update":{"date_value":"2020-08-12T00:00:00.000000Z"}})"), - Record(10, R"({"key":[10], "update":{"datetime_value":"2020-08-12T12:34:56.000000Z"}})"), - Record(11, R"({"key":[11], "update":{"timestamp_value":"2020-08-12T12:34:56.123456Z"}})"), - Record(12, R"({"key":[12], "update":{"interval_value":-300500}})"), - Record(13, R"({"key":[13], "update":{"decimal_value":"3.321"}})"), - Record(14, R"({"key":[14], "update":{"dynumber_value":".3321e1"}})"), - Record(15, Sprintf(R"({"key":[15], "update":{"string_value":"%s"}})", Base64Encode("lorem ipsum").c_str())), - Record(16, R"({"key":[16], "update":{"utf8_value":"lorem ipsum"}})"), - Record(17, R"({"key":[17], "update":{"json_value":{"key": "value"}}})"), - Record(18, R"({"key":[18], "update":{"jsondoc_value":{"key": "value"}}})"), - Record(19, R"({"key":[19], "update":{"uuid_value":"65df1ec1-a97d-47b2-ae56-3c023da6ee8c"}})"), - Record(20, R"({"key":[20], "update":{"date32_value":18486}})"), - Record(21, R"({"key":[21], "update":{"datetime64_value":1597235696}})"), - Record(22, R"({"key":[22], "update":{"timestamp64_value":1597235696123456}})"), - Record(23, R"({"key":[23], "update":{"interval64_value":-300500}})"), - Record(24, R"({"key":[24], "update":{"pgint2_value":"-42"}})"), - Record(25, R"({"key":[25], "update":{"pgint4_value":"-420"}})"), - Record(26, R"({"key":[26], "update":{"pgint8_value":"-4200"}})"), - Record(27, R"({"key":[27], "update":{"pgfloat4_value":"3.1415"}})"), - Record(28, R"({"key":[28], "update":{"pgfloat8_value":"2.718"}})"), - Record(29, R"({"key":[29], "update":{"pgbytea_value":"\\x6c6f72656d2022697073756d22"}})"), - Record(30, R"({"key":[30], "update":{"pgtext_value":"lorem \"ipsum\""}})"), - Record(31, R"({"key":[31], "update":{"decimal35_value":"355555555555555.321"}})"), + TRecord(1, R"({"key":[1], "update":{"int32_value":-100500}})"), + TRecord(2, R"({"key":[2], "update":{"uint32_value":100500}})"), + TRecord(3, R"({"key":[3], "update":{"int64_value":-200500}})"), + TRecord(4, R"({"key":[4], "update":{"uint64_value":200500}})"), + TRecord(5, R"({"key":[5], "update":{"uint8_value":255}})"), + TRecord(6, R"({"key":[6], "update":{"bool_value":true}})"), + TRecord(7, R"({"key":[7], "update":{"double_value":1.1234}})"), + TRecord(8, R"({"key":[8], "update":{"float_value":-1.123}})"), + TRecord(9, R"({"key":[9], "update":{"date_value":"2020-08-12T00:00:00.000000Z"}})"), + TRecord(10, R"({"key":[10], "update":{"datetime_value":"2020-08-12T12:34:56.000000Z"}})"), + TRecord(11, R"({"key":[11], "update":{"timestamp_value":"2020-08-12T12:34:56.123456Z"}})"), + TRecord(12, R"({"key":[12], "update":{"interval_value":-300500}})"), + TRecord(13, R"({"key":[13], "update":{"decimal_value":"3.321"}})"), + TRecord(14, R"({"key":[14], "update":{"dynumber_value":".3321e1"}})"), + TRecord(15, Sprintf(R"({"key":[15], "update":{"string_value":"%s"}})", Base64Encode("lorem ipsum").c_str())), + TRecord(16, R"({"key":[16], "update":{"utf8_value":"lorem ipsum"}})"), + TRecord(17, R"({"key":[17], "update":{"json_value":{"key": "value"}}})"), + TRecord(18, R"({"key":[18], "update":{"jsondoc_value":{"key": "value"}}})"), + TRecord(19, R"({"key":[19], "update":{"uuid_value":"65df1ec1-a97d-47b2-ae56-3c023da6ee8c"}})"), + TRecord(20, R"({"key":[20], "update":{"date32_value":18486}})"), + TRecord(21, R"({"key":[21], "update":{"datetime64_value":1597235696}})"), + TRecord(22, R"({"key":[22], "update":{"timestamp64_value":1597235696123456}})"), + TRecord(23, R"({"key":[23], "update":{"interval64_value":-300500}})"), + TRecord(24, R"({"key":[24], "update":{"pgint2_value":"-42"}})"), + TRecord(25, R"({"key":[25], "update":{"pgint4_value":"-420"}})"), + TRecord(26, R"({"key":[26], "update":{"pgint8_value":"-4200"}})"), + TRecord(27, R"({"key":[27], "update":{"pgfloat4_value":"3.1415"}})"), + TRecord(28, R"({"key":[28], "update":{"pgfloat8_value":"2.718"}})"), + TRecord(29, R"({"key":[29], "update":{"pgbytea_value":"\\x6c6f72656d2022697073756d22"}})"), + TRecord(30, R"({"key":[30], "update":{"pgtext_value":"lorem \"ipsum\""}})"), + TRecord(31, R"({"key":[31], "update":{"decimal35_value":"355555555555555.321"}})"), })); } @@ -148,9 +147,9 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { env.Send(writer, new TEvWorker::TEvHandshake()); env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { - Record(1, R"({"key":["1.0"], "update":{"value":"155555555555555.321"}})"), - Record(2, R"({"key":["2.0"], "update":{"value":"255555555555555.321"}})"), - Record(3, R"({"key":["3.0"], "update":{"value":"355555555555555.321"}})"), + TRecord(1, R"({"key":["1.0"], "update":{"value":"155555555555555.321"}})"), + TRecord(2, R"({"key":["2.0"], "update":{"value":"255555555555555.321"}})"), + TRecord(3, R"({"key":["3.0"], "update":{"value":"355555555555555.321"}})"), })); } @@ -189,9 +188,9 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { { auto ev = env.Send(writer, new TEvWorker::TEvData(0,"TestSource", { - Record(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"), - Record(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[2,0]})"), - Record(order++, R"({"key":[3], "update":{"value":"30"}, "ts":[3,0]})"), + TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"), + TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[2,0]})"), + TRecord(order++, R"({"key":[3], "update":{"value":"30"}, "ts":[3,0]})"), })); const auto& versions = ev->Get()->Record.GetVersions(); @@ -208,17 +207,17 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { } { auto ev = env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { - Record(order++, R"({"resolved":[10,0]})"), + TRecord(order++, R"({"resolved":[10,0]})"), })); UNIT_ASSERT_VALUES_EQUAL(TRowVersion::FromProto(ev->Get()->Record.GetVersion()), TRowVersion(10, 0)); env.GetRuntime().GrabEdgeEvent(env.GetSender()); } env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { - Record(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[11,0]})"), - Record(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[12,0]})"), - Record(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[21,0]})"), - Record(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[22,0]})"), + TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[11,0]})"), + TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[12,0]})"), + TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[21,0]})"), + TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[22,0]})"), })); env.Send(writer, MakeTxIdResult({ @@ -227,12 +226,12 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { })); env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { - Record(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[13,0]})"), - Record(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[23,0]})"), + TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[13,0]})"), + TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[23,0]})"), })); env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { - Record(order++, R"({"resolved":[30,0]})"), + TRecord(order++, R"({"resolved":[30,0]})"), })); env.GetRuntime().GrabEdgeEvent(env.GetSender()); } @@ -299,8 +298,8 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { env.Send(worker, new TEvWorker::TEvHandshake()); env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { - Record(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"), - Record(2, R"({"key":[2], "update":{"value":"20"}, "ts":[11,0]})"), + TRecord(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"), + TRecord(2, R"({"key":[2], "update":{"value":"20"}, "ts":[11,0]})"), })); env.SendAsync(writer, MakeTxIdResult({ {TRowVersion(10, 0), 1}, @@ -382,8 +381,8 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { env.Send(worker, new TEvWorker::TEvHandshake()); env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { - Record(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"), - Record(2, R"({"resolved":[10,0]})"), + TRecord(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"), + TRecord(2, R"({"resolved":[10,0]})"), })); env.Send(writer, MakeTxIdResult({ {TRowVersion(10, 0), 1}, @@ -412,16 +411,16 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) { env.Send(writer, new TEvWorker::TEvHandshake()); env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { - Record(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"), + TRecord(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"), })); env.Send(writer, MakeTxIdResult({ {TRowVersion(10, 0), 1}, })); env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { - Record(2, R"({"key":[3], "update":{"value":"30"}, "ts":[11,0]})"), - Record(3, R"({"key":[2], "update":{"value":"20"}, "ts":[2,0]})"), - Record(4, R"({"resolved":[20,0]})"), + TRecord(2, R"({"key":[3], "update":{"value":"30"}, "ts":[11,0]})"), + TRecord(3, R"({"key":[2], "update":{"value":"20"}, "ts":[2,0]})"), + TRecord(4, R"({"resolved":[20,0]})"), })); env.Send(writer, MakeTxIdResult({ {TRowVersion(20, 0), 2}, diff --git a/ydb/core/tx/replication/service/transfer_writer_ut.cpp b/ydb/core/tx/replication/service/transfer_writer_ut.cpp index b83859e3bfc9..df1d01d90c70 100644 --- a/ydb/core/tx/replication/service/transfer_writer_ut.cpp +++ b/ydb/core/tx/replication/service/transfer_writer_ut.cpp @@ -17,10 +17,9 @@ namespace NKikimr::NReplication::NService { Y_UNIT_TEST_SUITE(TransferWriter) { using namespace NTestHelpers; - using TRecord = TEvWorker::TEvData::TRecord; - TRecord Record(ui64 offset, const TString& data) { - return TRecord(offset, data, TInstant::Zero(), "MessageGroupId", "ProducerId", 13 /* seqNo */); + TEvWorker::TEvData::TRecord TRecord(ui64 offset, const TString& data) { + return TEvWorker::TEvData::TRecord(offset, data, TInstant::Zero(), "MessageGroupId", "ProducerId", 13 /* seqNo */); } Y_UNIT_TEST(Write_ColumnTable) { @@ -56,9 +55,9 @@ Y_UNIT_TEST_SUITE(TransferWriter) { env.Send(writer, new TEvWorker::TEvHandshake()); env.Send(writer, new TEvWorker::TEvData(0, "TestSource", { - Record(1, R"({"key":[1], "update":{"value":"10"}})"), - Record(2, R"({"key":[2], "update":{"value":"20"}})"), - Record(3, R"({"key":[3], "update":{"value":"30"}})"), + TRecord(1, R"({"key":[1], "update":{"value":"10"}})"), + TRecord(2, R"({"key":[2], "update":{"value":"20"}})"), + TRecord(3, R"({"key":[3], "update":{"value":"30"}})"), })); } From 48a28376407ff07caf462912b423f7956efc7cca Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Mon, 24 Feb 2025 11:59:09 +0000 Subject: [PATCH 5/5] fix #2 --- ydb/core/tx/replication/service/common_ut.h | 13 +++++++++++++ ydb/core/tx/replication/service/table_writer_ut.cpp | 6 +----- .../tx/replication/service/transfer_writer_ut.cpp | 6 +----- 3 files changed, 15 insertions(+), 10 deletions(-) create mode 100644 ydb/core/tx/replication/service/common_ut.h diff --git a/ydb/core/tx/replication/service/common_ut.h b/ydb/core/tx/replication/service/common_ut.h new file mode 100644 index 000000000000..6d833760f62d --- /dev/null +++ b/ydb/core/tx/replication/service/common_ut.h @@ -0,0 +1,13 @@ +#pragma once + +#include "worker.h" + +namespace NKikimr::NReplication::NService { + +struct TRecord: public TEvWorker::TEvData::TRecord { + explicit TRecord(ui64 offset, const TString& data) + : TEvWorker::TEvData::TRecord(offset, data, TInstant::Zero(), "MessageGroupId", "ProducerId", 42) + {} +}; + +} diff --git a/ydb/core/tx/replication/service/table_writer_ut.cpp b/ydb/core/tx/replication/service/table_writer_ut.cpp index 7c2e8711861a..362972f73852 100644 --- a/ydb/core/tx/replication/service/table_writer_ut.cpp +++ b/ydb/core/tx/replication/service/table_writer_ut.cpp @@ -1,6 +1,6 @@ #include "service.h" #include "table_writer.h" -#include "worker.h" +#include "common_ut.h" #include #include @@ -17,10 +17,6 @@ namespace NKikimr::NReplication::NService { Y_UNIT_TEST_SUITE(LocalTableWriter) { using namespace NTestHelpers; - TEvWorker::TEvData::TRecord TRecord(ui64 offset, const TString& data) { - return TEvWorker::TEvData::TRecord(offset, data, TInstant::Zero(), "MessageGroupId", "ProducerId", 13 /* seqNo */); - } - Y_UNIT_TEST(WriteTable) { TEnv env; env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_SERVICE, NLog::PRI_DEBUG); diff --git a/ydb/core/tx/replication/service/transfer_writer_ut.cpp b/ydb/core/tx/replication/service/transfer_writer_ut.cpp index df1d01d90c70..008518195241 100644 --- a/ydb/core/tx/replication/service/transfer_writer_ut.cpp +++ b/ydb/core/tx/replication/service/transfer_writer_ut.cpp @@ -1,6 +1,6 @@ #include "service.h" #include "transfer_writer.h" -#include "worker.h" +#include "common_ut.h" #include #include @@ -18,10 +18,6 @@ namespace NKikimr::NReplication::NService { Y_UNIT_TEST_SUITE(TransferWriter) { using namespace NTestHelpers; - TEvWorker::TEvData::TRecord TRecord(ui64 offset, const TString& data) { - return TEvWorker::TEvData::TRecord(offset, data, TInstant::Zero(), "MessageGroupId", "ProducerId", 13 /* seqNo */); - } - Y_UNIT_TEST(Write_ColumnTable) { TEnv env; env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_SERVICE, NLog::PRI_DEBUG);