diff --git a/include/ydb-cpp-sdk/client/table/table.h b/include/ydb-cpp-sdk/client/table/table.h index 9214c2254e..f7f2e2785c 100644 --- a/include/ydb-cpp-sdk/client/table/table.h +++ b/include/ydb-cpp-sdk/client/table/table.h @@ -1276,30 +1276,7 @@ class TTxSettings { ETransactionMode Mode_; }; -class TTxControl { - friend class TTableClient; - -public: - using TSelf = TTxControl; - - static TTxControl Tx(const TTransaction& tx) { - return TTxControl(tx); - } - - static TTxControl BeginTx(const TTxSettings& settings = TTxSettings()) { - return TTxControl(settings); - } - - FLUENT_SETTING_FLAG(CommitTx); - -private: - TTxControl(const TTransaction& tx); - TTxControl(const TTxSettings& begin); - -private: - std::optional TxId_; - TTxSettings BeginTx_; -}; +class TTxControl; enum class EAutoPartitioningPolicy { Disabled = 1, @@ -1845,6 +1822,8 @@ class TTransaction { private: TTransaction(const TSession& session, const std::string& txId); + TAsyncStatus Precommit() const; + class TImpl; std::shared_ptr TransactionImpl_; @@ -1852,6 +1831,31 @@ class TTransaction { //////////////////////////////////////////////////////////////////////////////// +class TTxControl { + friend class TTableClient; + +public: + using TSelf = TTxControl; + + static TTxControl Tx(const TTransaction& tx) { + return TTxControl(tx); + } + + static TTxControl BeginTx(const TTxSettings& settings = TTxSettings()) { + return TTxControl(settings); + } + + FLUENT_SETTING_FLAG(CommitTx); + +private: + TTxControl(const TTransaction& tx); + TTxControl(const TTxSettings& begin); + +private: + std::optional Tx_; + TTxSettings BeginTx_; +}; + //! Represents query identificator (e.g. used for prepared query) class TDataQuery { friend class TTableClient; diff --git a/include/ydb-cpp-sdk/library/grpc/client/grpc_client_low.h b/include/ydb-cpp-sdk/library/grpc/client/grpc_client_low.h index 6493b95cc5..ae0f898d4a 100644 --- a/include/ydb-cpp-sdk/library/grpc/client/grpc_client_low.h +++ b/include/ydb-cpp-sdk/library/grpc/client/grpc_client_low.h @@ -1075,7 +1075,7 @@ class TStreamRequestReadWriteProcessor Y_ABORT_UNLESS(ReadActive, "Unexpected Read done callback"); Y_ABORT_UNLESS(!ReadFinished, "Unexpected ReadFinished flag"); - if (!ok || Cancelled || WriteFinished) { + if (!ok || Cancelled) { ReadFinished = true; if (!WriteActive) { WriteFinished = true; @@ -1124,9 +1124,6 @@ class TStreamRequestReadWriteProcessor if (!ok || Cancelled) { WriteActive = false; WriteFinished = true; - if (!ReadActive) { - ReadFinished = true; - } if (ReadFinished) { Stream->Finish(&Status, OnFinishedTag.Prepare()); } diff --git a/src/client/federated_topic/impl/federation_observer.cpp b/src/client/federated_topic/impl/federation_observer.cpp index 53ec045325..a87d486396 100644 --- a/src/client/federated_topic/impl/federation_observer.cpp +++ b/src/client/federated_topic/impl/federation_observer.cpp @@ -179,22 +179,23 @@ void TFederatedDbObserverImpl::OnFederationDiscovery(TStatus&& status, Ydb::Fede } IOutputStream& operator<<(IOutputStream& out, TFederatedDbState const& state) { - out << "{ Status: " << state.Status.GetStatus(); + out << "{ Status: " << state.Status.GetStatus() + << " SelfLocation: \"" << state.SelfLocation << '"'; if (auto const& issues = state.Status.GetIssues(); !issues.Empty()) { - out << ", Issues: { " << issues.ToOneLineString() << " }"; + out << " Issues: { " << issues.ToOneLineString() << " }"; } if (!state.DbInfos.empty()) { - out << ", DbInfos: { "; + out << " DbInfos: [ "; bool first = true; for (auto const& info : state.DbInfos) { if (first) { first = false; } else { - out << ", "; + out << " "; } out << "{ " << info->ShortDebugString() << " }"; } - out << " }"; + out << " ]"; } return out << " }"; } diff --git a/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h b/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h index ed4e2af4c5..a0c33090c5 100644 --- a/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h +++ b/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h @@ -538,6 +538,11 @@ class TGRpcConnectionsImpl SetDatabaseHeader(meta, dbState->Database); } + static const std::string clientPid = GetClientPIDHeaderValue(); + meta.Aux.push_back({YDB_SDK_BUILD_INFO_HEADER, CreateSDKBuildInfo()}); + meta.Aux.push_back({YDB_CLIENT_PID, clientPid}); + meta.Aux.insert(meta.Aux.end(), requestSettings.Header.begin(), requestSettings.Header.end()); + dbState->StatCollector.IncGRpcInFlight(); dbState->StatCollector.IncGRpcInFlightByHost(endpoint.GetEndpoint()); diff --git a/src/client/table/impl/table_client.h b/src/client/table/impl/table_client.h index 7f66e4193f..dbb94fa209 100644 --- a/src/client/table/impl/table_client.h +++ b/src/client/table/impl/table_client.h @@ -79,7 +79,7 @@ class TTableClient::TImpl: public TClientImplCommon, public CacheMissCounter.Inc(); return ::NYdb::NSessionPool::InjectSessionStatusInterception(session.SessionImpl_, - ExecuteDataQueryInternal(session, query, txControl, params, settings, false), + ExecuteDataQueryImpl(session, query, txControl, params, settings, false), true, GetMinTimeToTouch(Settings_.SessionPoolSettings_)); } @@ -96,7 +96,7 @@ class TTableClient::TImpl: public TClientImplCommon, public return ::NYdb::NSessionPool::InjectSessionStatusInterception( session.SessionImpl_, - session.Client_->ExecuteDataQueryInternal(session, dataQuery, txControl, params, settings, fromCache), + session.Client_->ExecuteDataQueryImpl(session, dataQuery, txControl, params, settings, fromCache), true, GetMinTimeToTouch(session.Client_->Settings_.SessionPoolSettings_), cb); @@ -171,6 +171,32 @@ class TTableClient::TImpl: public TClientImplCommon, public static void CollectQuerySize(const TDataQuery&, NSdkStats::TAtomicHistogram<::NMonitoring::THistogram>&); + template + TAsyncDataQueryResult ExecuteDataQueryImpl(const TSession& session, const TQueryType& query, + const TTxControl& txControl, TParamsType params, + const TExecDataQuerySettings& settings, bool fromCache + ) { + if (!txControl.Tx_.has_value() || !txControl.CommitTx_) { + return ExecuteDataQueryInternal(session, query, txControl, params, settings, fromCache); + } + + auto onPrecommitCompleted = [this, session, query, txControl, params, settings, fromCache](const NThreading::TFuture& f) { + TStatus status = f.GetValueSync(); + if (!status.IsSuccess()) { + return NThreading::MakeFuture(TDataQueryResult(std::move(status), + {}, + txControl.Tx_, + std::nullopt, + false, + std::nullopt)); + } + + return ExecuteDataQueryInternal(session, query, txControl, params, settings, fromCache); + }; + + return txControl.Tx_->Precommit().Apply(onPrecommitCompleted); + } + template TAsyncDataQueryResult ExecuteDataQueryInternal(const TSession& session, const TQueryType& query, const TTxControl& txControl, TParamsType params, @@ -180,8 +206,8 @@ class TTableClient::TImpl: public TClientImplCommon, public request.set_session_id(TStringType{session.GetId()}); auto txControlProto = request.mutable_tx_control(); txControlProto->set_commit_tx(txControl.CommitTx_); - if (txControl.TxId_) { - txControlProto->set_tx_id(TStringType{txControl.TxId_.value()}); + if (txControl.Tx_.has_value()) { + txControlProto->set_tx_id(TStringType{txControl.Tx_->GetId()}); } else { SetTxSettings(txControl.BeginTx_, txControlProto->mutable_begin_tx()); } diff --git a/src/client/table/impl/transaction.cpp b/src/client/table/impl/transaction.cpp index ccd56f1dde..e4440fdd6d 100644 --- a/src/client/table/impl/transaction.cpp +++ b/src/client/table/impl/transaction.cpp @@ -9,10 +9,8 @@ TTransaction::TImpl::TImpl(const TSession& session, const std::string& txId) { } -TAsyncCommitTransactionResult TTransaction::TImpl::Commit(const TCommitTxSettings& settings) +TAsyncStatus TTransaction::TImpl::Precommit() const { - ChangesAreAccepted = false; - auto result = NThreading::MakeFuture(TStatus(EStatus::SUCCESS, {})); for (auto& callback : PrecommitCallbacks) { @@ -27,6 +25,15 @@ TAsyncCommitTransactionResult TTransaction::TImpl::Commit(const TCommitTxSetting result = result.Apply(action); } + return result; +} + +TAsyncCommitTransactionResult TTransaction::TImpl::Commit(const TCommitTxSettings& settings) +{ + ChangesAreAccepted = false; + + auto result = Precommit(); + auto precommitsCompleted = [this, settings](const TAsyncStatus& result) mutable { if (const TStatus& status = result.GetValue(); !status.IsSuccess()) { return NThreading::MakeFuture(TCommitTransactionResult(TStatus(status), std::nullopt)); diff --git a/src/client/table/impl/transaction.h b/src/client/table/impl/transaction.h index 32b336af4b..5507606b29 100644 --- a/src/client/table/impl/transaction.h +++ b/src/client/table/impl/transaction.h @@ -16,6 +16,7 @@ class TTransaction::TImpl { return !TxId_.empty(); } + TAsyncStatus Precommit() const; TAsyncCommitTransactionResult Commit(const TCommitTxSettings& settings = TCommitTxSettings()); TAsyncStatus Rollback(const TRollbackTxSettings& settings = TRollbackTxSettings()); diff --git a/src/client/table/table.cpp b/src/client/table/table.cpp index c597b2ddf9..f4b45bb9bf 100644 --- a/src/client/table/table.cpp +++ b/src/client/table/table.cpp @@ -1989,7 +1989,7 @@ const std::string& TSession::GetId() const { //////////////////////////////////////////////////////////////////////////////// TTxControl::TTxControl(const TTransaction& tx) - : TxId_(tx.GetId()) + : Tx_(tx) {} TTxControl::TTxControl(const TTxSettings& begin) @@ -2012,6 +2012,11 @@ bool TTransaction::IsActive() const return TransactionImpl_->IsActive(); } +TAsyncStatus TTransaction::Precommit() const +{ + return TransactionImpl_->Precommit(); +} + TAsyncCommitTransactionResult TTransaction::Commit(const TCommitTxSettings& settings) { return TransactionImpl_->Commit(settings); } diff --git a/src/client/topic/ut/topic_to_table_ut.cpp b/src/client/topic/ut/topic_to_table_ut.cpp index 8c1df743c4..177c4be250 100644 --- a/src/client/topic/ut/topic_to_table_ut.cpp +++ b/src/client/topic/ut/topic_to_table_ut.cpp @@ -172,6 +172,8 @@ class TFixture : public NUnitTest::TBaseFixture { void CheckTabletKeys(const TString& topicName); void DumpPQTabletKeys(const TString& topicName); + NTable::TDataQueryResult ExecuteDataQuery(NTable::TSession session, const TString& query, const NTable::TTxControl& control); + private: template E ReadEvent(TTopicReadSessionPtr reader, NTable::TTransaction& tx); @@ -1535,6 +1537,13 @@ void TFixture::TestTheCompletionOfATransaction(const TTransactionCompletionTestD } } +NTable::TDataQueryResult TFixture::ExecuteDataQuery(NTable::TSession session, const TString& query, const NTable::TTxControl& control) +{ + auto status = session.ExecuteDataQuery(query, control).GetValueSync(); + UNIT_ASSERT_C(status.IsSuccess(), status.GetIssues().ToString()); + return status; +} + Y_UNIT_TEST_F(WriteToTopic_Demo_11, TFixture) { for (auto endOfTransaction : {Commit, Rollback, CloseTableSession}) { @@ -2148,6 +2157,69 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_41, TFixture) CommitTx(tx, EStatus::SESSION_EXPIRED); } +Y_UNIT_TEST_F(WriteToTopic_Demo_42, TFixture) +{ + CreateTopic("topic_A", TEST_CONSUMER); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + for (size_t k = 0; k < 100; ++k) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(1'000'000, 'a'), &tx); + } + + CloseTopicWriteSession("topic_A", TEST_MESSAGE_GROUP_ID); // gracefully close + + CommitTx(tx, EStatus::SUCCESS); + + auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 100); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_43, TFixture) +{ + // The recording stream will run into a quota. Before the commit, the client will receive confirmations + // for some of the messages. The `ExecuteDataQuery` call will wait for the rest. + CreateTopic("topic_A", TEST_CONSUMER); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + for (size_t k = 0; k < 100; ++k) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(1'000'000, 'a'), &tx); + } + + ExecuteDataQuery(tableSession, "SELECT 1", NTable::TTxControl::Tx(tx).CommitTx(true)); + + auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(60)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 100); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_44, TFixture) +{ + CreateTopic("topic_A", TEST_CONSUMER); + + NTable::TSession tableSession = CreateTableSession(); + + auto result = ExecuteDataQuery(tableSession, "SELECT 1", NTable::TTxControl::BeginTx()); + + NTable::TTransaction tx = *result.GetTransaction(); + + for (size_t k = 0; k < 100; ++k) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(1'000'000, 'a'), &tx); + } + + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); + + auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(60)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 0); + + ExecuteDataQuery(tableSession, "SELECT 2", NTable::TTxControl::Tx(tx).CommitTx(true)); + + messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(60)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 100); +} + } }