Skip to content

Commit ce1e546

Browse files
nshestakov authored and blinkov committed
Expand the list of message fields available for transferring from a topic to a table (#14941)
1 parent dc365ca commit ce1e546

File tree

13 files changed

+382
-90
lines changed

13 files changed

+382
-90
lines changed

ydb/core/backup/impl/local_partition_reader.cpp

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -135,11 +135,11 @@ class TLocalPartitionReader
135135

136136
for (auto& result : readResult.GetResult()) {
137137
gotOffset = std::max(gotOffset, result.GetOffset());
138-
records.emplace_back(result.GetOffset(), GetDeserializedData(result.GetData()).GetData());
138+
records.emplace_back(result.GetOffset(), GetDeserializedData(result.GetData()).GetData(), TInstant::MilliSeconds(result.GetCreateTimestampMS()), result.GetSourceId(), result.GetSourceId(), result.GetSeqNo());
139139
}
140140
SentOffset = gotOffset + 1;
141141

142-
Send(Worker, new TEvWorker::TEvData(ToString(Partition), std::move(records)));
142+
Send(Worker, new TEvWorker::TEvData(Partition, ToString(Partition), std::move(records)));
143143
}
144144

145145
void Leave(TEvWorker::TEvGone::EStatus status) {

ydb/core/persqueue/purecalc/purecalc.cpp

Lines changed: 32 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -13,14 +13,13 @@ using namespace NYql::NUdf;
1313
using namespace NKikimr::NMiniKQL;
1414

1515
constexpr const char* DataFieldName = "_data";
16+
constexpr const char* MessageGroupIdFieldName = "_message_group_id";
1617
constexpr const char* OffsetFieldName = "_offset";
18+
constexpr const char* PartitionFieldName = "_partition";
19+
constexpr const char* ProducerIdFieldName = "_producer_id";
20+
constexpr const char* SeqNoFieldName = "_seq_no";
1721

18-
constexpr const size_t FieldCount = 2; // Change it when change fields
19-
20-
struct FieldPositions {
21-
ui64 Data = 0;
22-
ui64 Offset = 0;
23-
};
22+
constexpr const size_t FieldCount = 6; // Change it when change fields
2423

2524

2625
NYT::TNode CreateTypeNode(const TString& fieldType) {
@@ -41,7 +40,11 @@ void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldTy
4140
NYT::TNode CreateMessageScheme() {
4241
auto structMembers = NYT::TNode::CreateList();
4342
AddField(structMembers, DataFieldName, "String");
43+
AddField(structMembers, MessageGroupIdFieldName, "String");
4444
AddField(structMembers, OffsetFieldName, "Uint64");
45+
AddField(structMembers, PartitionFieldName, "Uint32");
46+
AddField(structMembers, ProducerIdFieldName, "String");
47+
AddField(structMembers, SeqNoFieldName, "Uint64");
4548

4649
return NYT::TNode::CreateList()
4750
.Add("StructType")
@@ -57,32 +60,36 @@ struct TMessageWrapper {
5760
return NKikimr::NMiniKQL::MakeString(Message.Data);
5861
}
5962

63+
NYql::NUdf::TUnboxedValuePod GetMessageGroupId() const {
64+
return NKikimr::NMiniKQL::MakeString(Message.MessageGroupId);
65+
}
66+
6067
NYql::NUdf::TUnboxedValuePod GetOffset() const {
6168
return NYql::NUdf::TUnboxedValuePod(Message.Offset);
6269
}
70+
71+
NYql::NUdf::TUnboxedValuePod GetPartition() const {
72+
return NYql::NUdf::TUnboxedValuePod(Message.Partition);
73+
}
74+
75+
NYql::NUdf::TUnboxedValuePod GetProducerId() const {
76+
return NKikimr::NMiniKQL::MakeString(Message.ProducerId);
77+
}
78+
79+
NYql::NUdf::TUnboxedValuePod GetSeqNo() const {
80+
return NYql::NUdf::TUnboxedValuePod(Message.SeqNo);
81+
}
6382
};
6483

6584
class TInputConverter {
6685
protected:
6786
IWorker* Worker_;
6887
TPlainContainerCache Cache_;
69-
FieldPositions Position;
7088

7189
public:
7290
explicit TInputConverter(IWorker* worker)
7391
: Worker_(worker)
7492
{
75-
const TStructType* structType = worker->GetInputType();
76-
const ui64 count = structType->GetMembersCount();
77-
78-
for (ui64 i = 0; i < count; ++i) {
79-
const auto name = structType->GetMemberName(i);
80-
if (name == DataFieldName) {
81-
Position.Data = i;
82-
} else if (name == OffsetFieldName) {
83-
Position.Offset = i;
84-
}
85-
}
8693
}
8794

8895
public:
@@ -92,8 +99,13 @@ class TInputConverter {
9299
result = Cache_.NewArray(holderFactory, static_cast<ui32>(FieldCount), items);
93100

94101
TMessageWrapper wrap {*message};
95-
items[Position.Data] = wrap.GetData();
96-
items[Position.Offset] = wrap.GetOffset();
102+
// lex order by field name
103+
items[0] = wrap.GetData();
104+
items[1] = wrap.GetMessageGroupId();
105+
items[2] = wrap.GetOffset();
106+
items[3] = wrap.GetPartition();
107+
items[4] = wrap.GetProducerId();
108+
items[5] = wrap.GetSeqNo();
97109
}
98110

99111
void ClearCache() {

ydb/core/persqueue/purecalc/purecalc.h

Lines changed: 5 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -7,17 +7,12 @@ namespace NYdb::NTopic::NPurecalc {
77
using namespace NYql::NPureCalc;
88

99
struct TMessage {
10-
TMessage(const TString& data)
11-
: Data(data) {
12-
}
13-
14-
TMessage& WithOffset(ui64 offset) {
15-
Offset = offset;
16-
return *this;
17-
}
18-
19-
const TString& Data;
10+
TString Data;
11+
TString MessageGroupId;
2012
ui64 Offset = 0;
13+
ui32 Partition = 0;
14+
TString ProducerId;
15+
ui64 SeqNo = 0;
2116
};
2217

2318
class TMessageInputSpec: public TInputSpecBase {

ydb/core/tx/replication/service/base_table_writer.cpp

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -433,7 +433,10 @@ class TLocalTableWriter
433433
TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> records(::Reserve(ev->Get()->Records.size()));
434434
TSet<TRowVersion> versionsWithoutTxId;
435435

436-
for (auto& [offset, data, _] : ev->Get()->Records) {
436+
for (auto& r : ev->Get()->Records) {
437+
auto offset = r.Offset;
438+
auto& data = r.Data;
439+
437440
auto record = Parser->Parse(ev->Get()->Source, offset, std::move(data));
438441

439442
if (Mode == EWriteMode::Consistent) {
ydb/core/tx/replication/service/common_ut.h

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,13 @@
1+
#pragma once
2+
3+
#include "worker.h"
4+
5+
namespace NKikimr::NReplication::NService {
6+
7+
struct TRecord: public TEvWorker::TEvData::TRecord {
8+
explicit TRecord(ui64 offset, const TString& data)
9+
: TEvWorker::TEvData::TRecord(offset, data, TInstant::Zero(), "MessageGroupId", "ProducerId", 42)
10+
{}
11+
};
12+
13+
}

ydb/core/tx/replication/service/table_writer_ut.cpp

Lines changed: 13 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
#include "service.h"
22
#include "table_writer.h"
3-
#include "worker.h"
3+
#include "common_ut.h"
44

55
#include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>
66
#include <ydb/core/tx/replication/ut_helpers/test_env.h>
@@ -16,7 +16,6 @@ namespace NKikimr::NReplication::NService {
1616

1717
Y_UNIT_TEST_SUITE(LocalTableWriter) {
1818
using namespace NTestHelpers;
19-
using TRecord = TEvWorker::TEvData::TRecord;
2019

2120
Y_UNIT_TEST(WriteTable) {
2221
TEnv env;
@@ -35,7 +34,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
3534
auto writer = env.GetRuntime().Register(CreateLocalTableWriter(env.GetPathId("/Root/Table")));
3635
env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake());
3736

38-
env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData("TestSource", {
37+
env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData(0, "TestSource", {
3938
TRecord(1, R"({"key":[1], "update":{"value":"10"}})"),
4039
TRecord(2, R"({"key":[2], "update":{"value":"20"}})"),
4140
TRecord(3, R"({"key":[3], "update":{"value":"30"}})"),
@@ -92,7 +91,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
9291
auto writer = env.GetRuntime().Register(CreateLocalTableWriter(env.GetPathId("/Root/Table")));
9392
env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake());
9493

95-
env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData("TestSource", {
94+
env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData(0, "TestSource", {
9695
TRecord(1, R"({"key":[1], "update":{"int32_value":-100500}})"),
9796
TRecord(2, R"({"key":[2], "update":{"uint32_value":100500}})"),
9897
TRecord(3, R"({"key":[3], "update":{"int64_value":-200500}})"),
@@ -143,7 +142,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
143142
auto writer = env.GetRuntime().Register(CreateLocalTableWriter(env.GetPathId("/Root/Table")));
144143
env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake());
145144

146-
env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData("TestSource", {
145+
env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData(0, "TestSource", {
147146
TRecord(1, R"({"key":["1.0"], "update":{"value":"155555555555555.321"}})"),
148147
TRecord(2, R"({"key":["2.0"], "update":{"value":"255555555555555.321"}})"),
149148
TRecord(3, R"({"key":["3.0"], "update":{"value":"355555555555555.321"}})"),
@@ -184,7 +183,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
184183
ui64 order = 1;
185184

186185
{
187-
auto ev = env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", {
186+
auto ev = env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData(0,"TestSource", {
188187
TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"),
189188
TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[2,0]})"),
190189
TRecord(order++, R"({"key":[3], "update":{"value":"30"}, "ts":[3,0]})"),
@@ -203,14 +202,14 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
203202
}));
204203
}
205204
{
206-
auto ev = env.Send<TEvService::TEvHeartbeat>(writer, new TEvWorker::TEvData("TestSource", {
205+
auto ev = env.Send<TEvService::TEvHeartbeat>(writer, new TEvWorker::TEvData(0, "TestSource", {
207206
TRecord(order++, R"({"resolved":[10,0]})"),
208207
}));
209208
UNIT_ASSERT_VALUES_EQUAL(TRowVersion::FromProto(ev->Get()->Record.GetVersion()), TRowVersion(10, 0));
210209
env.GetRuntime().GrabEdgeEvent<TEvWorker::TEvPoll>(env.GetSender());
211210
}
212211

213-
env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", {
212+
env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData(0, "TestSource", {
214213
TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[11,0]})"),
215214
TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[12,0]})"),
216215
TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[21,0]})"),
@@ -222,12 +221,12 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
222221
{TRowVersion(30, 0), 3},
223222
}));
224223

225-
env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData("TestSource", {
224+
env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData(0, "TestSource", {
226225
TRecord(order++, R"({"key":[1], "update":{"value":"10"}, "ts":[13,0]})"),
227226
TRecord(order++, R"({"key":[2], "update":{"value":"20"}, "ts":[23,0]})"),
228227
}));
229228

230-
env.Send<TEvService::TEvHeartbeat>(writer, new TEvWorker::TEvData("TestSource", {
229+
env.Send<TEvService::TEvHeartbeat>(writer, new TEvWorker::TEvData(0, "TestSource", {
231230
TRecord(order++, R"({"resolved":[30,0]})"),
232231
}));
233232
env.GetRuntime().GrabEdgeEvent<TEvWorker::TEvPoll>(env.GetSender());
@@ -294,7 +293,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
294293
auto worker = env.GetRuntime().Register(new TMockWorker(writer, env.GetSender()));
295294

296295
env.Send<TEvWorker::TEvHandshake>(worker, new TEvWorker::TEvHandshake());
297-
env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", {
296+
env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData(0, "TestSource", {
298297
TRecord(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"),
299298
TRecord(2, R"({"key":[2], "update":{"value":"20"}, "ts":[11,0]})"),
300299
}));
@@ -377,7 +376,7 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
377376
auto worker = env.GetRuntime().Register(new TMockWorker(writer, env.GetSender()));
378377

379378
env.Send<TEvWorker::TEvHandshake>(worker, new TEvWorker::TEvHandshake());
380-
env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", {
379+
env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData(0, "TestSource", {
381380
TRecord(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"),
382381
TRecord(2, R"({"resolved":[10,0]})"),
383382
}));
@@ -407,14 +406,14 @@ Y_UNIT_TEST_SUITE(LocalTableWriter) {
407406
auto writer = env.GetRuntime().Register(CreateLocalTableWriter(env.GetPathId("/Root/Table"), EWriteMode::Consistent));
408407
env.Send<TEvWorker::TEvHandshake>(writer, new TEvWorker::TEvHandshake());
409408

410-
env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", {
409+
env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData(0, "TestSource", {
411410
TRecord(1, R"({"key":[1], "update":{"value":"10"}, "ts":[1,0]})"),
412411
}));
413412
env.Send<TEvWorker::TEvPoll>(writer, MakeTxIdResult({
414413
{TRowVersion(10, 0), 1},
415414
}));
416415

417-
env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData("TestSource", {
416+
env.Send<TEvService::TEvGetTxId>(writer, new TEvWorker::TEvData(0, "TestSource", {
418417
TRecord(2, R"({"key":[3], "update":{"value":"30"}, "ts":[11,0]})"),
419418
TRecord(3, R"({"key":[2], "update":{"value":"20"}, "ts":[2,0]})"),
420419
TRecord(4, R"({"resolved":[20,0]})"),

ydb/core/tx/replication/service/topic_reader.cpp

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -58,10 +58,10 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
5858

5959
for (auto& msg : result.Messages) {
6060
Y_ABORT_UNLESS(msg.GetCodec() == NYdb::NTopic::ECodec::RAW);
61-
records.emplace_back(msg.GetOffset(), std::move(msg.GetData()), msg.GetCreateTime());
61+
records.emplace_back(msg.GetOffset(), std::move(msg.GetData()), msg.GetCreateTime(), std::move(msg.GetMessageGroupId()), std::move(msg.GetProducerId()), msg.GetSeqNo());
6262
}
6363

64-
Send(Worker, new TEvWorker::TEvData(ToString(result.PartitionId), std::move(records)));
64+
Send(Worker, new TEvWorker::TEvData(result.PartitionId, ToString(result.PartitionId), std::move(records)));
6565
}
6666

6767
void Handle(TEvYdbProxy::TEvTopicEndPartition::TPtr& ev) {

ydb/core/tx/replication/service/transfer_writer.cpp

Lines changed: 28 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -551,7 +551,7 @@ class TTransferWriter
551551
}
552552

553553
if (PendingRecords) {
554-
ProcessData(*PendingRecords);
554+
ProcessData(PendingPartitionId, *PendingRecords);
555555
PendingRecords.reset();
556556
}
557557
}
@@ -584,15 +584,16 @@ class TTransferWriter
584584

585585
void HoldHandle(TEvWorker::TEvData::TPtr& ev) {
586586
Y_ABORT_UNLESS(!PendingRecords);
587+
PendingPartitionId = ev->Get()->PartitionId;
587588
PendingRecords = std::move(ev->Get()->Records);
588589
}
589590

590591
void Handle(TEvWorker::TEvData::TPtr& ev) {
591592
LOG_D("Handle TEvData " << ev->Get()->ToString());
592-
ProcessData(ev->Get()->Records);
593+
ProcessData(ev->Get()->PartitionId, ev->Get()->Records);
593594
}
594595

595-
void ProcessData(const TVector<TEvWorker::TEvData::TRecord>& records) {
596+
void ProcessData(const ui32 partitionId, const TVector<TEvWorker::TEvData::TRecord>& records) {
596597
if (!records) {
597598
Send(Worker, new TEvWorker::TEvGone(TEvWorker::TEvGone::DONE));
598599
return;
@@ -601,17 +602,29 @@ class TTransferWriter
601602
TableState->EnshureDataBatch();
602603

603604
for (auto& message : records) {
604-
NYdb::NTopic::NPurecalc::TMessage input(message.Data);
605-
input.WithOffset(message.Offset);
606-
607-
auto result = ProgramHolder->GetProgram()->Apply(NYql::NPureCalc::StreamFromVector(TVector{input}));
608-
while (auto* m = result->Fetch()) {
609-
TableState->AddData(m->Data);
605+
NYdb::NTopic::NPurecalc::TMessage input;
606+
input.Data = std::move(message.Data);
607+
input.MessageGroupId = std::move(message.MessageGroupId);
608+
input.Partition = partitionId;
609+
input.ProducerId = std::move(message.ProducerId);
610+
input.Offset = message.Offset;
611+
input.SeqNo = message.SeqNo;
612+
613+
try {
614+
auto result = ProgramHolder->GetProgram()->Apply(NYql::NPureCalc::StreamFromVector(TVector{input}));
615+
while (auto* m = result->Fetch()) {
616+
TableState->AddData(m->Data);
617+
}
618+
} catch (const yexception& e) {
619+
ProcessingError = TStringBuilder() << "Error transform message: '" << message.Data << "': " << e.what();
620+
break;
610621
}
611622
}
612623

613624
if (TableState->Flush()) {
614625
Become(&TThis::StateWrite);
626+
} else if (ProcessingError) {
627+
LogCritAndLeave(*ProcessingError);
615628
}
616629
}
617630

@@ -636,6 +649,10 @@ class TTransferWriter
636649
return LogCritAndLeave(error);
637650
}
638651

652+
if (ProcessingError) {
653+
return LogCritAndLeave(*ProcessingError);
654+
}
655+
639656
Send(Worker, new TEvWorker::TEvPoll());
640657
return StartWork();
641658
}
@@ -709,8 +726,10 @@ class TTransferWriter
709726
TProgramHolder::TPtr ProgramHolder;
710727

711728
mutable TMaybe<TString> LogPrefix;
729+
mutable TMaybe<TString> ProcessingError;
712730

713731
std::optional<TActorId> PendingWorker;
732+
ui32 PendingPartitionId = 0;
714733
std::optional<TVector<TEvWorker::TEvData::TRecord>> PendingRecords;
715734

716735
ui32 Attempt = 0;

0 commit comments

Comments
 (0)