Skip to content

Commit b716c4a

Browse files
Alek5andr-KotovKamil Khamitov
authored and
Kamil Khamitov
committed
The 'use-heap-after-free` error in the Topic SDK code (ydb-platform#15088)
1 parent 1c0a82b commit b716c4a

File tree

4 files changed

+51
-26
lines changed

4 files changed

+51
-26
lines changed

ydb/public/sdk/cpp/src/client/topic/impl/topic_impl.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -296,8 +296,8 @@ class TTopicClient::TImpl : public TClientImplCommon<TTopicClient::TImpl> {
296296
{
297297
auto request = MakeOperationRequest<Ydb::Topic::UpdateOffsetsInTransactionRequest>(settings);
298298

299-
request.mutable_tx()->set_id(TStringType{GetTxId(tx)});
300-
request.mutable_tx()->set_session(TStringType{GetSessionId(tx)});
299+
request.mutable_tx()->set_id(tx.TxId);
300+
request.mutable_tx()->set_session(tx.SessionId);
301301

302302
for (auto& t : topics) {
303303
auto* topic = request.mutable_topics()->Add();

ydb/public/sdk/cpp/src/client/topic/impl/transaction.h

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,21 @@ class TTransaction;
1010

1111
namespace NYdb::inline V3::NTopic {
1212

13-
using TTransactionId = std::pair<std::string, std::string>;
13+
struct TTransactionId {
14+
std::string SessionId;
15+
std::string TxId;
16+
};
1417

1518
inline
16-
const std::string& GetSessionId(const TTransactionId& x)
19+
bool operator==(const TTransactionId& lhs, const TTransactionId& rhs)
1720
{
18-
return x.first;
21+
return (lhs.SessionId == rhs.SessionId) && (lhs.TxId == rhs.TxId);
1922
}
2023

2124
inline
22-
const std::string& GetTxId(const TTransactionId& x)
25+
bool operator!=(const TTransactionId& lhs, const TTransactionId& rhs)
2326
{
24-
return x.second;
27+
return !(lhs == rhs);
2528
}
2629

2730
TTransactionId MakeTransactionId(const NTable::TTransaction& tx);
@@ -30,3 +33,10 @@ TStatus MakeSessionExpiredError();
3033
TStatus MakeCommitTransactionSuccess();
3134

3235
}
36+
37+
template <>
38+
struct THash<NYdb::NTopic::TTransactionId> {
39+
size_t operator()(const NYdb::NTopic::TTransactionId& v) const noexcept {
40+
return CombineHashes(THash<std::string>()(v.SessionId), THash<std::string>()(v.TxId));
41+
}
42+
};

ydb/public/sdk/cpp/src/client/topic/impl/write_session_impl.cpp

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@
1212
#include <util/stream/buffer.h>
1313
#include <util/generic/guid.h>
1414

15+
template <>
16+
void Out<NYdb::NTopic::TTransactionId>(IOutputStream& s, const NYdb::NTopic::TTransactionId& v)
17+
{
18+
s << "{" << v.SessionId << ", " << v.TxId << "}";
19+
}
20+
1521
namespace NYdb::inline V3::NTopic {
1622

1723
const TDuration UPDATE_TOKEN_PERIOD = TDuration::Hours(1);
@@ -35,13 +41,22 @@ TTxIdOpt GetTransactionId(const Ydb::Topic::StreamWriteMessage_WriteRequest& req
3541
return TTxId(tx.session(), tx.id());
3642
}
3743

38-
TTxIdOpt GetTransactionId(const NTable::TTransaction* tx)
44+
TTxIdOpt GetTransactionId(const std::optional<TTransactionId>& tx)
45+
{
46+
if (!tx) {
47+
return std::nullopt;
48+
}
49+
50+
return TTxId(tx->SessionId, tx->TxId);
51+
}
52+
53+
std::optional<TTransactionId> MakeTransactionId(const NTable::TTransaction* tx)
3954
{
4055
if (!tx) {
4156
return std::nullopt;
4257
}
4358

44-
return TTxId(tx->GetSession().GetId(), tx->GetId());
59+
return TTransactionId{tx->GetSession().GetId(), tx->GetId()};
4560
}
4661

4762
}
@@ -584,7 +599,7 @@ void TWriteSessionImpl::TrySignalAllAcksReceived(ui64 seqNo)
584599
++txInfo->AckCount;
585600

586601
LOG_LAZY(DbDriverState->Log, TLOG_DEBUG,
587-
LogPrefixImpl() << "OnAck: seqNo=" << seqNo << ", txId=" << GetTxId(txId) << ", WriteCount=" << txInfo->WriteCount << ", AckCount=" << txInfo->AckCount);
602+
LogPrefixImpl() << "OnAck: seqNo=" << seqNo << ", txId=" << txId << ", WriteCount=" << txInfo->WriteCount << ", AckCount=" << txInfo->AckCount);
588603

589604
if (txInfo->CommitCalled && (txInfo->WriteCount == txInfo->AckCount)) {
590605
txInfo->AllAcksReceived.SetValue(MakeCommitTransactionSuccess());
@@ -631,15 +646,15 @@ void TWriteSessionImpl::WriteInternal(TContinuationToken&&, TWriteMessage&& mess
631646
++txInfo->WriteCount;
632647

633648
LOG_LAZY(DbDriverState->Log, TLOG_DEBUG,
634-
LogPrefixImpl() << "OnWrite: seqNo=" << seqNo << ", txId=" << GetTxId(txId) << ", WriteCount=" << txInfo->WriteCount << ", AckCount=" << txInfo->AckCount);
649+
LogPrefixImpl() << "OnWrite: seqNo=" << seqNo << ", txId=" << txId << ", WriteCount=" << txInfo->WriteCount << ", AckCount=" << txInfo->AckCount);
635650
}
636651
WrittenInTx[seqNo] = txId;
637652
}
638653

639654
CurrentBatch.Add(
640655
seqNo, createdAtValue, message.Data, message.Codec, message.OriginalSize,
641656
message.MessageMeta_,
642-
message.GetTxPtr()
657+
MakeTransactionId(message.GetTxPtr())
643658
);
644659

645660
FlushWriteIfRequiredImpl();
@@ -1412,10 +1427,10 @@ size_t TWriteSessionImpl::WriteBatchImpl() {
14121427
if (!currMessage.MessageMeta.empty()) {
14131428
OriginalMessagesToSend.emplace(id, createTs, datum.size(),
14141429
std::move(currMessage.MessageMeta),
1415-
currMessage.Tx);
1430+
std::move(currMessage.Tx));
14161431
} else {
14171432
OriginalMessagesToSend.emplace(id, createTs, datum.size(),
1418-
currMessage.Tx);
1433+
std::move(currMessage.Tx));
14191434
}
14201435
}
14211436
block.Data = std::move(CurrentBatch.Data);
@@ -1523,8 +1538,8 @@ void TWriteSessionImpl::SendImpl() {
15231538
auto* msgData = writeRequest->add_messages();
15241539

15251540
if (message.Tx) {
1526-
writeRequest->mutable_tx()->set_id(TStringType{message.Tx->GetId()});
1527-
writeRequest->mutable_tx()->set_session(TStringType{message.Tx->GetSession().GetId()});
1541+
writeRequest->mutable_tx()->set_id(message.Tx->TxId);
1542+
writeRequest->mutable_tx()->set_session(message.Tx->SessionId);
15281543
}
15291544

15301545
msgData->set_seq_no(GetSeqNoImpl(message.Id));

ydb/public/sdk/cpp/src/client/topic/impl/write_session_impl.h

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -164,18 +164,18 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
164164
std::optional<ECodec> Codec;
165165
ui32 OriginalSize; // only for coded messages
166166
std::vector<std::pair<std::string, std::string>> MessageMeta;
167-
const NTable::TTransaction* Tx;
167+
std::optional<TTransactionId> Tx;
168168

169169
TMessage(uint64_t id, const TInstant& createdAt, std::string_view data, std::optional<ECodec> codec = {},
170170
ui32 originalSize = 0, const std::vector<std::pair<std::string, std::string>>& messageMeta = {},
171-
const NTable::TTransaction* tx = nullptr)
171+
std::optional<TTransactionId>&& tx = {})
172172
: Id(id)
173173
, CreatedAt(createdAt)
174174
, DataRef(data)
175175
, Codec(codec)
176176
, OriginalSize(originalSize)
177177
, MessageMeta(messageMeta)
178-
, Tx(tx)
178+
, Tx(std::move(tx))
179179
{}
180180
};
181181

@@ -189,11 +189,11 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
189189

190190
void Add(uint64_t id, const TInstant& createdAt, std::string_view data, std::optional<ECodec> codec, ui32 originalSize,
191191
const std::vector<std::pair<std::string, std::string>>& messageMeta,
192-
const NTable::TTransaction* tx) {
192+
std::optional<TTransactionId>&& tx) {
193193
if (StartedAt == TInstant::Zero())
194194
StartedAt = TInstant::Now();
195195
CurrentSize += codec ? originalSize : data.size();
196-
Messages.emplace_back(id, createdAt, data, codec, originalSize, messageMeta, tx);
196+
Messages.emplace_back(id, createdAt, data, codec, originalSize, messageMeta, std::move(tx));
197197
Acquired = false;
198198
}
199199

@@ -263,24 +263,24 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
263263
TInstant CreatedAt;
264264
size_t Size;
265265
std::vector<std::pair<std::string, std::string>> MessageMeta;
266-
const NTable::TTransaction* Tx;
266+
std::optional<TTransactionId> Tx;
267267

268268
TOriginalMessage(const uint64_t id, const TInstant createdAt, const size_t size,
269-
const NTable::TTransaction* tx)
269+
std::optional<TTransactionId>&& tx)
270270
: Id(id)
271271
, CreatedAt(createdAt)
272272
, Size(size)
273-
, Tx(tx)
273+
, Tx(std::move(tx))
274274
{}
275275

276276
TOriginalMessage(const uint64_t id, const TInstant createdAt, const size_t size,
277277
std::vector<std::pair<std::string, std::string>>&& messageMeta,
278-
const NTable::TTransaction* tx)
278+
std::optional<TTransactionId>&& tx)
279279
: Id(id)
280280
, CreatedAt(createdAt)
281281
, Size(size)
282282
, MessageMeta(std::move(messageMeta))
283-
, Tx(tx)
283+
, Tx(std::move(tx))
284284
{}
285285
};
286286

0 commit comments

Comments
 (0)