Skip to content

Commit 4387e71

Browse files
committed
SeqNo, MessageGroupId and ProducerId
1 parent ab84089 commit 4387e71

File tree

10 files changed

+202
-32
lines changed

10 files changed

+202
-32
lines changed

ydb/core/backup/impl/local_partition_reader.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ class TLocalPartitionReader
135135

136136
for (auto& result : readResult.GetResult()) {
137137
gotOffset = std::max(gotOffset, result.GetOffset());
138-
records.emplace_back(result.GetOffset(), GetDeserializedData(result.GetData()).GetData());
138+
records.emplace_back(result.GetOffset(), GetDeserializedData(result.GetData()).GetData(), TInstant::MilliSeconds(result.GetCreateTimestampMS()), result.GetSourceId(), result.GetSourceId(), result.GetSeqNo());
139139
}
140140
SentOffset = gotOffset + 1;
141141

ydb/core/persqueue/purecalc/purecalc.cpp

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,13 @@ using namespace NYql::NUdf;
1313
using namespace NKikimr::NMiniKQL;
1414

1515
constexpr const char* DataFieldName = "_data";
16+
constexpr const char* MessageGroupIdFieldName = "_message_group_id";
1617
constexpr const char* OffsetFieldName = "_offset";
1718
constexpr const char* PartitionFieldName = "_partition";
19+
constexpr const char* ProducerIdFieldName = "_producer_id";
20+
constexpr const char* SeqNoFieldName = "_seq_no";
1821

19-
constexpr const size_t FieldCount = 3; // Change it when change fields
22+
constexpr const size_t FieldCount = 6; // Change it when change fields
2023

2124

2225
NYT::TNode CreateTypeNode(const TString& fieldType) {
@@ -37,8 +40,11 @@ void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldTy
3740
NYT::TNode CreateMessageScheme() {
3841
auto structMembers = NYT::TNode::CreateList();
3942
AddField(structMembers, DataFieldName, "String");
43+
AddField(structMembers, MessageGroupIdFieldName, "String");
4044
AddField(structMembers, OffsetFieldName, "Uint64");
4145
AddField(structMembers, PartitionFieldName, "Uint32");
46+
AddField(structMembers, ProducerIdFieldName, "String");
47+
AddField(structMembers, SeqNoFieldName, "Uint64");
4248

4349
return NYT::TNode::CreateList()
4450
.Add("StructType")
@@ -54,13 +60,25 @@ struct TMessageWrapper {
5460
return NKikimr::NMiniKQL::MakeString(Message.Data);
5561
}
5662

63+
NYql::NUdf::TUnboxedValuePod GetMessageGroupId() const {
64+
return NKikimr::NMiniKQL::MakeString(Message.MessageGroupId);
65+
}
66+
5767
NYql::NUdf::TUnboxedValuePod GetOffset() const {
5868
return NYql::NUdf::TUnboxedValuePod(Message.Offset);
5969
}
6070

6171
NYql::NUdf::TUnboxedValuePod GetPartition() const {
6272
return NYql::NUdf::TUnboxedValuePod(Message.Partition);
6373
}
74+
75+
NYql::NUdf::TUnboxedValuePod GetProducerId() const {
76+
return NKikimr::NMiniKQL::MakeString(Message.ProducerId);
77+
}
78+
79+
NYql::NUdf::TUnboxedValuePod GetSeqNo() const {
80+
return NYql::NUdf::TUnboxedValuePod(Message.SeqNo);
81+
}
6482
};
6583

6684
class TInputConverter {
@@ -83,8 +101,11 @@ class TInputConverter {
83101
TMessageWrapper wrap {*message};
84102
// lex order by field name
85103
items[0] = wrap.GetData();
86-
items[1] = wrap.GetOffset();
87-
items[2] = wrap.GetPartition();
104+
items[1] = wrap.GetMessageGroupId();
105+
items[2] = wrap.GetOffset();
106+
items[3] = wrap.GetPartition();
107+
items[4] = wrap.GetProducerId();
108+
items[5] = wrap.GetSeqNo();
88109
}
89110

90111
void ClearCache() {

ydb/core/persqueue/purecalc/purecalc.h

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,12 @@ namespace NYdb::NTopic::NPurecalc {
77
using namespace NYql::NPureCalc;
88

99
struct TMessage {
10-
TMessage(const TString& data)
11-
: Data(data) {
12-
}
13-
14-
TMessage& WithPartition(ui64 partition) {
15-
Partition = partition;
16-
return *this;
17-
}
18-
19-
TMessage& WithOffset(ui64 offset) {
20-
Offset = offset;
21-
return *this;
22-
}
23-
24-
const TString& Data;
25-
ui32 Partition = 0;
10+
TString Data;
11+
TString MessageGroupId;
2612
ui64 Offset = 0;
13+
ui32 Partition = 0;
14+
TString ProducerId;
15+
ui64 SeqNo = 0;
2716
};
2817

2918
class TMessageInputSpec: public TInputSpecBase {

ydb/core/tx/replication/service/base_table_writer.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,10 @@ class TLocalTableWriter
433433
TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> records(::Reserve(ev->Get()->Records.size()));
434434
TSet<TRowVersion> versionsWithoutTxId;
435435

436-
for (auto& [offset, data, _] : ev->Get()->Records) {
436+
for (auto& r : ev->Get()->Records) {
437+
auto offset = r.Offset;
438+
auto& data = r.Data;
439+
437440
auto record = Parser->Parse(ev->Get()->Source, offset, std::move(data));
438441

439442
if (Mode == EWriteMode::Consistent) {

ydb/core/tx/replication/service/topic_reader.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
5858

5959
for (auto& msg : result.Messages) {
6060
Y_ABORT_UNLESS(msg.GetCodec() == NYdb::NTopic::ECodec::RAW);
61-
records.emplace_back(msg.GetOffset(), std::move(msg.GetData()), msg.GetCreateTime());
61+
records.emplace_back(msg.GetOffset(), std::move(msg.GetData()), msg.GetCreateTime(), std::move(msg.GetMessageGroupId()), std::move(msg.GetProducerId()), msg.GetSeqNo());
6262
}
6363

6464
Send(Worker, new TEvWorker::TEvData(result.PartitionId, ToString(result.PartitionId), std::move(records)));

ydb/core/tx/replication/service/transfer_writer.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -602,9 +602,13 @@ class TTransferWriter
602602
TableState->EnshureDataBatch();
603603

604604
for (auto& message : records) {
605-
NYdb::NTopic::NPurecalc::TMessage input(message.Data);
606-
input.WithPartition(partitionId);
607-
input.WithOffset(message.Offset);
605+
NYdb::NTopic::NPurecalc::TMessage input;
606+
input.Data = std::move(message.Data);
607+
input.MessageGroupId = std::move(message.MessageGroupId);
608+
input.Partition = partitionId;
609+
input.ProducerId = std::move(message.ProducerId);
610+
input.Offset = message.Offset;
611+
input.SeqNo = message.SeqNo;
608612

609613
try {
610614
auto result = ProgramHolder->GetProgram()->Apply(NYql::NPureCalc::StreamFromVector(TVector{input}));

ydb/core/tx/replication/service/worker.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,23 @@
1313

1414
namespace NKikimr::NReplication::NService {
1515

16-
TEvWorker::TEvData::TRecord::TRecord(ui64 offset, const TString& data, TInstant createTime)
16+
TEvWorker::TEvData::TRecord::TRecord(ui64 offset, const TString& data, TInstant createTime, const TString& messageGroupId, const TString& producerId, ui64 seqNo)
1717
: Offset(offset)
1818
, Data(data)
1919
, CreateTime(createTime)
20+
, MessageGroupId(messageGroupId)
21+
, ProducerId(producerId)
22+
, SeqNo(seqNo)
2023
{
2124
}
2225

23-
TEvWorker::TEvData::TRecord::TRecord(ui64 offset, TString&& data, TInstant createTime)
26+
TEvWorker::TEvData::TRecord::TRecord(ui64 offset, TString&& data, TInstant createTime, TString&& messageGroupId, TString&& producerId, ui64 seqNo)
2427
: Offset(offset)
2528
, Data(std::move(data))
2629
, CreateTime(createTime)
30+
, MessageGroupId(std::move(messageGroupId))
31+
, ProducerId(std::move(producerId))
32+
, SeqNo(seqNo)
2733
{
2834
}
2935

ydb/core/tx/replication/service/worker.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,12 @@ struct TEvWorker {
3434
ui64 Offset;
3535
TString Data;
3636
TInstant CreateTime;
37+
TString MessageGroupId;
38+
TString ProducerId;
39+
ui64 SeqNo;
3740

38-
explicit TRecord(ui64 offset, const TString& data, TInstant createTime = TInstant::Zero());
39-
explicit TRecord(ui64 offset, TString&& data, TInstant createTime = TInstant::Zero());
41+
explicit TRecord(ui64 offset, const TString& data, TInstant createTime, const TString& messageGroupId, const TString& producerId, ui64 seqNo);
42+
explicit TRecord(ui64 offset, TString&& data, TInstant createTime, TString&& messageGroupId, TString&& producerId, ui64 seqNo);
4043
void Out(IOutputStream& out) const;
4144
};
4245

ydb/core/tx/replication/ydb_proxy/ydb_proxy.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,9 @@ struct TEvYdbProxy {
167167
, Data(msg.GetData())
168168
, CreateTime(msg.GetCreateTime())
169169
, Codec(codec)
170+
, MessageGroupId(msg.GetMessageGroupId())
171+
, ProducerId(msg.GetProducerId())
172+
, SeqNo(msg.GetSeqNo())
170173
{
171174
}
172175

@@ -186,13 +189,19 @@ struct TEvYdbProxy {
186189
TString& GetData() { return Data; }
187190
TInstant GetCreateTime() const { return CreateTime; }
188191
ECodec GetCodec() const { return Codec; }
192+
TString& GetMessageGroupId() { return MessageGroupId; }
193+
TString& GetProducerId() { return ProducerId; }
194+
ui64 GetSeqNo() { return SeqNo; }
189195
void Out(IOutputStream& out) const;
190196

191197
private:
192198
ui64 Offset;
193199
TString Data;
194200
TInstant CreateTime;
195201
ECodec Codec;
202+
TString MessageGroupId;
203+
TString ProducerId;
204+
ui64 SeqNo;
196205
};
197206

198207
explicit TReadTopicResult(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& event) {

ydb/tests/functional/transfer/main.cpp

Lines changed: 138 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,43 @@ std::pair<TString, std::shared_ptr<IChecker>> _C(TString&& name, T&& expected) {
7676
}
7777

7878
struct TMessage {
79-
const char* Message;
79+
TString Message;
8080
std::optional<ui32> Partition = std::nullopt;
81+
std::optional<TString> ProducerId = std::nullopt;
82+
std::optional<TString> MessageGroupId = std::nullopt;
83+
std::optional<ui64> SeqNo = std::nullopt;
8184
};
8285

86+
TMessage _withSeqNo(ui64 seqNo) {
87+
return {
88+
.Message = TStringBuilder() << "Message-" << seqNo,
89+
.Partition = 0,
90+
.ProducerId = std::nullopt,
91+
.MessageGroupId = std::nullopt,
92+
.SeqNo = seqNo
93+
};
94+
}
95+
96+
TMessage _withProducerId(const TString& producerId) {
97+
return {
98+
.Message = TStringBuilder() << "Message-" << producerId,
99+
.Partition = 0,
100+
.ProducerId = producerId,
101+
.MessageGroupId = std::nullopt,
102+
.SeqNo = std::nullopt
103+
};
104+
}
105+
106+
TMessage _withMessageGroupId(const TString& messageGroupId) {
107+
return {
108+
.Message = TStringBuilder() << "Message-" << messageGroupId,
109+
.Partition = 0,
110+
.ProducerId = messageGroupId,
111+
.MessageGroupId = messageGroupId,
112+
.SeqNo = std::nullopt
113+
};
114+
}
115+
83116
struct TConfig {
84117
const char* TableDDL;
85118
const char* Lambda;
@@ -142,13 +175,19 @@ struct MainTestCase {
142175
for (const auto& m : Config.Messages) {
143176
TWriteSessionSettings writeSettings;
144177
writeSettings.Path(TopicName);
145-
writeSettings.DeduplicationEnabled(false);
178+
writeSettings.DeduplicationEnabled(m.SeqNo);
146179
if (m.Partition) {
147180
writeSettings.PartitionId(m.Partition);
148181
}
182+
if (m.ProducerId) {
183+
writeSettings.ProducerId(*m.ProducerId);
184+
}
185+
if (m.MessageGroupId) {
186+
writeSettings.MessageGroupId(*m.MessageGroupId);
187+
}
149188
auto writeSession = topicClient.CreateSimpleBlockingWriteSession(writeSettings);
150189

151-
UNIT_ASSERT(writeSession->Write(m.Message));
190+
UNIT_ASSERT(writeSession->Write(m.Message, m.SeqNo));
152191
writeSession->Close(TDuration::Seconds(1));
153192
}
154193
}
@@ -530,5 +569,101 @@ Y_UNIT_TEST_SUITE(Transfer)
530569
}).Run();
531570
}
532571

572+
Y_UNIT_TEST(Main_MessageField_SeqNo)
573+
{
574+
MainTestCase({
575+
.TableDDL = R"(
576+
CREATE TABLE `%s` (
577+
SeqNo Uint64 NOT NULL,
578+
Message Utf8,
579+
PRIMARY KEY (SeqNo)
580+
) WITH (
581+
STORE = COLUMN
582+
);
583+
)",
584+
585+
.Lambda = R"(
586+
$l = ($x) -> {
587+
return [
588+
<|
589+
SeqNo:CAST($x._seq_no AS Uint32),
590+
Message:CAST($x._data AS Utf8)
591+
|>
592+
];
593+
};
594+
)",
595+
596+
.Messages = {_withSeqNo(13)},
597+
598+
.Expectations = {
599+
_C("SeqNo", ui64(13)),
600+
}
601+
}).Run();
602+
}
603+
604+
Y_UNIT_TEST(Main_MessageField_ProducerId)
605+
{
606+
MainTestCase({
607+
.TableDDL = R"(
608+
CREATE TABLE `%s` (
609+
Offset Uint64 NOT NULL,
610+
ProducerId Utf8,
611+
PRIMARY KEY (Offset)
612+
) WITH (
613+
STORE = COLUMN
614+
);
615+
)",
616+
617+
.Lambda = R"(
618+
$l = ($x) -> {
619+
return [
620+
<|
621+
Offset:CAST($x._offset AS Uint64),
622+
ProducerId:CAST($x._producer_id AS Utf8)
623+
|>
624+
];
625+
};
626+
)",
627+
628+
.Messages = {_withProducerId("Producer-13")},
629+
630+
.Expectations = {
631+
_C("ProducerId", TString("Producer-13")),
632+
}
633+
}).Run();
634+
}
635+
636+
Y_UNIT_TEST(Main_MessageField_MessageGroupId)
637+
{
638+
MainTestCase({
639+
.TableDDL = R"(
640+
CREATE TABLE `%s` (
641+
Offset Uint64 NOT NULL,
642+
MessageGroupId Utf8,
643+
PRIMARY KEY (Offset)
644+
) WITH (
645+
STORE = COLUMN
646+
);
647+
)",
648+
649+
.Lambda = R"(
650+
$l = ($x) -> {
651+
return [
652+
<|
653+
Offset:CAST($x._offset AS Uint64),
654+
MessageGroupId:CAST($x._message_group_id AS Utf8)
655+
|>
656+
];
657+
};
658+
)",
659+
660+
.Messages = {_withMessageGroupId("MessageGroupId-13")},
661+
662+
.Expectations = {
663+
_C("MessageGroupId", TString("MessageGroupId-13")),
664+
}
665+
}).Run();
666+
}
667+
533668
}
534669

0 commit comments

Comments
 (0)