Skip to content

YDB C++ SDK Import 11 #515

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
May 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,4 @@ jobs:
- name: Test
shell: bash
run: |
ctest -j$(nproc) --preset integration
YDB_VERSION=${{ matrix.ydb-version }} ctest -j$(nproc) --preset integration
8 changes: 4 additions & 4 deletions examples/basic_example/basic_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,8 @@ void MultiStep(TQueryClient client) {
}

// Get the active transaction id
auto txId = resultValue.GetTransaction()->GetId();
auto tx = *resultValue.GetTransaction();

// Processing the request result
TResultSetParser parser(resultValue.GetResultSet(0));
parser.TryNextRow();
Expand Down Expand Up @@ -328,7 +328,7 @@ void MultiStep(TQueryClient client) {
// and commit it at the end of the second query execution.
auto result2 = session.ExecuteQuery(
query2,
TTxControl::Tx(txId).CommitTx(),
TTxControl::Tx(tx).CommitTx(),
params2).GetValueSync();

if (!result2.IsSuccess()) {
Expand Down Expand Up @@ -381,7 +381,7 @@ void ExplicitTcl(TQueryClient client) {
// Execute query.
// Transaction control settings continues active transaction (tx)
auto updateResult = session.ExecuteQuery(query,
TTxControl::Tx(tx.GetId()),
TTxControl::Tx(tx),
params).GetValueSync();

if (!updateResult.IsSuccess()) {
Expand Down
36 changes: 16 additions & 20 deletions examples/topic_reader/transaction/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ TApplication::TApplication(const TOptions& options)

Driver.emplace(config);
TopicClient.emplace(*Driver);
TableClient.emplace(*Driver);
QueryClient.emplace(*Driver);

CreateTopicReadSession(options);
CreateTableSession();
CreateQuerySession();

TablePath = options.TablePath;
}
Expand All @@ -40,15 +40,15 @@ void TApplication::CreateTopicReadSession(const TOptions& options)
std::cout << "Topic session was created" << std::endl;
}

void TApplication::CreateTableSession()
void TApplication::CreateQuerySession()
{
NYdb::NTable::TCreateSessionSettings settings;
NYdb::NQuery::TCreateSessionSettings settings;

auto result = TableClient->GetSession(settings).GetValueSync();
auto result = QueryClient->GetSession(settings).GetValueSync();

TableSession = result.GetSession();
QuerySession = result.GetSession();

std::cout << "Table session was created" << std::endl;
std::cout << "Query session was created" << std::endl;
}

void TApplication::Run()
Expand Down Expand Up @@ -104,10 +104,10 @@ void TApplication::Finalize()
void TApplication::BeginTransaction()
{
Y_ABORT_UNLESS(!Transaction);
Y_ABORT_UNLESS(TableSession);
Y_ABORT_UNLESS(QuerySession);

auto settings = NYdb::NTable::TTxSettings::SerializableRW();
auto result = TableSession->BeginTransaction(settings).GetValueSync();
auto settings = NYdb::NQuery::TTxSettings::SerializableRW();
auto result = QuerySession->BeginTransaction(settings).GetValueSync();

Transaction = result.GetTransaction();
}
Expand All @@ -116,7 +116,7 @@ void TApplication::CommitTransaction()
{
Y_ABORT_UNLESS(Transaction);

NYdb::NTable::TCommitTxSettings settings;
NYdb::NQuery::TCommitTxSettings settings;

auto result = Transaction->Commit(settings).GetValueSync();

Expand Down Expand Up @@ -173,20 +173,16 @@ void TApplication::InsertRowsIntoTable()

auto params = builder.Build();

NYdb::NTable::TExecDataQuerySettings settings;
settings.KeepInQueryCache(true);

auto runQuery = [this, &query, &params, &settings](NYdb::NTable::TSession) -> NYdb::TStatus {
auto runQuery = [this, &query, &params](NYdb::NQuery::TSession) -> NYdb::TStatus {
auto result =
Transaction->GetSession().ExecuteDataQuery(query,
NYdb::NTable::TTxControl::Tx(*Transaction),
params,
settings).GetValueSync();
Transaction->GetSession().ExecuteQuery(query,
NYdb::NQuery::TTxControl::Tx(*Transaction),
params).GetValueSync();

return result;
};

TableClient->RetryOperationSync(runQuery);
QueryClient->RetryQuerySync(runQuery);
}

void TApplication::AppendTableRow(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message)
Expand Down
10 changes: 5 additions & 5 deletions examples/topic_reader/transaction/application.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

#include <ydb-cpp-sdk/client/driver/driver.h>
#include <ydb-cpp-sdk/client/topic/client.h>
#include <ydb-cpp-sdk/client/table/table.h>
#include <ydb-cpp-sdk/client/query/client.h>

#include <memory>
#include <optional>
Expand All @@ -28,7 +28,7 @@ class TApplication {
};

void CreateTopicReadSession(const TOptions& options);
void CreateTableSession();
void CreateQuerySession();

void BeginTransaction();
void CommitTransaction();
Expand All @@ -40,10 +40,10 @@ class TApplication {

std::optional<NYdb::TDriver> Driver;
std::optional<NYdb::NTopic::TTopicClient> TopicClient;
std::optional<NYdb::NTable::TTableClient> TableClient;
std::optional<NYdb::NQuery::TQueryClient> QueryClient;
std::shared_ptr<NYdb::NTopic::IReadSession> ReadSession;
std::optional<NYdb::NTable::TSession> TableSession;
std::optional<NYdb::NTable::TTransaction> Transaction;
std::optional<NYdb::NQuery::TSession> QuerySession;
std::optional<NYdb::NQuery::TTransaction> Transaction;
std::vector<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent> PendingStopEvents;
std::vector<TRow> Rows;
std::string TablePath;
Expand Down
33 changes: 17 additions & 16 deletions examples/topic_writer/transaction/main.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
#include <ydb-cpp-sdk/client/topic/client.h>
#include <ydb-cpp-sdk/client/table/table.h>
#include <ydb-cpp-sdk/client/query/client.h>

int main()
{
int main() {
const std::string ENDPOINT = "HOST:PORT";
const std::string DATABASE = "DATABASE";
const std::string TOPIC = "PATH/TO/TOPIC";
Expand All @@ -12,24 +11,26 @@ int main()
config.SetDatabase(DATABASE);
NYdb::TDriver driver(config);

NYdb::NTable::TTableClient tableClient(driver);
auto getTableSessionResult = tableClient.GetSession().GetValueSync();
ThrowOnError(getTableSessionResult);
auto tableSession = getTableSessionResult.GetSession();
NYdb::NQuery::TQueryClient queryClient(driver);
auto getSessionResult = queryClient.GetSession().GetValueSync();
NYdb::NStatusHelpers::ThrowOnError(getSessionResult);
auto session = getSessionResult.GetSession();

NYdb::NTopic::TTopicClient topicClient(driver);
auto topicSessionSettings = NYdb::NTopic::TWriteSessionSettings()
.Path(TOPIC)
.DeduplicationEnabled(true);
auto topicSession = topicClient.CreateSimpleBlockingWriteSession(topicSessionSettings);

auto beginTransactionResult = tableSession.BeginTransaction().GetValueSync();
ThrowOnError(beginTransactionResult);
auto transaction = beginTransactionResult.GetTransaction();
auto topicSession = topicClient.CreateSimpleBlockingWriteSession(
NYdb::NTopic::TWriteSessionSettings()
.Path(TOPIC)
.DeduplicationEnabled(true)
);

auto beginTxResult = session.BeginTransaction(NYdb::NQuery::TTxSettings()).GetValueSync();
NYdb::NStatusHelpers::ThrowOnError(beginTxResult);
auto tx = beginTxResult.GetTransaction();

NYdb::NTopic::TWriteMessage writeMessage("message");

topicSession->Write(std::move(writeMessage), &transaction);
topicSession->Write(std::move(writeMessage), &tx);

transaction.Commit().GetValueSync();
tx.Commit().GetValueSync();
}
1 change: 1 addition & 0 deletions include/ydb-cpp-sdk/client/driver/driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class TDriverConfig {
//! caCerts - The buffer containing the PEM encoded root certificates for SSL/TLS connections.
//! If this parameter is empty, the default roots will be used.
TDriverConfig& UseSecureConnection(const std::string& caCerts = std::string());
TDriverConfig& SetUsePerChannelTcpConnection(bool usePerChannel);
TDriverConfig& UseClientCertificate(const std::string& clientCert, const std::string& clientPrivateKey);
//! Set token, this option can be overridden for client by ClientSettings
TDriverConfig& SetAuthToken(const std::string& token);
Expand Down
17 changes: 17 additions & 0 deletions include/ydb-cpp-sdk/client/export/export.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ struct TExportToS3Settings : public TOperationRequestSettings<TExportToS3Setting
UNKNOWN = std::numeric_limits<int>::max(),
};

struct TEncryptionAlgorithm {
static const std::string AES_128_GCM;
static const std::string AES_256_GCM;
static const std::string CHACHA_20_POLY_1305;
};

struct TItem {
std::string Src;
std::string Dst;
Expand All @@ -89,6 +95,17 @@ struct TExportToS3Settings : public TOperationRequestSettings<TExportToS3Setting
FLUENT_SETTING_OPTIONAL(std::string, Description);
FLUENT_SETTING_OPTIONAL(uint32_t, NumberOfRetries);
FLUENT_SETTING_OPTIONAL(std::string, Compression);
FLUENT_SETTING_OPTIONAL(std::string, SourcePath);
FLUENT_SETTING_OPTIONAL(std::string, DestinationPrefix);

TSelf& SymmetricEncryption(const std::string& algorithm, const std::string& key) {
EncryptionAlgorithm_ = algorithm;
SymmetricKey_ = key;
return *this;
}

std::string EncryptionAlgorithm_;
std::string SymmetricKey_;
};

class TExportToS3Response : public TOperation {
Expand Down
16 changes: 8 additions & 8 deletions include/ydb-cpp-sdk/client/federated_topic/federated_topic.h
Original file line number Diff line number Diff line change
Expand Up @@ -564,20 +564,20 @@ void TPrintable<TFederatedPartitionSession>::DebugString(TStringBuilder& res, bo
template<>
void TPrintable<NFederatedTopic::TReadSessionEvent::TDataReceivedEvent>::DebugString(TStringBuilder& res, bool) const;
template<>
void TPrintable<NFederatedTopic::TReadSessionEvent::TFederated<NFederatedTopic::TReadSessionEvent::TDataReceivedEvent::TMessage>>::DebugString(TStringBuilder& res, bool) const;
void TPrintable<NFederatedTopic::TReadSessionEvent::TDataReceivedEvent::TMessage>::DebugString(TStringBuilder& res, bool) const;
template<>
void TPrintable<NFederatedTopic::TReadSessionEvent::TFederated<NFederatedTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage>>::DebugString(TStringBuilder& res, bool) const;
void TPrintable<NFederatedTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage>::DebugString(TStringBuilder& res, bool) const;
template<>
void TPrintable<NFederatedTopic::TReadSessionEvent::TFederated<NFederatedTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>>::DebugString(TStringBuilder& res, bool) const;
void TPrintable<NFederatedTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>::DebugString(TStringBuilder& res, bool) const;
template<>
void TPrintable<NFederatedTopic::TReadSessionEvent::TFederated<NFederatedTopic::TReadSessionEvent::TStartPartitionSessionEvent>>::DebugString(TStringBuilder& res, bool) const;
void TPrintable<NFederatedTopic::TReadSessionEvent::TStartPartitionSessionEvent>::DebugString(TStringBuilder& res, bool) const;
template<>
void TPrintable<NFederatedTopic::TReadSessionEvent::TFederated<NFederatedTopic::TReadSessionEvent::TStopPartitionSessionEvent>>::DebugString(TStringBuilder& res, bool) const;
void TPrintable<NFederatedTopic::TReadSessionEvent::TStopPartitionSessionEvent>::DebugString(TStringBuilder& res, bool) const;
template<>
void TPrintable<NFederatedTopic::TReadSessionEvent::TFederated<NFederatedTopic::TReadSessionEvent::TEndPartitionSessionEvent>>::DebugString(TStringBuilder& res, bool) const;
void TPrintable<NFederatedTopic::TReadSessionEvent::TEndPartitionSessionEvent>::DebugString(TStringBuilder& res, bool) const;
template<>
void TPrintable<NFederatedTopic::TReadSessionEvent::TFederated<NFederatedTopic::TReadSessionEvent::TPartitionSessionStatusEvent>>::DebugString(TStringBuilder& res, bool) const;
void TPrintable<NFederatedTopic::TReadSessionEvent::TPartitionSessionStatusEvent>::DebugString(TStringBuilder& res, bool) const;
template<>
void TPrintable<NFederatedTopic::TReadSessionEvent::TFederated<NFederatedTopic::TReadSessionEvent::TPartitionSessionClosedEvent>>::DebugString(TStringBuilder& res, bool) const;
void TPrintable<NFederatedTopic::TReadSessionEvent::TPartitionSessionClosedEvent>::DebugString(TStringBuilder& res, bool) const;

}
69 changes: 63 additions & 6 deletions include/ydb-cpp-sdk/client/import/import.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

#include <ydb-cpp-sdk/client/types/s3_settings.h>

namespace Ydb::Import {
class ListObjectsInS3ExportResult;
}

namespace NYdb::inline V3 {
namespace NImport {

Expand Down Expand Up @@ -35,15 +39,28 @@ struct TImportFromS3Settings : public TOperationRequestSettings<TImportFromS3Set
using TSelf = TImportFromS3Settings;

struct TItem {
// Source prefix.
// S3 prefix for item
std::string Src;

// Destination path.
// database path where to import data
std::string Dst;

// Source path.
// if the export contains the database objects list, you may specify the database object name,
// and the S3 prefix will be looked up in the database objects list by the import procedure
std::string SrcPath = {};
};

FLUENT_SETTING_VECTOR(TItem, Item);
FLUENT_SETTING_OPTIONAL(std::string, Description);
FLUENT_SETTING_OPTIONAL(uint32_t, NumberOfRetries);
FLUENT_SETTING_OPTIONAL(bool, NoACL);
FLUENT_SETTING_OPTIONAL(bool, SkipChecksumValidation);
FLUENT_SETTING_OPTIONAL(std::string, SourcePrefix);
FLUENT_SETTING_OPTIONAL(std::string, DestinationPath);
FLUENT_SETTING_OPTIONAL(std::string, SymmetricKey);
};

class TImportFromS3Response : public TOperation {
Expand All @@ -64,6 +81,49 @@ class TImportFromS3Response : public TOperation {
TMetadata Metadata_;
};

using TAsyncImportFromS3Response = NThreading::TFuture<TImportFromS3Response>;

struct TListObjectsInS3ExportSettings : public TOperationRequestSettings<TListObjectsInS3ExportSettings>,
public TS3Settings<TListObjectsInS3ExportSettings> {
using TSelf = TListObjectsInS3ExportSettings;

struct TItem {
// Database object path.
std::string Path = {};
};

FLUENT_SETTING_VECTOR(TItem, Item);
FLUENT_SETTING_OPTIONAL(uint32_t, NumberOfRetries);
FLUENT_SETTING_OPTIONAL(std::string, Prefix);
FLUENT_SETTING_OPTIONAL(std::string, SymmetricKey);
};

class TListObjectsInS3ExportResult : public TStatus {
public:
struct TItem {
// S3 object prefix
std::string Prefix;

// Database object path
std::string Path;

void Out(IOutputStream& out) const;
};

TListObjectsInS3ExportResult(TStatus&& status, const ::Ydb::Import::ListObjectsInS3ExportResult& proto);

const std::vector<TItem>& GetItems() const;
const std::string& NextPageToken() const { return NextPageToken_; }

void Out(IOutputStream& out) const;

private:
std::vector<TItem> Items_;
std::string NextPageToken_;
};

using TAsyncListObjectsInS3ExportResult = NThreading::TFuture<TListObjectsInS3ExportResult>;

/// Data
struct TImportYdbDumpDataSettings : public TOperationRequestSettings<TImportYdbDumpDataSettings> {
using TSelf = TImportYdbDumpDataSettings;
Expand All @@ -86,7 +146,9 @@ class TImportClient {
public:
TImportClient(const TDriver& driver, const TCommonClientSettings& settings = TCommonClientSettings());

NThreading::TFuture<TImportFromS3Response> ImportFromS3(const TImportFromS3Settings& settings);
TAsyncImportFromS3Response ImportFromS3(const TImportFromS3Settings& settings);

TAsyncListObjectsInS3ExportResult ListObjectsInS3Export(const TListObjectsInS3ExportSettings& settings, std::int64_t pageSize = 0, const std::string& pageToken = {});

// ydb dump format
TAsyncImportDataResult ImportData(const std::string& table, std::string&& data, const TImportYdbDumpDataSettings& settings);
Expand All @@ -98,8 +160,3 @@ class TImportClient {

} // namespace NImport
} // namespace NYdb

template<>
inline void Out<NYdb::NImport::TImportFromS3Response>(IOutputStream& o, const NYdb::NImport::TImportFromS3Response& x) {
return x.Out(o);
}
Loading
Loading