Skip to content

Commit 4119768

Browse files
committed
[C++ SDK] Supported topic-to-table transactions in Query Service (#15984)
1 parent f4bd193 commit 4119768

32 files changed

+2042
-733
lines changed

examples/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,6 @@ add_subdirectory(pagination)
44
add_subdirectory(secondary_index)
55
add_subdirectory(secondary_index_builtin)
66
add_subdirectory(topic_reader)
7+
add_subdirectory(topic_writer/transaction)
78
add_subdirectory(ttl)
89
add_subdirectory(vector_index)

examples/basic_example/basic_example.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -286,8 +286,8 @@ void MultiStep(TQueryClient client) {
286286
}
287287

288288
// Get the active transaction id
289-
auto txId = resultValue.GetTransaction()->GetId();
290-
289+
auto tx = *resultValue.GetTransaction();
290+
291291
// Processing the request result
292292
TResultSetParser parser(resultValue.GetResultSet(0));
293293
parser.TryNextRow();
@@ -328,7 +328,7 @@ void MultiStep(TQueryClient client) {
328328
// and commit it at the end of the second query execution.
329329
auto result2 = session.ExecuteQuery(
330330
query2,
331-
TTxControl::Tx(txId).CommitTx(),
331+
TTxControl::Tx(tx).CommitTx(),
332332
params2).GetValueSync();
333333

334334
if (!result2.IsSuccess()) {
@@ -381,7 +381,7 @@ void ExplicitTcl(TQueryClient client) {
381381
// Execute query.
382382
// Transaction control settings continues active transaction (tx)
383383
auto updateResult = session.ExecuteQuery(query,
384-
TTxControl::Tx(tx.GetId()),
384+
TTxControl::Tx(tx),
385385
params).GetValueSync();
386386

387387
if (!updateResult.IsSuccess()) {

examples/topic_reader/transaction/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ add_executable(read_from_topic_in_transaction)
33
target_link_libraries(read_from_topic_in_transaction PUBLIC
44
yutil
55
YDB-CPP-SDK::Topic
6+
YDB-CPP-SDK::Query
67
getopt
78
)
89

examples/topic_reader/transaction/application.cpp

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@ TApplication::TApplication(const TOptions& options)
2020

2121
Driver.emplace(config);
2222
TopicClient.emplace(*Driver);
23-
TableClient.emplace(*Driver);
23+
QueryClient.emplace(*Driver);
2424

2525
CreateTopicReadSession(options);
26-
CreateTableSession();
26+
CreateQuerySession();
2727

2828
TablePath = options.TablePath;
2929
}
@@ -40,15 +40,15 @@ void TApplication::CreateTopicReadSession(const TOptions& options)
4040
std::cout << "Topic session was created" << std::endl;
4141
}
4242

43-
void TApplication::CreateTableSession()
43+
void TApplication::CreateQuerySession()
4444
{
45-
NYdb::NTable::TCreateSessionSettings settings;
45+
NYdb::NQuery::TCreateSessionSettings settings;
4646

47-
auto result = TableClient->GetSession(settings).GetValueSync();
47+
auto result = QueryClient->GetSession(settings).GetValueSync();
4848

49-
TableSession = result.GetSession();
49+
QuerySession = result.GetSession();
5050

51-
std::cout << "Table session was created" << std::endl;
51+
std::cout << "Query session was created" << std::endl;
5252
}
5353

5454
void TApplication::Run()
@@ -104,10 +104,10 @@ void TApplication::Finalize()
104104
void TApplication::BeginTransaction()
105105
{
106106
Y_ABORT_UNLESS(!Transaction);
107-
Y_ABORT_UNLESS(TableSession);
107+
Y_ABORT_UNLESS(QuerySession);
108108

109-
auto settings = NYdb::NTable::TTxSettings::SerializableRW();
110-
auto result = TableSession->BeginTransaction(settings).GetValueSync();
109+
auto settings = NYdb::NQuery::TTxSettings::SerializableRW();
110+
auto result = QuerySession->BeginTransaction(settings).GetValueSync();
111111

112112
Transaction = result.GetTransaction();
113113
}
@@ -116,7 +116,7 @@ void TApplication::CommitTransaction()
116116
{
117117
Y_ABORT_UNLESS(Transaction);
118118

119-
NYdb::NTable::TCommitTxSettings settings;
119+
NYdb::NQuery::TCommitTxSettings settings;
120120

121121
auto result = Transaction->Commit(settings).GetValueSync();
122122

@@ -173,20 +173,16 @@ void TApplication::InsertRowsIntoTable()
173173

174174
auto params = builder.Build();
175175

176-
NYdb::NTable::TExecDataQuerySettings settings;
177-
settings.KeepInQueryCache(true);
178-
179-
auto runQuery = [this, &query, &params, &settings](NYdb::NTable::TSession) -> NYdb::TStatus {
176+
auto runQuery = [this, &query, &params](NYdb::NQuery::TSession) -> NYdb::TStatus {
180177
auto result =
181-
Transaction->GetSession().ExecuteDataQuery(query,
182-
NYdb::NTable::TTxControl::Tx(*Transaction),
183-
params,
184-
settings).GetValueSync();
178+
Transaction->GetSession().ExecuteQuery(query,
179+
NYdb::NQuery::TTxControl::Tx(*Transaction),
180+
params).GetValueSync();
185181

186182
return result;
187183
};
188184

189-
TableClient->RetryOperationSync(runQuery);
185+
QueryClient->RetryQuerySync(runQuery);
190186
}
191187

192188
void TApplication::AppendTableRow(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message)

examples/topic_reader/transaction/application.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
#include <ydb-cpp-sdk/client/driver/driver.h>
66
#include <ydb-cpp-sdk/client/topic/client.h>
7-
#include <ydb-cpp-sdk/client/table/table.h>
7+
#include <ydb-cpp-sdk/client/query/client.h>
88

99
#include <memory>
1010
#include <optional>
@@ -28,7 +28,7 @@ class TApplication {
2828
};
2929

3030
void CreateTopicReadSession(const TOptions& options);
31-
void CreateTableSession();
31+
void CreateQuerySession();
3232

3333
void BeginTransaction();
3434
void CommitTransaction();
@@ -40,10 +40,10 @@ class TApplication {
4040

4141
std::optional<NYdb::TDriver> Driver;
4242
std::optional<NYdb::NTopic::TTopicClient> TopicClient;
43-
std::optional<NYdb::NTable::TTableClient> TableClient;
43+
std::optional<NYdb::NQuery::TQueryClient> QueryClient;
4444
std::shared_ptr<NYdb::NTopic::IReadSession> ReadSession;
45-
std::optional<NYdb::NTable::TSession> TableSession;
46-
std::optional<NYdb::NTable::TTransaction> Transaction;
45+
std::optional<NYdb::NQuery::TSession> QuerySession;
46+
std::optional<NYdb::NQuery::TTransaction> Transaction;
4747
std::vector<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent> PendingStopEvents;
4848
std::vector<TRow> Rows;
4949
std::string TablePath;
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
add_executable(topic_writer_transaction)
2+
3+
target_link_libraries(topic_writer_transaction PUBLIC
4+
yutil
5+
YDB-CPP-SDK::Topic
6+
YDB-CPP-SDK::Query
7+
)
8+
9+
target_sources(topic_writer_transaction PRIVATE
10+
main.cpp
11+
)
12+
13+
vcs_info(topic_writer_transaction)
14+
15+
if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64")
16+
target_link_libraries(topic_writer_transaction PUBLIC
17+
cpuid_check
18+
)
19+
endif()
20+
21+
if (CMAKE_SYSTEM_NAME STREQUAL "Linux")
22+
target_link_options(topic_writer_transaction PRIVATE
23+
-ldl
24+
-lrt
25+
-Wl,--no-as-needed
26+
-lpthread
27+
)
28+
elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin")
29+
target_link_options(topic_writer_transaction PRIVATE
30+
-Wl,-platform_version,macos,11.0,11.0
31+
-framework
32+
CoreFoundation
33+
)
34+
endif()
Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
#include <ydb-cpp-sdk/client/topic/client.h>
2-
#include <ydb-cpp-sdk/client/table/table.h>
2+
#include <ydb-cpp-sdk/client/query/client.h>
33

4-
int main()
5-
{
4+
int main() {
65
const std::string ENDPOINT = "HOST:PORT";
76
const std::string DATABASE = "DATABASE";
87
const std::string TOPIC = "PATH/TO/TOPIC";
@@ -12,24 +11,26 @@ int main()
1211
config.SetDatabase(DATABASE);
1312
NYdb::TDriver driver(config);
1413

15-
NYdb::NTable::TTableClient tableClient(driver);
16-
auto getTableSessionResult = tableClient.GetSession().GetValueSync();
17-
ThrowOnError(getTableSessionResult);
18-
auto tableSession = getTableSessionResult.GetSession();
14+
NYdb::NQuery::TQueryClient queryClient(driver);
15+
auto getSessionResult = queryClient.GetSession().GetValueSync();
16+
NYdb::NStatusHelpers::ThrowOnError(getSessionResult);
17+
auto session = getSessionResult.GetSession();
1918

2019
NYdb::NTopic::TTopicClient topicClient(driver);
21-
auto topicSessionSettings = NYdb::NTopic::TWriteSessionSettings()
22-
.Path(TOPIC)
23-
.DeduplicationEnabled(true);
24-
auto topicSession = topicClient.CreateSimpleBlockingWriteSession(topicSessionSettings);
2520

26-
auto beginTransactionResult = tableSession.BeginTransaction().GetValueSync();
27-
ThrowOnError(beginTransactionResult);
28-
auto transaction = beginTransactionResult.GetTransaction();
21+
auto topicSession = topicClient.CreateSimpleBlockingWriteSession(
22+
NYdb::NTopic::TWriteSessionSettings()
23+
.Path(TOPIC)
24+
.DeduplicationEnabled(true)
25+
);
26+
27+
auto beginTxResult = session.BeginTransaction(NYdb::NQuery::TTxSettings()).GetValueSync();
28+
NYdb::NStatusHelpers::ThrowOnError(beginTxResult);
29+
auto tx = beginTxResult.GetTransaction();
2930

3031
NYdb::NTopic::TWriteMessage writeMessage("message");
3132

32-
topicSession->Write(std::move(writeMessage), &transaction);
33+
topicSession->Write(std::move(writeMessage), &tx);
3334

34-
transaction.Commit().GetValueSync();
35+
tx.Commit().GetValueSync();
3536
}

include/ydb-cpp-sdk/client/query/client.h

Lines changed: 58 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <ydb-cpp-sdk/client/driver/driver.h>
99
#include <ydb-cpp-sdk/client/params/params.h>
1010
#include <ydb-cpp-sdk/client/retry/retry.h>
11+
#include <ydb-cpp-sdk/client/types/tx/tx.h>
1112
#include <ydb-cpp-sdk/client/types/request_settings.h>
1213

1314
namespace NYdb::inline V3 {
@@ -168,30 +169,75 @@ class TCreateSessionResult: public TStatus {
168169
TSession Session_;
169170
};
170171

171-
class TTransaction {
172+
class TTransaction : public TTransactionBase {
172173
friend class TQueryClient;
173174
friend class TExecuteQueryIterator::TReaderImpl;
175+
friend class TExecQueryImpl;
176+
174177
public:
175-
const std::string& GetId() const {
176-
return TxId_;
178+
bool IsActive() const;
179+
180+
TAsyncCommitTransactionResult Commit(const TCommitTxSettings& settings = TCommitTxSettings());
181+
TAsyncStatus Rollback(const TRollbackTxSettings& settings = TRollbackTxSettings());
182+
183+
TSession GetSession() const;
184+
185+
void AddPrecommitCallback(TPrecommitTransactionCallback cb) override;
186+
void AddOnFailureCallback(TOnFailureTransactionCallback cb) override;
187+
188+
private:
189+
TTransaction(const TSession& session, const std::string& txId);
190+
191+
TAsyncStatus Precommit() const;
192+
NThreading::TFuture<void> ProcessFailure() const;
193+
194+
class TImpl;
195+
196+
std::shared_ptr<TImpl> TransactionImpl_;
197+
};
198+
199+
class TTxControl {
200+
friend class TExecQueryImpl;
201+
friend class TExecQueryInternal;
202+
203+
public:
204+
using TSelf = TTxControl;
205+
206+
static TTxControl Tx(const TTransaction& tx) {
207+
return TTxControl(tx);
177208
}
178209

179-
bool IsActive() const {
180-
return !TxId_.empty();
210+
[[deprecated("This is bug-provoking API. Use TTxControl::Tx(TTransaction) instead. "
211+
"This constructor will be removed in upcomming release")]]
212+
static TTxControl Tx(const std::string& txId) {
213+
return TTxControl(txId);
181214
}
182215

183-
TAsyncCommitTransactionResult Commit(const TCommitTxSettings& settings = TCommitTxSettings());
184-
TAsyncStatus Rollback(const TRollbackTxSettings& settings = TRollbackTxSettings());
216+
static TTxControl BeginTx(const TTxSettings& settings = TTxSettings()) {
217+
return TTxControl(settings);
218+
}
185219

186-
TSession GetSession() const {
187-
return Session_;
220+
static TTxControl NoTx() {
221+
return TTxControl();
188222
}
189223

224+
FLUENT_SETTING_FLAG(CommitTx);
225+
226+
bool HasTx() const { return !std::holds_alternative<std::monostate>(Tx_); }
227+
190228
private:
191-
TTransaction(const TSession& session, const std::string& txId);
229+
TTxControl() {}
192230

193-
TSession Session_;
194-
std::string TxId_;
231+
TTxControl(const TTransaction& tx)
232+
: Tx_(tx) {}
233+
234+
TTxControl(const TTxSettings& txSettings)
235+
: Tx_(txSettings) {}
236+
237+
TTxControl(const std::string& txId)
238+
: Tx_(txId) {}
239+
240+
const std::variant<std::monostate, TTransaction, TTxSettings, std::string> Tx_;
195241
};
196242

197243
class TBeginTransactionResult : public TStatus {

include/ydb-cpp-sdk/client/query/fwd.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ class TExecuteQueryPart;
2727
class TExecuteQueryIterator;
2828

2929
class TTransaction;
30-
struct TTxControl;
30+
class TTxControl;
3131

3232
class TQueryContent;
3333
class TResultSetMeta;

include/ydb-cpp-sdk/client/query/query.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,12 @@ class TExecuteQueryIterator : public TStatus {
6464
: TStatus(std::move(status))
6565
, ReaderImpl_(impl) {}
6666

67+
TExecuteQueryIterator(
68+
std::shared_ptr<TReaderImpl> impl,
69+
TStatus&& status)
70+
: TStatus(std::move(status))
71+
, ReaderImpl_(impl) {}
72+
6773
std::shared_ptr<TReaderImpl> ReaderImpl_;
6874
};
6975

0 commit comments

Comments
 (0)