Skip to content

Commit 1e9aba9

Browse files
authored
(refactoring) Basic TTopicMessage (#15381)
1 parent 6fe3b58 commit 1e9aba9

21 files changed

+189
-152
lines changed

ydb/core/backup/impl/local_partition_reader.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
#include "local_partition_reader.h"
22
#include "logging.h"
33

4-
#include <ydb/library/actors/core/actor.h>
5-
#include <ydb/library/services/services.pb.h>
6-
74
#include <ydb/core/persqueue/events/global.h>
85
#include <ydb/core/protos/grpc_pq_old.pb.h>
96
#include <ydb/core/tx/replication/service/worker.h>
7+
#include <ydb/core/tx/replication/ydb_proxy/topic_message.h>
8+
#include <ydb/library/actors/core/actor.h>
9+
#include <ydb/library/services/services.pb.h>
1010

1111
using namespace NActors;
1212
using namespace NKikimr::NReplication::NService;
@@ -131,11 +131,11 @@ class TLocalPartitionReader
131131
}
132132

133133
auto gotOffset = Offset;
134-
TVector<TEvWorker::TEvData::TRecord> records(::Reserve(readResult.ResultSize()));
134+
TVector<NReplication::TTopicMessage> records(::Reserve(readResult.ResultSize()));
135135

136136
for (auto& result : readResult.GetResult()) {
137137
gotOffset = std::max(gotOffset, result.GetOffset());
138-
records.emplace_back(result.GetOffset(), GetDeserializedData(result.GetData()).GetData(), TInstant::MilliSeconds(result.GetCreateTimestampMS()), result.GetSourceId(), result.GetSourceId(), result.GetSeqNo());
138+
records.emplace_back(result.GetOffset(), GetDeserializedData(result.GetData()).GetData());
139139
}
140140
SentOffset = gotOffset + 1;
141141

ydb/core/backup/impl/local_partition_reader_ut.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@ Y_UNIT_TEST_SUITE(LocalPartitionReader) {
4646
auto data = runtime.GrabEdgeEventRethrow<TEvWorker::TEvData>(handle);
4747
UNIT_ASSERT_VALUES_EQUAL(data->Source, PARTITION_STR);
4848
UNIT_ASSERT_VALUES_EQUAL(data->Records.size(), 2);
49-
UNIT_ASSERT_VALUES_EQUAL(data->Records[0].Offset, INITIAL_OFFSET + dataPatternCookie * 2);
50-
UNIT_ASSERT_VALUES_EQUAL(data->Records[0].Data, Sprintf("1-%d", dataPatternCookie));
51-
UNIT_ASSERT_VALUES_EQUAL(data->Records[1].Offset, INITIAL_OFFSET + dataPatternCookie * 2 + 1);
52-
UNIT_ASSERT_VALUES_EQUAL(data->Records[1].Data, Sprintf("2-%d", dataPatternCookie));
49+
UNIT_ASSERT_VALUES_EQUAL(data->Records[0].GetOffset(), INITIAL_OFFSET + dataPatternCookie * 2);
50+
UNIT_ASSERT_VALUES_EQUAL(data->Records[0].GetData(), Sprintf("1-%d", dataPatternCookie));
51+
UNIT_ASSERT_VALUES_EQUAL(data->Records[1].GetOffset(), INITIAL_OFFSET + dataPatternCookie * 2 + 1);
52+
UNIT_ASSERT_VALUES_EQUAL(data->Records[1].GetData(), Sprintf("2-%d", dataPatternCookie));
5353
}
5454

5555
TEvPersQueue::TEvResponse* GenerateData(ui32 dataPatternCookie) {

ydb/core/tx/replication/service/base_table_writer.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <ydb/core/change_exchange/util.h>
99
#include <ydb/core/tablet_flat/flat_row_eggs.h>
1010
#include <ydb/core/tx/datashard/datashard.h>
11+
#include <ydb/core/tx/replication/ydb_proxy/topic_message.h>
1112
#include <ydb/core/tx/scheme_cache/helpers.h>
1213
#include <ydb/core/tx/tx_proxy/proxy.h>
1314
#include <ydb/library/actors/core/actor_bootstrapped.h>
@@ -434,8 +435,8 @@ class TLocalTableWriter
434435
TSet<TRowVersion> versionsWithoutTxId;
435436

436437
for (auto& r : ev->Get()->Records) {
437-
auto offset = r.Offset;
438-
auto& data = r.Data;
438+
auto offset = r.GetOffset();
439+
auto& data = r.GetData();
439440

440441
auto record = Parser->Parse(ev->Get()->Source, offset, std::move(data));
441442

ydb/core/tx/replication/service/common_ut.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
#pragma once
22

3-
#include "worker.h"
3+
#include <ydb/core/tx/replication/ydb_proxy/topic_message.h>
44

55
namespace NKikimr::NReplication::NService {
66

7-
struct TRecord: public TEvWorker::TEvData::TRecord {
7+
struct TRecord: public TTopicMessage {
88
explicit TRecord(ui64 offset, const TString& data)
9-
: TEvWorker::TEvData::TRecord(offset, data, TInstant::Zero(), "MessageGroupId", "ProducerId", 42)
9+
: TTopicMessage(offset, data)
1010
{}
1111
};
1212

ydb/core/tx/replication/service/s3_writer_ut.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include "common_ut.h"
12
#include "s3_writer.h"
23
#include "worker.h"
34

@@ -60,8 +61,7 @@ Y_UNIT_TEST_SUITE(S3Writer) {
6061
UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().at("/TEST/writer.AtufpxzetsqaVnEuozdXpD.json"),
6162
R"({"finished":false,"table_name":"/MyRoot/Table","writer_name":"AtufpxzetsqaVnEuozdXpD"})");
6263

63-
using TRecord = TEvWorker::TEvData::TRecord;
64-
env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData({
64+
env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData(0, "TestSource", {
6565
TRecord(1, R"({"key":[1], "update":{"value":"10"}})"),
6666
TRecord(2, R"({"key":[2], "update":{"value":"20"}})"),
6767
TRecord(3, R"({"key":[3], "update":{"value":"30"}})"),
@@ -75,7 +75,7 @@ Y_UNIT_TEST_SUITE(S3Writer) {
7575
R"({"key":[2], "update":{"value":"20"}})" "\n"
7676
R"({"key":[3], "update":{"value":"30"}})" "\n");
7777

78-
auto res = env.Send<TEvWorker::TEvGone>(writer, new TEvWorker::TEvData({}));
78+
auto res = env.Send<TEvWorker::TEvGone>(writer, new TEvWorker::TEvData(0, "TestSource", {}));
7979

8080
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, TEvWorker::TEvGone::DONE);
8181
UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().size(), 2);

ydb/core/tx/replication/service/table_writer_ut.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1+
#include "common_ut.h"
12
#include "service.h"
23
#include "table_writer.h"
3-
#include "common_ut.h"
4+
#include "worker.h"
45

56
#include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>
67
#include <ydb/core/tx/replication/ut_helpers/test_env.h>

ydb/core/tx/replication/service/topic_reader.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "topic_reader.h"
33
#include "worker.h"
44

5+
#include <ydb/core/tx/replication/ydb_proxy/topic_message.h>
56
#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>
67
#include <ydb/library/actors/core/actor.h>
78
#include <ydb/library/actors/core/hfunc.h>
@@ -54,11 +55,11 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
5455
LOG_D("Handle " << ev->Get()->ToString());
5556

5657
auto& result = ev->Get()->Result;
57-
TVector<TEvWorker::TEvData::TRecord> records(::Reserve(result.Messages.size()));
58+
TVector<TTopicMessage> records(::Reserve(result.Messages.size()));
5859

5960
for (auto& msg : result.Messages) {
6061
Y_ABORT_UNLESS(msg.GetCodec() == NYdb::NTopic::ECodec::RAW);
61-
records.emplace_back(msg.GetOffset(), std::move(msg.GetData()), msg.GetCreateTime(), std::move(msg.GetMessageGroupId()), std::move(msg.GetProducerId()), msg.GetSeqNo());
62+
records.push_back(std::move(msg));
6263
}
6364

6465
Send(Worker, new TEvWorker::TEvData(result.PartitionId, ToString(result.PartitionId), std::move(records)));

ydb/core/tx/replication/service/topic_reader_ut.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,8 @@ Y_UNIT_TEST_SUITE(RemoteTopicReader) {
8686
UNIT_ASSERT_VALUES_EQUAL(records.size(), 1);
8787

8888
const auto& record = records.at(0);
89-
UNIT_ASSERT_VALUES_EQUAL(record.Offset, 0);
90-
UNIT_ASSERT_VALUES_EQUAL(record.Data, "message-1");
89+
UNIT_ASSERT_VALUES_EQUAL(record.GetOffset(), 0);
90+
UNIT_ASSERT_VALUES_EQUAL(record.GetData(), "message-1");
9191
}
9292

9393
// trigger commit, write new data & kill reader
@@ -103,8 +103,8 @@ Y_UNIT_TEST_SUITE(RemoteTopicReader) {
103103
UNIT_ASSERT_VALUES_EQUAL(records.size(), 1);
104104

105105
const auto& record = records.at(0);
106-
UNIT_ASSERT_VALUES_EQUAL(record.Offset, 1);
107-
UNIT_ASSERT_VALUES_EQUAL(record.Data, "message-2");
106+
UNIT_ASSERT_VALUES_EQUAL(record.GetOffset(), 1);
107+
UNIT_ASSERT_VALUES_EQUAL(record.GetData(), "message-2");
108108
}
109109
}
110110
}

ydb/core/tx/replication/service/transfer_writer.cpp

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,17 @@
22
#include "transfer_writer.h"
33
#include "worker.h"
44

5-
#include <ydb/library/actors/core/actor_bootstrapped.h>
6-
#include <ydb/library/actors/core/hfunc.h>
7-
#include <ydb/library/services/services.pb.h>
8-
9-
#include <ydb/public/lib/scheme_types/scheme_type_id.h>
105
#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
116
#include <ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.h>
127
#include <ydb/core/kqp/runtime/kqp_write_table.h>
13-
#include <ydb/core/persqueue/purecalc/purecalc.h>
8+
#include <ydb/core/tx/replication/ydb_proxy/topic_message.h>
9+
#include <ydb/core/persqueue/purecalc/purecalc.h> // should be after topic_message
1410
#include <ydb/core/tx/scheme_cache/helpers.h>
1511
#include <ydb/core/tx/tx_proxy/upload_rows_common_impl.h>
12+
#include <ydb/library/actors/core/actor_bootstrapped.h>
13+
#include <ydb/library/actors/core/hfunc.h>
14+
#include <ydb/library/services/services.pb.h>
15+
#include <ydb/public/lib/scheme_types/scheme_type_id.h>
1616

1717
#include <yql/essentials/providers/common/schema/parser/yql_type_parser.h>
1818
#include <yql/essentials/public/purecalc/helpers/stream/stream_from_vector.h>
@@ -593,7 +593,7 @@ class TTransferWriter
593593
ProcessData(ev->Get()->PartitionId, ev->Get()->Records);
594594
}
595595

596-
void ProcessData(const ui32 partitionId, const TVector<TEvWorker::TEvData::TRecord>& records) {
596+
void ProcessData(const ui32 partitionId, const TVector<TTopicMessage>& records) {
597597
if (!records) {
598598
Send(Worker, new TEvWorker::TEvGone(TEvWorker::TEvGone::DONE));
599599
return;
@@ -603,20 +603,20 @@ class TTransferWriter
603603

604604
for (auto& message : records) {
605605
NYdb::NTopic::NPurecalc::TMessage input;
606-
input.Data = std::move(message.Data);
607-
input.MessageGroupId = std::move(message.MessageGroupId);
606+
input.Data = std::move(message.GetData());
607+
input.MessageGroupId = std::move(message.GetMessageGroupId());
608608
input.Partition = partitionId;
609-
input.ProducerId = std::move(message.ProducerId);
610-
input.Offset = message.Offset;
611-
input.SeqNo = message.SeqNo;
609+
input.ProducerId = std::move(message.GetProducerId());
610+
input.Offset = message.GetOffset();
611+
input.SeqNo = message.GetSeqNo();
612612

613613
try {
614614
auto result = ProgramHolder->GetProgram()->Apply(NYql::NPureCalc::StreamFromVector(TVector{input}));
615615
while (auto* m = result->Fetch()) {
616616
TableState->AddData(m->Data);
617617
}
618618
} catch (const yexception& e) {
619-
ProcessingError = TStringBuilder() << "Error transform message: '" << message.Data << "': " << e.what();
619+
ProcessingError = TStringBuilder() << "Error transform message: " << e.what();
620620
break;
621621
}
622622
}
@@ -730,7 +730,7 @@ class TTransferWriter
730730

731731
std::optional<TActorId> PendingWorker;
732732
ui32 PendingPartitionId = 0;
733-
std::optional<TVector<TEvWorker::TEvData::TRecord>> PendingRecords;
733+
std::optional<TVector<TTopicMessage>> PendingRecords;
734734

735735
ui32 Attempt = 0;
736736
TDuration Delay = TDuration::Minutes(1);

ydb/core/tx/replication/service/transfer_writer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
namespace NKikimr {
88
struct TPathId;
99
}
10+
1011
namespace NKikimr::NReplication::NService {
1112

1213
IActor* CreateTransferWriter(const TString& transformLambda, const TPathId& tablePathId,

ydb/core/tx/replication/service/transfer_writer_ut.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1+
#include "common_ut.h"
12
#include "service.h"
23
#include "transfer_writer.h"
3-
#include "common_ut.h"
4+
#include "worker.h"
45

56
#include <ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.h>
67
#include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>

ydb/core/tx/replication/service/ut_s3_writer/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ SIZE(MEDIUM)
66

77
PEERDIR(
88
ydb/core/tx/replication/ut_helpers
9+
ydb/core/tx/replication/ydb_proxy
910
library/cpp/string_utils/base64
1011
library/cpp/testing/unittest
1112
)

ydb/core/tx/replication/service/ut_table_writer/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ SIZE(MEDIUM)
77
PEERDIR(
88
ydb/core/tx/datashard/ut_common
99
ydb/core/tx/replication/ut_helpers
10+
ydb/core/tx/replication/ydb_proxy
1011
library/cpp/string_utils/base64
1112
library/cpp/testing/unittest
1213
)

ydb/core/tx/replication/service/ut_transfer_writer/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ SIZE(MEDIUM)
77
PEERDIR(
88
ydb/core/tx/datashard/ut_common
99
ydb/core/tx/replication/ut_helpers
10+
ydb/core/tx/replication/ydb_proxy
1011
library/cpp/string_utils/base64
1112
library/cpp/testing/unittest
1213
)

ydb/core/tx/replication/service/worker.cpp

Lines changed: 5 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include "worker.h"
44

55
#include <ydb/core/base/appdata.h>
6+
#include <ydb/core/tx/replication/ydb_proxy/topic_message.h>
67
#include <ydb/library/actors/core/actor_bootstrapped.h>
78
#include <ydb/library/actors/core/hfunc.h>
89
#include <ydb/library/services/services.pb.h>
@@ -13,48 +14,20 @@
1314

1415
namespace NKikimr::NReplication::NService {
1516

16-
TEvWorker::TEvData::TRecord::TRecord(ui64 offset, const TString& data, TInstant createTime, const TString& messageGroupId, const TString& producerId, ui64 seqNo)
17-
: Offset(offset)
18-
, Data(data)
19-
, CreateTime(createTime)
20-
, MessageGroupId(messageGroupId)
21-
, ProducerId(producerId)
22-
, SeqNo(seqNo)
23-
{
24-
}
25-
26-
TEvWorker::TEvData::TRecord::TRecord(ui64 offset, TString&& data, TInstant createTime, TString&& messageGroupId, TString&& producerId, ui64 seqNo)
27-
: Offset(offset)
28-
, Data(std::move(data))
29-
, CreateTime(createTime)
30-
, MessageGroupId(std::move(messageGroupId))
31-
, ProducerId(std::move(producerId))
32-
, SeqNo(seqNo)
33-
{
34-
}
35-
36-
TEvWorker::TEvData::TEvData(ui32 partitionId, const TString& source, const TVector<TRecord>& records)
17+
TEvWorker::TEvData::TEvData(ui32 partitionId, const TString& source, const TVector<TTopicMessage>& records)
3718
: PartitionId(partitionId)
3819
, Source(source)
3920
, Records(records)
4021
{
4122
}
4223

43-
TEvWorker::TEvData::TEvData(ui32 partitionId, const TString& source, TVector<TRecord>&& records)
24+
TEvWorker::TEvData::TEvData(ui32 partitionId, const TString& source, TVector<TTopicMessage>&& records)
4425
: PartitionId(partitionId)
4526
, Source(source)
4627
, Records(std::move(records))
4728
{
4829
}
4930

50-
void TEvWorker::TEvData::TRecord::Out(IOutputStream& out) const {
51-
out << "{"
52-
<< " Offset: " << Offset
53-
<< " Data: " << Data.size() << "b"
54-
<< " CreateTime: " << CreateTime.ToStringUpToSeconds()
55-
<< " }";
56-
}
57-
5831
TString TEvWorker::TEvData::ToString() const {
5932
return TStringBuilder() << ToStringHeader() << " {"
6033
<< " Source: " << Source
@@ -189,11 +162,11 @@ class TWorker: public TActorBootstrapped<TWorker> {
189162
if (InFlightData) {
190163
const auto& records = InFlightData->Records;
191164
auto it = MinElementBy(records, [](const auto& record) {
192-
return record.CreateTime;
165+
return record.GetCreateTime();
193166
});
194167

195168
if (it != records.end()) {
196-
Lag = TlsActivationContext->Now() - it->CreateTime;
169+
Lag = TlsActivationContext->Now() - it->GetCreateTime();
197170
}
198171
}
199172

ydb/core/tx/replication/service/worker.h

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,11 @@
88

99
#include <functional>
1010

11-
namespace NKikimr::NReplication::NService {
11+
namespace NKikimr::NReplication {
12+
13+
class TTopicMessage;
14+
15+
namespace NService {
1216

1317
struct TEvWorker {
1418
enum EEv {
@@ -30,25 +34,12 @@ struct TEvWorker {
3034
struct TEvPoll: public TEventLocal<TEvPoll, EvPoll> {};
3135

3236
struct TEvData: public TEventLocal<TEvData, EvData> {
33-
struct TRecord {
34-
ui64 Offset;
35-
TString Data;
36-
TInstant CreateTime;
37-
TString MessageGroupId;
38-
TString ProducerId;
39-
ui64 SeqNo;
40-
41-
explicit TRecord(ui64 offset, const TString& data, TInstant createTime, const TString& messageGroupId, const TString& producerId, ui64 seqNo);
42-
explicit TRecord(ui64 offset, TString&& data, TInstant createTime, TString&& messageGroupId, TString&& producerId, ui64 seqNo);
43-
void Out(IOutputStream& out) const;
44-
};
45-
4637
ui32 PartitionId;
4738
TString Source;
48-
TVector<TRecord> Records;
39+
TVector<TTopicMessage> Records;
4940

50-
explicit TEvData(ui32 partitionId, const TString& source, const TVector<TRecord>& records);
51-
explicit TEvData(ui32 partitionId, const TString& source, TVector<TRecord>&& records);
41+
explicit TEvData(ui32 partitionId, const TString& source, const TVector<TTopicMessage>& records);
42+
explicit TEvData(ui32 partitionId, const TString& source, TVector<TTopicMessage>&& records);
5243
TString ToString() const override;
5344
};
5445

@@ -89,8 +80,5 @@ IActor* CreateWorker(
8980
std::function<IActor*(void)>&& createReaderFn,
9081
std::function<IActor*(void)>&& createWriterFn);
9182

92-
}
93-
94-
Y_DECLARE_OUT_SPEC(inline, NKikimr::NReplication::NService::TEvWorker::TEvData::TRecord, o, x) {
95-
return x.Out(o);
96-
}
83+
} // NService
84+
} // NKikimr::NReplication

0 commit comments

Comments
 (0)