Skip to content

Commit b7f54d3

Browse files
Alek5andr-KotovGazizonoki
authored andcommitted
Moved commit "use TTxControl to commit transactions" from ydb repo
1 parent 7689c6c commit b7f54d3

File tree

6 files changed

+147
-32
lines changed

6 files changed

+147
-32
lines changed

include/ydb-cpp-sdk/client/table/table.h

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1276,30 +1276,7 @@ class TTxSettings {
12761276
ETransactionMode Mode_;
12771277
};
12781278

1279-
class TTxControl {
1280-
friend class TTableClient;
1281-
1282-
public:
1283-
using TSelf = TTxControl;
1284-
1285-
static TTxControl Tx(const TTransaction& tx) {
1286-
return TTxControl(tx);
1287-
}
1288-
1289-
static TTxControl BeginTx(const TTxSettings& settings = TTxSettings()) {
1290-
return TTxControl(settings);
1291-
}
1292-
1293-
FLUENT_SETTING_FLAG(CommitTx);
1294-
1295-
private:
1296-
TTxControl(const TTransaction& tx);
1297-
TTxControl(const TTxSettings& begin);
1298-
1299-
private:
1300-
std::optional<std::string> TxId_;
1301-
TTxSettings BeginTx_;
1302-
};
1279+
class TTxControl;
13031280

13041281
enum class EAutoPartitioningPolicy {
13051282
Disabled = 1,
@@ -1845,13 +1822,40 @@ class TTransaction {
18451822
private:
18461823
TTransaction(const TSession& session, const std::string& txId);
18471824

1825+
TAsyncStatus Precommit() const;
1826+
18481827
class TImpl;
18491828

18501829
std::shared_ptr<TImpl> TransactionImpl_;
18511830
};
18521831

18531832
////////////////////////////////////////////////////////////////////////////////
18541833

1834+
class TTxControl {
1835+
friend class TTableClient;
1836+
1837+
public:
1838+
using TSelf = TTxControl;
1839+
1840+
static TTxControl Tx(const TTransaction& tx) {
1841+
return TTxControl(tx);
1842+
}
1843+
1844+
static TTxControl BeginTx(const TTxSettings& settings = TTxSettings()) {
1845+
return TTxControl(settings);
1846+
}
1847+
1848+
FLUENT_SETTING_FLAG(CommitTx);
1849+
1850+
private:
1851+
TTxControl(const TTransaction& tx);
1852+
TTxControl(const TTxSettings& begin);
1853+
1854+
private:
1855+
std::optional<TTransaction> Tx_;
1856+
TTxSettings BeginTx_;
1857+
};
1858+
18551859
//! Represents query identificator (e.g. used for prepared query)
18561860
class TDataQuery {
18571861
friend class TTableClient;

src/client/table/impl/table_client.h

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ class TTableClient::TImpl: public TClientImplCommon<TTableClient::TImpl>, public
7979
CacheMissCounter.Inc();
8080

8181
return ::NYdb::NSessionPool::InjectSessionStatusInterception(session.SessionImpl_,
82-
ExecuteDataQueryInternal(session, query, txControl, params, settings, false),
82+
ExecuteDataQueryImpl(session, query, txControl, params, settings, false),
8383
true, GetMinTimeToTouch(Settings_.SessionPoolSettings_));
8484
}
8585

@@ -96,7 +96,7 @@ class TTableClient::TImpl: public TClientImplCommon<TTableClient::TImpl>, public
9696

9797
return ::NYdb::NSessionPool::InjectSessionStatusInterception<TDataQueryResult>(
9898
session.SessionImpl_,
99-
session.Client_->ExecuteDataQueryInternal(session, dataQuery, txControl, params, settings, fromCache),
99+
session.Client_->ExecuteDataQueryImpl(session, dataQuery, txControl, params, settings, fromCache),
100100
true,
101101
GetMinTimeToTouch(session.Client_->Settings_.SessionPoolSettings_),
102102
cb);
@@ -171,6 +171,32 @@ class TTableClient::TImpl: public TClientImplCommon<TTableClient::TImpl>, public
171171

172172
static void CollectQuerySize(const TDataQuery&, NSdkStats::TAtomicHistogram<::NMonitoring::THistogram>&);
173173

174+
template <typename TQueryType, typename TParamsType>
175+
TAsyncDataQueryResult ExecuteDataQueryImpl(const TSession& session, const TQueryType& query,
176+
const TTxControl& txControl, TParamsType params,
177+
const TExecDataQuerySettings& settings, bool fromCache
178+
) {
179+
if (!txControl.Tx_.has_value() || !txControl.CommitTx_) {
180+
return ExecuteDataQueryInternal(session, query, txControl, params, settings, fromCache);
181+
}
182+
183+
auto onPrecommitCompleted = [this, session, query, txControl, params, settings, fromCache](const NThreading::TFuture<TStatus>& f) {
184+
TStatus status = f.GetValueSync();
185+
if (!status.IsSuccess()) {
186+
return NThreading::MakeFuture(TDataQueryResult(std::move(status),
187+
{},
188+
txControl.Tx_,
189+
std::nullopt,
190+
false,
191+
std::nullopt));
192+
}
193+
194+
return ExecuteDataQueryInternal(session, query, txControl, params, settings, fromCache);
195+
};
196+
197+
return txControl.Tx_->Precommit().Apply(onPrecommitCompleted);
198+
}
199+
174200
template <typename TQueryType, typename TParamsType>
175201
TAsyncDataQueryResult ExecuteDataQueryInternal(const TSession& session, const TQueryType& query,
176202
const TTxControl& txControl, TParamsType params,
@@ -180,8 +206,8 @@ class TTableClient::TImpl: public TClientImplCommon<TTableClient::TImpl>, public
180206
request.set_session_id(TStringType{session.GetId()});
181207
auto txControlProto = request.mutable_tx_control();
182208
txControlProto->set_commit_tx(txControl.CommitTx_);
183-
if (txControl.TxId_) {
184-
txControlProto->set_tx_id(TStringType{txControl.TxId_.value()});
209+
if (txControl.Tx_.has_value()) {
210+
txControlProto->set_tx_id(TStringType{txControl.Tx_->GetId()});
185211
} else {
186212
SetTxSettings(txControl.BeginTx_, txControlProto->mutable_begin_tx());
187213
}

src/client/table/impl/transaction.cpp

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,8 @@ TTransaction::TImpl::TImpl(const TSession& session, const std::string& txId)
99
{
1010
}
1111

12-
TAsyncCommitTransactionResult TTransaction::TImpl::Commit(const TCommitTxSettings& settings)
12+
TAsyncStatus TTransaction::TImpl::Precommit() const
1313
{
14-
ChangesAreAccepted = false;
15-
1614
auto result = NThreading::MakeFuture(TStatus(EStatus::SUCCESS, {}));
1715

1816
for (auto& callback : PrecommitCallbacks) {
@@ -27,6 +25,15 @@ TAsyncCommitTransactionResult TTransaction::TImpl::Commit(const TCommitTxSetting
2725
result = result.Apply(action);
2826
}
2927

28+
return result;
29+
}
30+
31+
TAsyncCommitTransactionResult TTransaction::TImpl::Commit(const TCommitTxSettings& settings)
32+
{
33+
ChangesAreAccepted = false;
34+
35+
auto result = Precommit();
36+
3037
auto precommitsCompleted = [this, settings](const TAsyncStatus& result) mutable {
3138
if (const TStatus& status = result.GetValue(); !status.IsSuccess()) {
3239
return NThreading::MakeFuture(TCommitTransactionResult(TStatus(status), std::nullopt));

src/client/table/impl/transaction.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ class TTransaction::TImpl {
1616
return !TxId_.empty();
1717
}
1818

19+
TAsyncStatus Precommit() const;
1920
TAsyncCommitTransactionResult Commit(const TCommitTxSettings& settings = TCommitTxSettings());
2021
TAsyncStatus Rollback(const TRollbackTxSettings& settings = TRollbackTxSettings());
2122

src/client/table/table.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1989,7 +1989,7 @@ const std::string& TSession::GetId() const {
19891989
////////////////////////////////////////////////////////////////////////////////
19901990

19911991
TTxControl::TTxControl(const TTransaction& tx)
1992-
: TxId_(tx.GetId())
1992+
: Tx_(tx)
19931993
{}
19941994

19951995
TTxControl::TTxControl(const TTxSettings& begin)
@@ -2012,6 +2012,11 @@ bool TTransaction::IsActive() const
20122012
return TransactionImpl_->IsActive();
20132013
}
20142014

2015+
TAsyncStatus TTransaction::Precommit() const
2016+
{
2017+
return TransactionImpl_->Precommit();
2018+
}
2019+
20152020
TAsyncCommitTransactionResult TTransaction::Commit(const TCommitTxSettings& settings) {
20162021
return TransactionImpl_->Commit(settings);
20172022
}

src/client/topic/ut/topic_to_table_ut.cpp

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,8 @@ class TFixture : public NUnitTest::TBaseFixture {
172172
void CheckTabletKeys(const TString& topicName);
173173
void DumpPQTabletKeys(const TString& topicName);
174174

175+
NTable::TDataQueryResult ExecuteDataQuery(NTable::TSession session, const TString& query, const NTable::TTxControl& control);
176+
175177
private:
176178
template<class E>
177179
E ReadEvent(TTopicReadSessionPtr reader, NTable::TTransaction& tx);
@@ -1535,6 +1537,13 @@ void TFixture::TestTheCompletionOfATransaction(const TTransactionCompletionTestD
15351537
}
15361538
}
15371539

1540+
NTable::TDataQueryResult TFixture::ExecuteDataQuery(NTable::TSession session, const TString& query, const NTable::TTxControl& control)
1541+
{
1542+
auto status = session.ExecuteDataQuery(query, control).GetValueSync();
1543+
UNIT_ASSERT_C(status.IsSuccess(), status.GetIssues().ToString());
1544+
return status;
1545+
}
1546+
15381547
Y_UNIT_TEST_F(WriteToTopic_Demo_11, TFixture)
15391548
{
15401549
for (auto endOfTransaction : {Commit, Rollback, CloseTableSession}) {
@@ -2148,6 +2157,69 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_41, TFixture)
21482157
CommitTx(tx, EStatus::SESSION_EXPIRED);
21492158
}
21502159

2160+
Y_UNIT_TEST_F(WriteToTopic_Demo_42, TFixture)
2161+
{
2162+
CreateTopic("topic_A", TEST_CONSUMER);
2163+
2164+
NTable::TSession tableSession = CreateTableSession();
2165+
NTable::TTransaction tx = BeginTx(tableSession);
2166+
2167+
for (size_t k = 0; k < 100; ++k) {
2168+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(1'000'000, 'a'), &tx);
2169+
}
2170+
2171+
CloseTopicWriteSession("topic_A", TEST_MESSAGE_GROUP_ID); // gracefully close
2172+
2173+
CommitTx(tx, EStatus::SUCCESS);
2174+
2175+
auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2));
2176+
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 100);
2177+
}
2178+
2179+
Y_UNIT_TEST_F(WriteToTopic_Demo_43, TFixture)
2180+
{
2181+
// The recording stream will run into a quota. Before the commit, the client will receive confirmations
2182+
// for some of the messages. The `ExecuteDataQuery` call will wait for the rest.
2183+
CreateTopic("topic_A", TEST_CONSUMER);
2184+
2185+
NTable::TSession tableSession = CreateTableSession();
2186+
NTable::TTransaction tx = BeginTx(tableSession);
2187+
2188+
for (size_t k = 0; k < 100; ++k) {
2189+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(1'000'000, 'a'), &tx);
2190+
}
2191+
2192+
ExecuteDataQuery(tableSession, "SELECT 1", NTable::TTxControl::Tx(tx).CommitTx(true));
2193+
2194+
auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(60));
2195+
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 100);
2196+
}
2197+
2198+
Y_UNIT_TEST_F(WriteToTopic_Demo_44, TFixture)
2199+
{
2200+
CreateTopic("topic_A", TEST_CONSUMER);
2201+
2202+
NTable::TSession tableSession = CreateTableSession();
2203+
2204+
auto result = ExecuteDataQuery(tableSession, "SELECT 1", NTable::TTxControl::BeginTx());
2205+
2206+
NTable::TTransaction tx = *result.GetTransaction();
2207+
2208+
for (size_t k = 0; k < 100; ++k) {
2209+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(1'000'000, 'a'), &tx);
2210+
}
2211+
2212+
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
2213+
2214+
auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(60));
2215+
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 0);
2216+
2217+
ExecuteDataQuery(tableSession, "SELECT 2", NTable::TTxControl::Tx(tx).CommitTx(true));
2218+
2219+
messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(60));
2220+
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 100);
2221+
}
2222+
21512223
}
21522224

21532225
}

0 commit comments

Comments
 (0)