diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp index 53bf57d625fc..a33b8a4ee102 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp @@ -239,7 +239,7 @@ void TFederatedWriteSession::Write(NTopic::TContinuationToken&& token, TStringBu } void TFederatedWriteSession::Write(NTopic::TContinuationToken&& token, NTopic::TWriteMessage&& message) { - return WriteInternal(std::move(token), std::move(message)); + return WriteInternal(std::move(token), TWrappedWriteMessage(std::move(message))); } void TFederatedWriteSession::WriteEncoded(NTopic::TContinuationToken&& token, TStringBuf data, NTopic::ECodec codec, @@ -249,24 +249,24 @@ void TFederatedWriteSession::WriteEncoded(NTopic::TContinuationToken&& token, TS message.SeqNo(*seqNo); if (createTimestamp.Defined()) message.CreateTimestamp(*createTimestamp); - return WriteInternal(std::move(token), std::move(message)); + return WriteInternal(std::move(token), TWrappedWriteMessage(std::move(message))); } void TFederatedWriteSession::WriteEncoded(NTopic::TContinuationToken&& token, NTopic::TWriteMessage&& message) { - return WriteInternal(std::move(token), std::move(message)); + return WriteInternal(std::move(token), TWrappedWriteMessage(std::move(message))); } -void TFederatedWriteSession::WriteInternal(NTopic::TContinuationToken&&, NTopic::TWriteMessage&& message) { +void TFederatedWriteSession::WriteInternal(NTopic::TContinuationToken&&, TWrappedWriteMessage&& wrapped) { ClientHasToken = false; - if (!message.CreateTimestamp_.Defined()) { - message.CreateTimestamp_ = TInstant::Now(); + if (!wrapped.Message.CreateTimestamp_.Defined()) { + wrapped.Message.CreateTimestamp_ = TInstant::Now(); } { TDeferredWrite deferred(Subsession); with_lock(Lock) { - BufferFreeSpace -= message.Data.size(); - OriginalMessagesToPassDown.emplace_back(std::move(message)); + BufferFreeSpace -= wrapped.Message.Data.size(); + OriginalMessagesToPassDown.emplace_back(std::move(wrapped)); PrepareDeferredWrite(deferred); } @@ -285,10 +285,10 @@ bool TFederatedWriteSession::PrepareDeferredWrite(TDeferredWrite& deferred) { if (OriginalMessagesToPassDown.empty()) { return false; } - OriginalMessagesToGetAck.push_back(OriginalMessagesToPassDown.front()); - deferred.Token.ConstructInPlace(std::move(*PendingToken)); - deferred.Message.ConstructInPlace(std::move(OriginalMessagesToPassDown.front())); + OriginalMessagesToGetAck.push_back(std::move(OriginalMessagesToPassDown.front())); OriginalMessagesToPassDown.pop_front(); + deferred.Token.ConstructInPlace(std::move(*PendingToken)); + deferred.Message.ConstructInPlace(std::move(OriginalMessagesToGetAck.back().Message)); PendingToken.Clear(); return true; } diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h index de0c33eccebd..2af97b35b5de 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h @@ -62,6 +62,20 @@ class TFederatedWriteSession : public NTopic::IWriteSession, inline NTopic::TWriterCounters::TPtr GetCounters() override {Y_ABORT("Unimplemented"); } //ToDo - unimplemented; +private: + + class TWrappedWriteMessage { + public: + const TString Data; + NTopic::TWriteMessage Message; + TWrappedWriteMessage(NTopic::TWriteMessage&& message) + : Data(message.Data) + , Message(std::move(message)) + { + Message.Data = Data; + } + }; + private: void Start(); void OpenSubSessionImpl(std::shared_ptr db); @@ -71,7 +85,7 @@ class TFederatedWriteSession : public NTopic::IWriteSession, void OnFederatedStateUpdateImpl(); void ScheduleFederatedStateUpdateImpl(TDuration delay); - void WriteInternal(NTopic::TContinuationToken&&, NTopic::TWriteMessage&& message); + void WriteInternal(NTopic::TContinuationToken&&, TWrappedWriteMessage&& message); bool PrepareDeferredWrite(TDeferredWrite& deferred); void CloseImpl(EStatus statusCode, NYql::TIssues&& issues); @@ -105,8 +119,8 @@ class TFederatedWriteSession : public NTopic::IWriteSession, TMaybe PendingToken; // from Subsession bool ClientHasToken = false; - std::deque OriginalMessagesToPassDown; - std::deque OriginalMessagesToGetAck; + std::deque OriginalMessagesToPassDown; + std::deque OriginalMessagesToGetAck; i64 BufferFreeSpace; // Exiting. diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h index 87d059225db7..1557b1094604 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h @@ -1602,7 +1602,7 @@ struct TWriteMessage { } //! Message body. - const TStringBuf Data; + TStringBuf Data; //! Codec and original size for compressed message. //! Do not specify or change these options directly, use CompressedMessage()