Skip to content

Commit 1f02c95

Browse files
[-] the message does not store a pointer
1 parent b7b9d26 commit 1f02c95

File tree

2 files changed

+26
-17
lines changed

2 files changed

+26
-17
lines changed

ydb/public/sdk/cpp/src/client/topic/impl/write_session_impl.cpp

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,22 @@ TTxIdOpt GetTransactionId(const Ydb::Topic::StreamWriteMessage_WriteRequest& req
4141
return TTxId(tx.session(), tx.id());
4242
}
4343

44-
TTxIdOpt GetTransactionId(const NTable::TTransaction* tx)
44+
TTxIdOpt GetTransactionId(const std::optional<TTransactionId>& tx)
4545
{
4646
if (!tx) {
4747
return std::nullopt;
4848
}
4949

50-
return TTxId(tx->GetSession().GetId(), tx->GetId());
50+
return TTxId(tx->SessionId, tx->TxId);
51+
}
52+
53+
std::optional<TTransactionId> MakeTransactionId(const NTable::TTransaction* tx)
54+
{
55+
if (!tx) {
56+
return std::nullopt;
57+
}
58+
59+
return TTransactionId{tx->GetSession().GetId(), tx->GetId()};
5160
}
5261

5362
}
@@ -645,7 +654,7 @@ void TWriteSessionImpl::WriteInternal(TContinuationToken&&, TWriteMessage&& mess
645654
CurrentBatch.Add(
646655
seqNo, createdAtValue, message.Data, message.Codec, message.OriginalSize,
647656
message.MessageMeta_,
648-
message.GetTxPtr()
657+
MakeTransactionId(message.GetTxPtr())
649658
);
650659

651660
FlushWriteIfRequiredImpl();
@@ -1418,10 +1427,10 @@ size_t TWriteSessionImpl::WriteBatchImpl() {
14181427
if (!currMessage.MessageMeta.empty()) {
14191428
OriginalMessagesToSend.emplace(id, createTs, datum.size(),
14201429
std::move(currMessage.MessageMeta),
1421-
currMessage.Tx);
1430+
std::move(currMessage.Tx));
14221431
} else {
14231432
OriginalMessagesToSend.emplace(id, createTs, datum.size(),
1424-
currMessage.Tx);
1433+
std::move(currMessage.Tx));
14251434
}
14261435
}
14271436
block.Data = std::move(CurrentBatch.Data);
@@ -1529,8 +1538,8 @@ void TWriteSessionImpl::SendImpl() {
15291538
auto* msgData = writeRequest->add_messages();
15301539

15311540
if (message.Tx) {
1532-
writeRequest->mutable_tx()->set_id(TStringType{message.Tx->GetId()});
1533-
writeRequest->mutable_tx()->set_session(TStringType{message.Tx->GetSession().GetId()});
1541+
writeRequest->mutable_tx()->set_id(message.Tx->TxId);
1542+
writeRequest->mutable_tx()->set_session(message.Tx->SessionId);
15341543
}
15351544

15361545
msgData->set_seq_no(GetSeqNoImpl(message.Id));

ydb/public/sdk/cpp/src/client/topic/impl/write_session_impl.h

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -164,18 +164,18 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
164164
std::optional<ECodec> Codec;
165165
ui32 OriginalSize; // only for coded messages
166166
std::vector<std::pair<std::string, std::string>> MessageMeta;
167-
const NTable::TTransaction* Tx;
167+
std::optional<TTransactionId> Tx;
168168

169169
TMessage(uint64_t id, const TInstant& createdAt, std::string_view data, std::optional<ECodec> codec = {},
170170
ui32 originalSize = 0, const std::vector<std::pair<std::string, std::string>>& messageMeta = {},
171-
const NTable::TTransaction* tx = nullptr)
171+
std::optional<TTransactionId>&& tx = {})
172172
: Id(id)
173173
, CreatedAt(createdAt)
174174
, DataRef(data)
175175
, Codec(codec)
176176
, OriginalSize(originalSize)
177177
, MessageMeta(messageMeta)
178-
, Tx(tx)
178+
, Tx(std::move(tx))
179179
{}
180180
};
181181

@@ -189,11 +189,11 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
189189

190190
void Add(uint64_t id, const TInstant& createdAt, std::string_view data, std::optional<ECodec> codec, ui32 originalSize,
191191
const std::vector<std::pair<std::string, std::string>>& messageMeta,
192-
const NTable::TTransaction* tx) {
192+
std::optional<TTransactionId>&& tx) {
193193
if (StartedAt == TInstant::Zero())
194194
StartedAt = TInstant::Now();
195195
CurrentSize += codec ? originalSize : data.size();
196-
Messages.emplace_back(id, createdAt, data, codec, originalSize, messageMeta, tx);
196+
Messages.emplace_back(id, createdAt, data, codec, originalSize, messageMeta, std::move(tx));
197197
Acquired = false;
198198
}
199199

@@ -263,24 +263,24 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
263263
TInstant CreatedAt;
264264
size_t Size;
265265
std::vector<std::pair<std::string, std::string>> MessageMeta;
266-
const NTable::TTransaction* Tx;
266+
std::optional<TTransactionId> Tx;
267267

268268
TOriginalMessage(const uint64_t id, const TInstant createdAt, const size_t size,
269-
const NTable::TTransaction* tx)
269+
std::optional<TTransactionId>&& tx)
270270
: Id(id)
271271
, CreatedAt(createdAt)
272272
, Size(size)
273-
, Tx(tx)
273+
, Tx(std::move(tx))
274274
{}
275275

276276
TOriginalMessage(const uint64_t id, const TInstant createdAt, const size_t size,
277277
std::vector<std::pair<std::string, std::string>>&& messageMeta,
278-
const NTable::TTransaction* tx)
278+
std::optional<TTransactionId>&& tx)
279279
: Id(id)
280280
, CreatedAt(createdAt)
281281
, Size(size)
282282
, MessageMeta(std::move(messageMeta))
283-
, Tx(tx)
283+
, Tx(std::move(tx))
284284
{}
285285
};
286286

0 commit comments

Comments
 (0)