Skip to content

Split TPayloadHelper to TPayloadReader & TPayloadWriter #1702

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/kqp/runtime/kqp_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N

auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(
inFlightBatch.TxId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite)
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite)
.AddDataToPayload(TString(inFlightBatch.Data));

evWrite->AddOperation(
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
}

auto arrowData = std::make_shared<TArrowData>(schema);
if (!arrowData->Parse(operation, NEvWrite::TPayloadHelper<NEvents::TDataEvents::TEvWrite>(*ev->Get()))) {
if (!arrowData->Parse(operation, NEvWrite::TPayloadReader<NEvents::TDataEvents::TEvWrite>(*ev->Get()))) {
IncCounter(COUNTER_WRITE_FAIL);
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), txId, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "parsing data error");
ctx.Send(source, result.release());
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/operations/write_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace NKikimr::NColumnShard {

bool TArrowData::Parse(const NKikimrDataEvents::TEvWrite_TOperation& proto, const NEvWrite::IPayloadData& payload) {
bool TArrowData::Parse(const NKikimrDataEvents::TEvWrite_TOperation& proto, const NEvWrite::IPayloadReader& payload) {
if(proto.GetPayloadFormat() != NKikimrDataEvents::FORMAT_ARROW)
{
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "invalid_payload_format")("payload_format", (ui64)proto.GetPayloadFormat());
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/operations/write_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class TArrowData : public NEvWrite::IDataContainer {
: IndexSchema(schema)
{}

bool Parse(const NKikimrDataEvents::TEvWrite::TOperation& proto, const NKikimr::NEvWrite::IPayloadData& payload);
bool Parse(const NKikimrDataEvents::TEvWrite::TOperation& proto, const NKikimr::NEvWrite::IPayloadReader& payload);
virtual std::shared_ptr<arrow::RecordBatch> ExtractBatch() override;
ui64 GetSchemaVersion() const override;
ui64 GetSize() const override {
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1584,7 +1584,7 @@ Y_UNIT_TEST_SUITE(EvWrite) {
UNIT_ASSERT(blobData.size() < TLimits::GetMaxBlobSize());

auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
ui64 payloadIndex = NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
ui64 payloadIndex = NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, {ownerId, tableId, schemaVersion}, columnsIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW);

TActorId sender = runtime.AllocateEdgeActor();
Expand Down Expand Up @@ -1634,7 +1634,7 @@ Y_UNIT_TEST_SUITE(EvWrite) {
UNIT_ASSERT(blobData.size() < TLimits::GetMaxBlobSize());

auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
ui64 payloadIndex = NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
ui64 payloadIndex = NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, {ownerId, tableId, schemaVersion}, columnsIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW);

TActorId sender = runtime.AllocateEdgeActor();
Expand Down Expand Up @@ -1681,7 +1681,7 @@ Y_UNIT_TEST_SUITE(EvWrite) {
UNIT_ASSERT(blobData.size() > TLimits::GetMaxBlobSize());

auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
ui64 payloadIndex = NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
ui64 payloadIndex = NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, {ownerId, tableId, schemaVersion}, columnsIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW);

TActorId sender = runtime.AllocateEdgeActor();
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ Y_UNIT_TEST_SUITE(Normalizers) {
TString blobData = NArrow::SerializeBatchNoCompression(batch);

auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
ui64 payloadIndex = NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
ui64 payloadIndex = NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, {ownerId, tableId, schemaVersion}, columnsIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW);

TActorId sender = runtime.AllocateEdgeActor();
Expand Down
28 changes: 22 additions & 6 deletions ydb/core/tx/data_events/payload_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,26 @@

namespace NKikimr::NEvWrite {

class IPayloadData {
class IPayloadReader {
public:
virtual TString GetDataFromPayload(const ui64 index) const = 0;
virtual ~IPayloadReader() {
}
};

class IPayloadWriter {
public:
virtual ui64 AddDataToPayload(TString&& blobData) = 0;
virtual ~IPayloadData() {
virtual ~IPayloadWriter() {
}
};

template <class TEvent>
class TPayloadHelper: public IPayloadData {
TEvent& Event;
class TPayloadReader: public IPayloadReader {
const TEvent& Event;

public:
TPayloadHelper(TEvent& ev)
TPayloadReader(const TEvent& ev)
: Event(ev)
{
}
Expand All @@ -28,12 +34,22 @@ class TPayloadHelper: public IPayloadData {
rope.Begin().ExtractPlainDataAndAdvance(data.Detach(), data.size());
return data;
}
};

template <class TEvent>
class TPayloadWriter: public IPayloadWriter {
TEvent& Event;

public:
TPayloadWriter(TEvent& ev)
: Event(ev)
{
}

ui64 AddDataToPayload(TString&& blobData) override {
TRope rope;
rope.Insert(rope.End(), TRope(blobData));
return Event.AddPayload(std::move(rope));
}
};

}
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ struct TTestHelper {
TSerializedCellMatrix matrix(cells, 1, columnCount);

auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, txMode);
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(matrix.ReleaseBuffer());
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(matrix.ReleaseBuffer());
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, table.TableId, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);

return evWrite;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard_ut_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
TSerializedCellMatrix matrix({TCell(hugeStringValue.c_str(), hugeStringValue.size())}, 1, 1);

auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(100, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(matrix.ReleaseBuffer());
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(matrix.ReleaseBuffer());
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, {1}, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);

const auto& record = Write(runtime, sender, shards[0], std::move(evWrite), NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/datashard/datashard_write_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ bool TValidatedWriteTx::ParseOperations(const TDataShard::TTableInfos& tableInfo
return false;
}

NEvWrite::TPayloadHelper<NEvents::TDataEvents::TEvWrite> payloadHelper(*Ev->Get());
TString payload = payloadHelper.GetDataFromPayload(RecordOperation().GetPayloadIndex());
NEvWrite::TPayloadReader<NEvents::TDataEvents::TEvWrite> payloadReader(*Ev->Get());
TString payload = payloadReader.GetDataFromPayload(RecordOperation().GetPayloadIndex());

if (!TSerializedCellMatrix::TryParse(payload,Matrix))
{
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1835,7 +1835,7 @@ std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequest(ui64 txId, NKik
UNIT_ASSERT(blobData.size() < 8_MB);

auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, txMode);
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);

return evWrite;
Expand Down Expand Up @@ -1936,7 +1936,7 @@ TTestActorRuntimeBase::TEventObserverHolderPair ReplaceEvProposeTransactionWithE
std::iota(columnIds.begin(), columnIds.end(), 1);

auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, txMode);
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);

// Copy locks
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2344,7 +2344,7 @@ namespace NSchemeShardUT_Private {
TSerializedCellMatrix matrix(cells, 1, 2);

auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(matrix.ReleaseBuffer()));
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(matrix.ReleaseBuffer()));
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);

ForwardToTablet(runtime, datashardTabletId, sender, evWrite.release());
Expand Down