Skip to content

Sync sdks 9 #322

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions examples/topic_writer/transaction/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#include <ydb-cpp-sdk/client/topic/client.h>
#include <ydb-cpp-sdk/client/table/table.h>

void ThrowOnError(const NYdb::TStatus& status)
{
if (status.IsSuccess()) {
return;
}

ythrow yexception() << status;
}

int main()
{
const std::string ENDPOINT = "HOST:PORT";
const std::string DATABASE = "DATABASE";
const std::string TOPIC = "PATH/TO/TOPIC";

NYdb::TDriverConfig config;
config.SetEndpoint(ENDPOINT);
config.SetDatabase(DATABASE);
NYdb::TDriver driver(config);

NYdb::NTable::TTableClient tableClient(driver);
auto getTableSessionResult = tableClient.GetSession().GetValueSync();
ThrowOnError(getTableSessionResult);
auto tableSession = getTableSessionResult.GetSession();

NYdb::NTopic::TTopicClient topicClient(driver);
auto topicSessionSettings = NYdb::NTopic::TWriteSessionSettings()
.Path(TOPIC)
.DeduplicationEnabled(true);
auto topicSession = topicClient.CreateSimpleBlockingWriteSession(topicSessionSettings);

auto beginTransactionResult = tableSession.BeginTransaction().GetValueSync();
ThrowOnError(beginTransactionResult);
auto transaction = beginTransactionResult.GetTransaction();

NYdb::NTopic::TWriteMessage writeMessage("message");

topicSession->Write(std::move(writeMessage), &transaction);

transaction.Commit().GetValueSync();
}
21 changes: 9 additions & 12 deletions include/ydb-cpp-sdk/client/table/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -1694,6 +1694,8 @@ struct TReadTableSettings : public TRequestSettings<TReadTableSettings> {
FLUENT_SETTING_OPTIONAL(bool, ReturnNotNullAsOptional);
};

using TPrecommitTransactionCallback = std::function<TAsyncStatus ()>;

//! Represents all session operations
//! Session is transparent logic representation of connection
class TSession {
Expand Down Expand Up @@ -1831,26 +1833,21 @@ TAsyncStatus TTableClient::RetryOperation(
class TTransaction {
friend class TTableClient;
public:
const std::string& GetId() const {
return TxId_;
}

bool IsActive() const {
return !TxId_.empty();
}
const std::string& GetId() const;
bool IsActive() const;

TAsyncCommitTransactionResult Commit(const TCommitTxSettings& settings = TCommitTxSettings());
TAsyncStatus Rollback(const TRollbackTxSettings& settings = TRollbackTxSettings());

TSession GetSession() const {
return Session_;
}
TSession GetSession() const;
void AddPrecommitCallback(TPrecommitTransactionCallback cb);

private:
TTransaction(const TSession& session, const std::string& txId);

TSession Session_;
std::string TxId_;
class TImpl;

std::shared_ptr<TImpl> TransactionImpl_;
};

////////////////////////////////////////////////////////////////////////////////
Expand Down
16 changes: 11 additions & 5 deletions include/ydb-cpp-sdk/client/topic/write_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ namespace NYdb::NTable {

namespace NYdb::NTopic {

using TTransaction = NTable::TTransaction;

//! Settings for write session.
struct TWriteSessionSettings : public TRequestSettings<TWriteSessionSettings> {
using TSelf = TWriteSessionSettings;
Expand Down Expand Up @@ -190,9 +192,9 @@ struct TWriteMessage {
FLUENT_SETTING(TMessageMeta, MessageMeta);

//! Transaction id
FLUENT_SETTING_OPTIONAL(std::reference_wrapper<NTable::TTransaction>, Tx);
FLUENT_SETTING_OPTIONAL(std::reference_wrapper<TTransaction>, Tx);

const NTable::TTransaction* GetTxPtr() const
TTransaction* GetTxPtr() const
{
return Tx_ ? &Tx_->get() : nullptr;
}
Expand All @@ -204,7 +206,9 @@ class ISimpleBlockingWriteSession : public TThrRefBase {
//! Write single message. Blocks for up to blockTimeout if inflight is full or memoryUsage is exceeded;
//! return - true if write succeeded, false if message was not enqueued for write within blockTimeout.
//! no Ack is provided.
virtual bool Write(TWriteMessage&& message, const TDuration& blockTimeout = TDuration::Max()) = 0;
virtual bool Write(TWriteMessage&& message,
NTable::TTransaction* tx = nullptr,
const TDuration& blockTimeout = TDuration::Max()) = 0;


//! Write single message. Deprecated method with only basic message options.
Expand Down Expand Up @@ -249,15 +253,17 @@ class IWriteSession {

//! Write single message.
//! continuationToken - a token earlier provided to client with ReadyToAccept event.
virtual void Write(TContinuationToken&& continuationToken, TWriteMessage&& message) = 0;
virtual void Write(TContinuationToken&& continuationToken, TWriteMessage&& message,
NTable::TTransaction* tx = nullptr) = 0;

//! Write single message. Old method with only basic message options.
virtual void Write(TContinuationToken&& continuationToken, std::string_view data, std::optional<uint64_t> seqNo = std::nullopt,
std::optional<TInstant> createTimestamp = std::nullopt) = 0;

//! Write single message that is already coded by codec.
//! continuationToken - a token earlier provided to client with ReadyToAccept event.
virtual void WriteEncoded(TContinuationToken&& continuationToken, TWriteMessage&& params) = 0;
virtual void WriteEncoded(TContinuationToken&& continuationToken, TWriteMessage&& params,
NTable::TTransaction* tx = nullptr) = 0;

//! Write single message that is already compressed by codec. Old method with only basic message options.
virtual void WriteEncoded(TContinuationToken&& continuationToken, std::string_view data, ECodec codec, uint32_t originalSize,
Expand Down
10 changes: 8 additions & 2 deletions src/client/federated_topic/impl/federated_write_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,16 @@ class TFederatedWriteSession : public NTopic::IWriteSession,
NThreading::TFuture<uint64_t> GetInitSeqNo() override {
return TryGetImpl()->GetInitSeqNo();
}
void Write(NTopic::TContinuationToken&& continuationToken, NTopic::TWriteMessage&& message) override {
void Write(NTopic::TContinuationToken&& continuationToken, NTopic::TWriteMessage&& message, NTable::TTransaction* tx = nullptr) override {
if (tx) {
ythrow yexception() << "transactions are not supported";
}
TryGetImpl()->Write(std::move(continuationToken), std::move(message));
}
void WriteEncoded(NTopic::TContinuationToken&& continuationToken, NTopic::TWriteMessage&& params) override {
void WriteEncoded(NTopic::TContinuationToken&& continuationToken, NTopic::TWriteMessage&& params, NTable::TTransaction* tx = nullptr) override {
if (tx) {
ythrow yexception() << "transactions are not supported";
}
TryGetImpl()->WriteEncoded(std::move(continuationToken), std::move(params));
}
void Write(NTopic::TContinuationToken&& continuationToken, std::string_view data, std::optional<uint64_t> seqNo = std::nullopt,
Expand Down
1 change: 1 addition & 0 deletions src/client/table/impl/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ target_sources(client-ydb_table-impl PRIVATE
readers.cpp
request_migrator.cpp
table_client.cpp
transaction.cpp
)

target_compile_options(client-ydb_table-impl PRIVATE
Expand Down
1 change: 0 additions & 1 deletion src/client/table/impl/client_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ namespace NTable {

using TSessionInspectorFn = std::function<void(TAsyncCreateSessionResult future)>;


class TSession::TImpl : public TKqpSessionCommon {
friend class TTableClient;
friend class TSession;
Expand Down
8 changes: 4 additions & 4 deletions src/client/table/impl/table_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -646,12 +646,12 @@ TAsyncBeginTransactionResult TTableClient::TImpl::BeginTransaction(const TSessio
return promise.GetFuture();
}

TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSession& session, const TTransaction& tx,
TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSession& session, const std::string& txId,
const TCommitTxSettings& settings)
{
auto request = MakeOperationRequest<Ydb::Table::CommitTransactionRequest>(settings);
request.set_session_id(TStringType{session.GetId()});
request.set_tx_id(TStringType{tx.GetId()});
request.set_tx_id(TStringType{txId});
request.set_collect_stats(GetStatsCollectionMode(settings.CollectQueryStats_));

auto promise = NewPromise<TCommitTransactionResult>();
Expand Down Expand Up @@ -684,12 +684,12 @@ TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSess
return promise.GetFuture();
}

TAsyncStatus TTableClient::TImpl::RollbackTransaction(const TSession& session, const TTransaction& tx,
TAsyncStatus TTableClient::TImpl::RollbackTransaction(const TSession& session, const std::string& txId,
const TRollbackTxSettings& settings)
{
auto request = MakeOperationRequest<Ydb::Table::RollbackTransactionRequest>(settings);
request.set_session_id(TStringType{session.GetId()});
request.set_tx_id(TStringType{tx.GetId()});
request.set_tx_id(TStringType{txId});

return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::RollbackTransactionRequest, Ydb::Table::RollbackTransactionResponse>(
std::move(request),
Expand Down
4 changes: 2 additions & 2 deletions src/client/table/impl/table_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ class TTableClient::TImpl: public TClientImplCommon<TTableClient::TImpl>, public

TAsyncBeginTransactionResult BeginTransaction(const TSession& session, const TTxSettings& txSettings,
const TBeginTxSettings& settings);
TAsyncCommitTransactionResult CommitTransaction(const TSession& session, const TTransaction& tx,
TAsyncCommitTransactionResult CommitTransaction(const TSession& session, const std::string& txId,
const TCommitTxSettings& settings);
TAsyncStatus RollbackTransaction(const TSession& session, const TTransaction& tx,
TAsyncStatus RollbackTransaction(const TSession& session, const std::string& txId,
const TRollbackTxSettings& settings);

TAsyncExplainDataQueryResult ExplainDataQuery(const TSession& session, const std::string& query,
Expand Down
58 changes: 58 additions & 0 deletions src/client/table/impl/transaction.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#include "transaction.h"
#include "table_client.h"

namespace NYdb::NTable {

TTransaction::TImpl::TImpl(const TSession& session, const std::string& txId)
: Session_(session)
, TxId_(txId)
{
}

TAsyncCommitTransactionResult TTransaction::TImpl::Commit(const TCommitTxSettings& settings)
{
ChangesAreAccepted = false;

auto result = NThreading::MakeFuture(TStatus(EStatus::SUCCESS, {}));

for (auto& callback : PrecommitCallbacks) {
auto action = [curr = callback()](const TAsyncStatus& prev) {
if (const TStatus& status = prev.GetValue(); !status.IsSuccess()) {
return prev;
}

return curr;
};

result = result.Apply(action);
}

auto precommitsCompleted = [this, settings](const TAsyncStatus& result) mutable {
if (const TStatus& status = result.GetValue(); !status.IsSuccess()) {
return NThreading::MakeFuture(TCommitTransactionResult(TStatus(status), std::nullopt));
}

return Session_.Client_->CommitTransaction(Session_,
TxId_,
settings);
};

return result.Apply(precommitsCompleted);
}

TAsyncStatus TTransaction::TImpl::Rollback(const TRollbackTxSettings& settings)
{
ChangesAreAccepted = false;
return Session_.Client_->RollbackTransaction(Session_, TxId_, settings);
}

void TTransaction::TImpl::AddPrecommitCallback(TPrecommitTransactionCallback cb)
{
if (!ChangesAreAccepted) {
ythrow TContractViolation("Changes are no longer accepted");
}

PrecommitCallbacks.push_back(std::move(cb));
}

}
36 changes: 36 additions & 0 deletions src/client/table/impl/transaction.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#pragma once

#include <ydb-cpp-sdk/client/table/table.h>

namespace NYdb::NTable {

class TTransaction::TImpl {
public:
TImpl(const TSession& session, const std::string& txId);

const std::string& GetId() const {
return TxId_;
}

bool IsActive() const {
return !TxId_.empty();
}

TAsyncCommitTransactionResult Commit(const TCommitTxSettings& settings = TCommitTxSettings());
TAsyncStatus Rollback(const TRollbackTxSettings& settings = TRollbackTxSettings());

TSession GetSession() const {
return Session_;
}

void AddPrecommitCallback(TPrecommitTransactionCallback cb);

private:
TSession Session_;
std::string TxId_;

bool ChangesAreAccepted = true; // haven't called Commit or Rollback yet
std::vector<TPrecommitTransactionCallback> PrecommitCallbacks;
};

}
28 changes: 24 additions & 4 deletions src/client/table/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <src/client/table/impl/data_query.h>
#include <src/client/table/impl/request_migrator.h>
#include <src/client/table/impl/table_client.h>
#include <src/client/table/impl/transaction.h>
#include <ydb-cpp-sdk/client/resources/ydb_resources.h>

#include <google/protobuf/util/time_util.h>
Expand Down Expand Up @@ -1998,16 +1999,35 @@ TTxControl::TTxControl(const TTxSettings& begin)
////////////////////////////////////////////////////////////////////////////////

TTransaction::TTransaction(const TSession& session, const std::string& txId)
: Session_(session)
, TxId_(txId)
: TransactionImpl_(new TTransaction::TImpl(session, txId))
{}

const std::string& TTransaction::GetId() const
{
return TransactionImpl_->GetId();
}

bool TTransaction::IsActive() const
{
return TransactionImpl_->IsActive();
}

TAsyncCommitTransactionResult TTransaction::Commit(const TCommitTxSettings& settings) {
return Session_.Client_->CommitTransaction(Session_, *this, settings);
return TransactionImpl_->Commit(settings);
}

TAsyncStatus TTransaction::Rollback(const TRollbackTxSettings& settings) {
return Session_.Client_->RollbackTransaction(Session_, *this, settings);
return TransactionImpl_->Rollback(settings);
}

TSession TTransaction::GetSession() const
{
return TransactionImpl_->GetSession();
}

void TTransaction::AddPrecommitCallback(TPrecommitTransactionCallback cb)
{
TransactionImpl_->AddPrecommitCallback(std::move(cb));
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
1 change: 1 addition & 0 deletions src/client/topic/impl/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ class TBaseSessionEventsQueue : public ISignalable {
while (!HasEventsImpl()) {
std::unique_lock<std::mutex> lk(Mutex, std::adopt_lock);
CondVar.wait(lk);
lk.release();
}
}

Expand Down
7 changes: 4 additions & 3 deletions src/client/topic/impl/read_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -309,9 +309,10 @@ bool TReadSession::Close(TDuration timeout) {
issues.AddIssue(TStringBuilder() << "Session was closed after waiting " << timeout);
EventsQueue->Close(TSessionClosedEvent(EStatus::TIMEOUT, std::move(issues)), deferred);
}

std::lock_guard guard(Lock);
Aborting = true; // Set abort flag for doing nothing on destructor.
{
std::lock_guard guard(Lock);
Aborting = true; // Set abort flag for doing nothing on destructor.
}
return result;
}

Expand Down
Loading
Loading