Skip to content

Commit bfd2866

Browse files
authored
ydb_federated_topic: copy message data (#3159)
1 parent aebde93 commit bfd2866

File tree

3 files changed

+29
-15
lines changed

3 files changed

+29
-15
lines changed

ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp

+11-11
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ void TFederatedWriteSession::Write(NTopic::TContinuationToken&& token, TStringBu
239239
}
240240

241241
void TFederatedWriteSession::Write(NTopic::TContinuationToken&& token, NTopic::TWriteMessage&& message) {
242-
return WriteInternal(std::move(token), std::move(message));
242+
return WriteInternal(std::move(token), TWrappedWriteMessage(std::move(message)));
243243
}
244244

245245
void TFederatedWriteSession::WriteEncoded(NTopic::TContinuationToken&& token, TStringBuf data, NTopic::ECodec codec,
@@ -249,24 +249,24 @@ void TFederatedWriteSession::WriteEncoded(NTopic::TContinuationToken&& token, TS
249249
message.SeqNo(*seqNo);
250250
if (createTimestamp.Defined())
251251
message.CreateTimestamp(*createTimestamp);
252-
return WriteInternal(std::move(token), std::move(message));
252+
return WriteInternal(std::move(token), TWrappedWriteMessage(std::move(message)));
253253
}
254254

255255
void TFederatedWriteSession::WriteEncoded(NTopic::TContinuationToken&& token, NTopic::TWriteMessage&& message) {
256-
return WriteInternal(std::move(token), std::move(message));
256+
return WriteInternal(std::move(token), TWrappedWriteMessage(std::move(message)));
257257
}
258258

259-
void TFederatedWriteSession::WriteInternal(NTopic::TContinuationToken&&, NTopic::TWriteMessage&& message) {
259+
void TFederatedWriteSession::WriteInternal(NTopic::TContinuationToken&&, TWrappedWriteMessage&& wrapped) {
260260
ClientHasToken = false;
261-
if (!message.CreateTimestamp_.Defined()) {
262-
message.CreateTimestamp_ = TInstant::Now();
261+
if (!wrapped.Message.CreateTimestamp_.Defined()) {
262+
wrapped.Message.CreateTimestamp_ = TInstant::Now();
263263
}
264264

265265
{
266266
TDeferredWrite deferred(Subsession);
267267
with_lock(Lock) {
268-
BufferFreeSpace -= message.Data.size();
269-
OriginalMessagesToPassDown.emplace_back(std::move(message));
268+
BufferFreeSpace -= wrapped.Message.Data.size();
269+
OriginalMessagesToPassDown.emplace_back(std::move(wrapped));
270270

271271
PrepareDeferredWrite(deferred);
272272
}
@@ -285,10 +285,10 @@ bool TFederatedWriteSession::PrepareDeferredWrite(TDeferredWrite& deferred) {
285285
if (OriginalMessagesToPassDown.empty()) {
286286
return false;
287287
}
288-
OriginalMessagesToGetAck.push_back(OriginalMessagesToPassDown.front());
289-
deferred.Token.ConstructInPlace(std::move(*PendingToken));
290-
deferred.Message.ConstructInPlace(std::move(OriginalMessagesToPassDown.front()));
288+
OriginalMessagesToGetAck.push_back(std::move(OriginalMessagesToPassDown.front()));
291289
OriginalMessagesToPassDown.pop_front();
290+
deferred.Token.ConstructInPlace(std::move(*PendingToken));
291+
deferred.Message.ConstructInPlace(std::move(OriginalMessagesToGetAck.back().Message));
292292
PendingToken.Clear();
293293
return true;
294294
}

ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h

+17-3
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,20 @@ class TFederatedWriteSession : public NTopic::IWriteSession,
6262

6363
inline NTopic::TWriterCounters::TPtr GetCounters() override {Y_ABORT("Unimplemented"); } //ToDo - unimplemented;
6464

65+
private:
66+
67+
class TWrappedWriteMessage {
68+
public:
69+
const TString Data;
70+
NTopic::TWriteMessage Message;
71+
TWrappedWriteMessage(NTopic::TWriteMessage&& message)
72+
: Data(message.Data)
73+
, Message(std::move(message))
74+
{
75+
Message.Data = Data;
76+
}
77+
};
78+
6579
private:
6680
void Start();
6781
void OpenSubSessionImpl(std::shared_ptr<TDbInfo> db);
@@ -71,7 +85,7 @@ class TFederatedWriteSession : public NTopic::IWriteSession,
7185
void OnFederatedStateUpdateImpl();
7286
void ScheduleFederatedStateUpdateImpl(TDuration delay);
7387

74-
void WriteInternal(NTopic::TContinuationToken&&, NTopic::TWriteMessage&& message);
88+
void WriteInternal(NTopic::TContinuationToken&&, TWrappedWriteMessage&& message);
7589
bool PrepareDeferredWrite(TDeferredWrite& deferred);
7690

7791
void CloseImpl(EStatus statusCode, NYql::TIssues&& issues);
@@ -105,8 +119,8 @@ class TFederatedWriteSession : public NTopic::IWriteSession,
105119

106120
TMaybe<NTopic::TContinuationToken> PendingToken; // from Subsession
107121
bool ClientHasToken = false;
108-
std::deque<NTopic::TWriteMessage> OriginalMessagesToPassDown;
109-
std::deque<NTopic::TWriteMessage> OriginalMessagesToGetAck;
122+
std::deque<TWrappedWriteMessage> OriginalMessagesToPassDown;
123+
std::deque<TWrappedWriteMessage> OriginalMessagesToGetAck;
110124
i64 BufferFreeSpace;
111125

112126
// Exiting.

ydb/public/sdk/cpp/client/ydb_topic/topic.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -1602,7 +1602,7 @@ struct TWriteMessage {
16021602
}
16031603

16041604
//! Message body.
1605-
const TStringBuf Data;
1605+
TStringBuf Data;
16061606

16071607
//! Codec and original size for compressed message.
16081608
//! Do not specify or change these options directly, use CompressedMessage()

0 commit comments

Comments
 (0)