Skip to content

Commit 60568aa

Browse files
committed
Split TPayloadHelper to TPayloadReader & TPayloadWriter
1 parent 9bf348b commit 60568aa

12 files changed

+37
-21
lines changed

ydb/core/kqp/runtime/kqp_write_actor.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
334334

335335
auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(
336336
inFlightBatch.TxId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
337-
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite)
337+
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite)
338338
.AddDataToPayload(TString(inFlightBatch.Data));
339339

340340
evWrite->AddOperation(

ydb/core/tx/columnshard/columnshard__write.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
243243
}
244244

245245
auto arrowData = std::make_shared<TArrowData>(schema);
246-
if (!arrowData->Parse(operation, NEvWrite::TPayloadHelper<NEvents::TDataEvents::TEvWrite>(*ev->Get()))) {
246+
if (!arrowData->Parse(operation, NEvWrite::TPayloadWriter<NEvents::TDataEvents::TEvWrite>(*ev->Get()))) {
247247
IncCounter(COUNTER_WRITE_FAIL);
248248
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), txId, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "parsing data error");
249249
ctx.Send(source, result.release());

ydb/core/tx/columnshard/operations/write_data.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
namespace NKikimr::NColumnShard {
77

8-
bool TArrowData::Parse(const NKikimrDataEvents::TEvWrite_TOperation& proto, const NEvWrite::IPayloadData& payload) {
8+
bool TArrowData::Parse(const NKikimrDataEvents::TEvWrite_TOperation& proto, const NEvWrite::IPayloadReader& payload) {
99
if(proto.GetPayloadFormat() != NKikimrDataEvents::FORMAT_ARROW)
1010
{
1111
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "invalid_payload_format")("payload_format", (ui64)proto.GetPayloadFormat());

ydb/core/tx/columnshard/operations/write_data.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class TArrowData : public NEvWrite::IDataContainer {
1818
: IndexSchema(schema)
1919
{}
2020

21-
bool Parse(const NKikimrDataEvents::TEvWrite::TOperation& proto, const NKikimr::NEvWrite::IPayloadData& payload);
21+
bool Parse(const NKikimrDataEvents::TEvWrite::TOperation& proto, const NKikimr::NEvWrite::IPayloadReader& payload);
2222
virtual std::shared_ptr<arrow::RecordBatch> ExtractBatch() override;
2323
ui64 GetSchemaVersion() const override;
2424
ui64 GetSize() const override {

ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -1584,7 +1584,7 @@ Y_UNIT_TEST_SUITE(EvWrite) {
15841584
UNIT_ASSERT(blobData.size() < TLimits::GetMaxBlobSize());
15851585

15861586
auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
1587-
ui64 payloadIndex = NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
1587+
ui64 payloadIndex = NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
15881588
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, {ownerId, tableId, schemaVersion}, columnsIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW);
15891589

15901590
TActorId sender = runtime.AllocateEdgeActor();
@@ -1634,7 +1634,7 @@ Y_UNIT_TEST_SUITE(EvWrite) {
16341634
UNIT_ASSERT(blobData.size() < TLimits::GetMaxBlobSize());
16351635

16361636
auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
1637-
ui64 payloadIndex = NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
1637+
ui64 payloadIndex = NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
16381638
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, {ownerId, tableId, schemaVersion}, columnsIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW);
16391639

16401640
TActorId sender = runtime.AllocateEdgeActor();
@@ -1681,7 +1681,7 @@ Y_UNIT_TEST_SUITE(EvWrite) {
16811681
UNIT_ASSERT(blobData.size() > TLimits::GetMaxBlobSize());
16821682

16831683
auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
1684-
ui64 payloadIndex = NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
1684+
ui64 payloadIndex = NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
16851685
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, {ownerId, tableId, schemaVersion}, columnsIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW);
16861686

16871687
TActorId sender = runtime.AllocateEdgeActor();

ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ Y_UNIT_TEST_SUITE(Normalizers) {
246246
TString blobData = NArrow::SerializeBatchNoCompression(batch);
247247

248248
auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
249-
ui64 payloadIndex = NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
249+
ui64 payloadIndex = NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
250250
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, {ownerId, tableId, schemaVersion}, columnsIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW);
251251

252252
TActorId sender = runtime.AllocateEdgeActor();

ydb/core/tx/data_events/payload_helper.h

+22-6
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,26 @@
44

55
namespace NKikimr::NEvWrite {
66

7-
class IPayloadData {
7+
class IPayloadReader {
88
public:
99
virtual TString GetDataFromPayload(const ui64 index) const = 0;
10+
virtual ~IPayloadReader() {
11+
}
12+
};
13+
14+
class IPayloadWriter {
15+
public:
1016
virtual ui64 AddDataToPayload(TString&& blobData) = 0;
11-
virtual ~IPayloadData() {
17+
virtual ~IPayloadWriter() {
1218
}
1319
};
1420

1521
template <class TEvent>
16-
class TPayloadHelper: public IPayloadData {
17-
TEvent& Event;
22+
class TPayloadReader: public IPayloadReader {
23+
const TEvent& Event;
1824

1925
public:
20-
TPayloadHelper(TEvent& ev)
26+
TPayloadReader(const TEvent& ev)
2127
: Event(ev)
2228
{
2329
}
@@ -28,12 +34,22 @@ class TPayloadHelper: public IPayloadData {
2834
rope.Begin().ExtractPlainDataAndAdvance(data.Detach(), data.size());
2935
return data;
3036
}
37+
};
38+
39+
template <class TEvent>
40+
class TPayloadWriter: public IPayloadWriter {
41+
TEvent& Event;
42+
43+
public:
44+
TPayloadWriter(TEvent& ev)
45+
: Event(ev)
46+
{
47+
}
3148

3249
ui64 AddDataToPayload(TString&& blobData) override {
3350
TRope rope;
3451
rope.Insert(rope.End(), TRope(blobData));
3552
return Event.AddPayload(std::move(rope));
3653
}
3754
};
38-
3955
}

ydb/core/tx/datashard/datashard_ut_read_iterator.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -775,7 +775,7 @@ struct TTestHelper {
775775
TSerializedCellMatrix matrix(cells, 1, columnCount);
776776

777777
auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, txMode);
778-
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(matrix.ReleaseBuffer());
778+
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(matrix.ReleaseBuffer());
779779
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, table.TableId, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);
780780

781781
return evWrite;

ydb/core/tx/datashard/datashard_ut_write.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
9494
TSerializedCellMatrix matrix({TCell(hugeStringValue.c_str(), hugeStringValue.size())}, 1, 1);
9595

9696
auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(100, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
97-
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(matrix.ReleaseBuffer());
97+
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(matrix.ReleaseBuffer());
9898
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, {1}, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);
9999

100100
const auto& record = Write(runtime, sender, shards[0], std::move(evWrite), NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);

ydb/core/tx/datashard/datashard_write_operation.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,8 @@ bool TValidatedWriteTx::ParseOperations(const TDataShard::TTableInfos& tableInfo
9393
return false;
9494
}
9595

96-
NEvWrite::TPayloadHelper<NEvents::TDataEvents::TEvWrite> payloadHelper(*Ev->Get());
97-
TString payload = payloadHelper.GetDataFromPayload(RecordOperation().GetPayloadIndex());
96+
NEvWrite::TPayloadReader<NEvents::TDataEvents::TEvWrite> payloadReader(*Ev->Get());
97+
TString payload = payloadReader.GetDataFromPayload(RecordOperation().GetPayloadIndex());
9898

9999
if (!TSerializedCellMatrix::TryParse(payload,Matrix))
100100
{

ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -1835,7 +1835,7 @@ std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequest(ui64 txId, NKik
18351835
UNIT_ASSERT(blobData.size() < 8_MB);
18361836

18371837
auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, txMode);
1838-
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
1838+
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
18391839
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);
18401840

18411841
return evWrite;
@@ -1936,7 +1936,7 @@ TTestActorRuntimeBase::TEventObserverHolderPair ReplaceEvProposeTransactionWithE
19361936
std::iota(columnIds.begin(), columnIds.end(), 1);
19371937

19381938
auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, txMode);
1939-
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
1939+
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
19401940
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);
19411941

19421942
// Copy locks

ydb/core/tx/schemeshard/ut_helpers/helpers.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -2344,7 +2344,7 @@ namespace NSchemeShardUT_Private {
23442344
TSerializedCellMatrix matrix(cells, 1, 2);
23452345

23462346
auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
2347-
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(matrix.ReleaseBuffer()));
2347+
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(matrix.ReleaseBuffer()));
23482348
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);
23492349

23502350
ForwardToTablet(runtime, datashardTabletId, sender, evWrite.release());

0 commit comments

Comments
 (0)