Skip to content

Bulk upsert integration test #280

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tests/integration/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
# Integration test suites; each listed subdirectory supplies its own CMakeLists.txt.
add_subdirectory(basic_example_it)
add_subdirectory(bulk_upsert_simple_it)
13 changes: 13 additions & 0 deletions tests/integration/bulk_upsert_simple_it/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Declares the bulk_upsert_simple_it test through the project's add_ydb_test
# helper (defined elsewhere in the build tree).
# NOTE(review): LABELS presumably tags the test for label-based selection in
# CTest -- confirm against the add_ydb_test definition.
add_ydb_test(NAME bulk_upsert_simple_it
SOURCES
main.cpp
bulk_upsert.cpp
bulk_upsert.h
LINK_LIBRARIES
yutil
YDB-CPP-SDK::Table
library-getopt
GTest::gtest_main
LABELS
integration
)
163 changes: 163 additions & 0 deletions tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
#include "bulk_upsert.h"

#include <src/library/getopt/last_getopt.h>

#include <cstdlib>
#include <filesystem>
#include <stdexcept>
#include <string>
#include <utility>

// Number of log messages GetLogBatch generates per call.
static constexpr size_t BATCH_SIZE = 1000;

// Does nothing for a successful status; otherwise throws a
// TYdbErrorException that carries `status`, with the status also streamed
// into the exception message.
static void ThrowOnError(const TStatus& status) {
    if (status.IsSuccess()) {
        return;
    }

    throw TYdbErrorException(status) << status;
}

// Appends `path` to `basePath` using std::filesystem path semantics.
// An empty `basePath` returns `path` unchanged.
static std::string JoinPath(const std::string& basePath, const std::string& path) {
    if (!basePath.empty()) {
        return (std::filesystem::path(basePath) / path).string();
    }

    return path;
}

// Builds the driver and the table path for the test from the environment.
//
// Reads YDB_DATABASE and YDB_ENDPOINT (both required) and YDB_TOKEN
// (optional; an empty token is used when it is unset). Returns the driver
// together with the "bulk" table path joined onto the database path.
//
// Throws std::runtime_error when a required variable is missing:
// std::getenv returns a null pointer in that case, and constructing a
// std::string from a null pointer is undefined behavior.
TRunArgs GetRunArgs() {
    const auto requireEnv = [](const char* name) -> std::string {
        const char* value = std::getenv(name);
        if (value == nullptr) {
            throw std::runtime_error(std::string("Environment variable ") + name + " is not set");
        }
        return value;
    };

    const std::string database = requireEnv("YDB_DATABASE");
    const std::string endpoint = requireEnv("YDB_ENDPOINT");

    // Look the optional token up once rather than calling getenv twice.
    const char* token = std::getenv("YDB_TOKEN");

    auto driverConfig = TDriverConfig()
        .SetEndpoint(endpoint)
        .SetDatabase(database)
        .SetAuthToken(token ? token : "");

    TDriver driver(driverConfig);
    return {driver, JoinPath(database, "bulk")};
}

// Creates the log table at `table` through the client's synchronous retry
// helper, using default retry settings, and returns the resulting status.
// The table is keyed by the non-nullable Uint64 column "pk"; every other
// column is nullable.
TStatus CreateTable(TTableClient& client, const std::string& table) {
    std::cerr << "Create table " << table << "\n";

    const auto createOperation = [&table](TSession session) {
        auto description = TTableBuilder()
            .AddNonNullableColumn("pk", EPrimitiveType::Uint64)
            .AddNullableColumn("App", EPrimitiveType::Utf8)
            .AddNullableColumn("Timestamp", EPrimitiveType::Timestamp)
            .AddNullableColumn("Host", EPrimitiveType::Utf8)
            .AddNullableColumn("HttpCode", EPrimitiveType::Uint32)
            .AddNullableColumn("Message", EPrimitiveType::Utf8)
            .SetPrimaryKeyColumns({"pk"})
            .Build();

        return session.CreateTable(table, std::move(description)).GetValueSync();
    };

    TRetryOperationSettings settings;
    return client.RetryOperationSync(createOperation, settings);
}

// Fills `logBatch` with BATCH_SIZE synthetic log messages and returns the
// statistics a later SELECT over those rows is expected to reproduce.
//
// logOffset  - selects the App ("App_<logOffset % 10>") and Host
//              ("192.168.0.<logOffset % 11>") values shared by the batch.
// logBatch   - output; cleared first, then refilled.
// lastNumber - primary key of the first row; keys are consecutive from there.
//
// Returns {sum of the App suffixes, sum of the Host suffixes, row count}.
TStatistic GetLogBatch(uint64_t logOffset, std::vector<TLogMessage>& logBatch, uint32_t lastNumber) {
    logBatch.clear();
    logBatch.reserve(BATCH_SIZE);

    // App and Host depend only on logOffset, so build them once per batch.
    const uint64_t appIndex = logOffset % 10;
    const uint64_t hostIndex = logOffset % 11;
    const std::string app = "App_" + std::to_string(appIndex);
    const std::string host = "192.168.0." + std::to_string(hostIndex);

    for (size_t i = 0; i < BATCH_SIZE; ++i) {
        TLogMessage message;
        // Widen before adding: pk is 64-bit, so the key must not wrap at 2^32.
        message.pk = static_cast<uint64_t>(lastNumber) + i;
        message.App = app;
        message.Host = host;
        message.Timestamp = TInstant::Now() + TDuration::MilliSeconds(i % 1000);
        message.HttpCode = 200;
        message.Message = i % 2 ? "GET / HTTP/1.1" : "GET /images/logo.png HTTP/1.1";
        logBatch.emplace_back(std::move(message));
    }

    // Every row carries the same suffixes, so the sums are a plain product.
    const uint64_t rowCount = BATCH_SIZE;
    return {rowCount * appIndex, rowCount * hostIndex, rowCount};
}

// Serializes `logBatch` into a list of structs (one struct per message) and
// bulk-upserts it into `table` through the client's synchronous retry helper
// with `retrySettings`. Returns the status of the retried operation.
TStatus WriteLogBatch(TTableClient& tableClient, const std::string& table, const std::vector<TLogMessage>& logBatch,
                      const TRetryOperationSettings& retrySettings) {
    TValueBuilder builder;
    builder.BeginList();
    for (const TLogMessage& entry : logBatch) {
        builder.AddListItem()
            .BeginStruct()
            .AddMember("pk").Uint64(entry.pk)
            .AddMember("App").Utf8(entry.App)
            .AddMember("Host").Utf8(entry.Host)
            .AddMember("Timestamp").Timestamp(entry.Timestamp)
            .AddMember("HttpCode").Uint32(entry.HttpCode)
            .AddMember("Message").Utf8(entry.Message)
            .EndStruct();
    }
    builder.EndList();

    // The rows are moved into BulkUpsert, so each invocation works on a
    // fresh copy of the captured value.
    const auto upsertOnce = [table, payload = builder.Build()](TTableClient& retryClient) {
        TValue attemptRows = payload;
        return retryClient.BulkUpsert(table, std::move(attemptRows)).GetValueSync();
    };

    return tableClient.RetryOperationSync(upsertOnce, retrySettings);
}

// Runs the aggregate query over the table at `path` in a single
// begin-and-commit SerializableRW transaction. The parent directory of
// `path` becomes the TablePathPrefix and the last component the table name.
// On success the first result set is stored in `resultSet`; the query
// status is returned either way.
static TStatus SelectTransaction(TSession session, const std::string& path,
                                 std::optional<TResultSet>& resultSet) {
    const std::filesystem::path tablePath(path);
    const std::string prefix = tablePath.parent_path().string();
    const std::string tableName = tablePath.filename().string();

    auto query = std::format(R"(
PRAGMA TablePathPrefix("{}");

SELECT
SUM(CAST(SUBSTRING(CAST(App as string), 4) as Int32)),
SUM(CAST(SUBSTRING(CAST(Host as string), 10) as Int32)),
COUNT(*)
FROM {}
)", prefix, tableName);

    auto txControl = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();

    auto queryResult = session.ExecuteDataQuery(query, txControl).GetValueSync();
    if (queryResult.IsSuccess()) {
        resultSet = queryResult.GetResultSet(0);
    }

    return queryResult;
}

// Reads the aggregated statistics back from the table at `path`.
//
// Runs SelectTransaction through the client's synchronous retry helper and
// throws TYdbErrorException (via ThrowOnError) if the operation fails.
// Returns {sum of App suffixes, sum of Host suffixes, row count}; a field
// stays 0 when no row is returned or when its aggregate has no value.
TStatistic Select(TTableClient& client, const std::string& path) {
    std::optional<TResultSet> resultSet;
    ThrowOnError(client.RetryOperationSync([path, &resultSet](TSession session) {
        return SelectTransaction(session, path, resultSet);
    }));

    // ThrowOnError returned, so the transaction succeeded and filled resultSet.
    TResultSetParser parser(*resultSet);

    uint64_t sumApp = 0;
    uint64_t sumHost = 0;
    uint64_t rowCount = 0;

    if (parser.TryNextRow()) {
        // The SUM columns are optional: dereferencing them unconditionally is
        // undefined behavior when the aggregate is NULL (e.g. an empty table),
        // so only read a value that is actually present.
        if (const auto appSum = parser.ColumnParser("column0").GetOptionalInt64()) {
            sumApp = static_cast<uint64_t>(*appSum);
        }
        if (const auto hostSum = parser.ColumnParser("column1").GetOptionalInt64()) {
            sumHost = static_cast<uint64_t>(*hostSum);
        }
        rowCount = parser.ColumnParser("column2").GetUint64();
    }

    return {sumApp, sumHost, rowCount};
}

// Drops the table at `path` through the client's synchronous retry helper.
// Throws TYdbErrorException (via ThrowOnError) if the operation fails.
void DropTable(TTableClient& client, const std::string& path) {
    const auto dropOperation = [path](TSession session) {
        return session.DropTable(path).ExtractValueSync();
    };

    ThrowOnError(client.RetryOperationSync(dropOperation));
}
43 changes: 43 additions & 0 deletions tests/integration/bulk_upsert_simple_it/bulk_upsert.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#pragma once

#include <ydb-cpp-sdk/client/driver/driver.h>
#include <ydb-cpp-sdk/client/table/table.h>

using namespace NYdb;
using namespace NYdb::NTable;

// Values returned by GetRunArgs(): the driver to build clients from and the
// path of the table the test operates on.
struct TRunArgs {
// Driver configured by GetRunArgs().
TDriver driver;
// Path of the test table.
std::string path;
};

// One synthetic log row; the fields correspond one-to-one to the columns
// written by WriteLogBatch.
struct TLogMessage {
    // Primary key of the row. Zero-initialized so a default-constructed
    // message never holds an indeterminate value.
    uint64_t pk = 0;
    // Application name.
    std::string App;
    // Host address.
    std::string Host;
    // Time of the log record.
    TInstant Timestamp;
    // HTTP status code. Zero-initialized for the same reason as pk.
    uint32_t HttpCode = 0;
    // Request line text.
    std::string Message;
};

// Exception that carries the failed YDB status alongside the yexception
// message.
class TYdbErrorException : public yexception {
public:
    // Explicit, so that a TStatus is never silently converted into an
    // exception object.
    explicit TYdbErrorException(const NYdb::TStatus& status)
        : Status(status) {}

    // The status that caused the exception.
    NYdb::TStatus Status;
};

// Aggregate figures used to compare what was written with what is read back.
// All fields default to zero so a default-constructed value is well defined.
struct TStatistic {
    // Sum of the numeric suffixes of the App column.
    uint64_t sumApp = 0;
    // Sum of the numeric suffixes of the Host column.
    uint64_t sumHost = 0;
    // Number of rows.
    uint64_t rowCount = 0;
};

// Builds the driver and the test table path from the YDB_ENDPOINT,
// YDB_DATABASE and YDB_TOKEN environment variables.
TRunArgs GetRunArgs();
// Creates the log table at `table`; returns the status of the operation.
TStatus CreateTable(TTableClient& client, const std::string& table);
// Refills `logBatch` with generated messages whose keys start at `lastNumber`
// and returns the statistics expected for that batch.
TStatistic GetLogBatch(uint64_t logOffset, std::vector<TLogMessage>& logBatch, uint32_t lastNumber);
// Bulk-upserts `logBatch` into `table`, retrying according to `retrySettings`.
TStatus WriteLogBatch(TTableClient& tableClient, const std::string& table, const std::vector<TLogMessage>& logBatch,
const TRetryOperationSettings& retrySettings);
// Reads the aggregated statistics back from the table at `path`; throws
// TYdbErrorException on failure.
TStatistic Select(TTableClient& client, const std::string& path);
// Drops the table at `path`; throws TYdbErrorException on failure.
void DropTable(TTableClient& client, const std::string& path);
48 changes: 48 additions & 0 deletions tests/integration/bulk_upsert_simple_it/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#include "bulk_upsert.h"

#include <gtest/gtest.h>

// End-to-end check of BulkUpsert: creates the table, writes `batchCount`
// generated batches, reads the aggregates back and compares them with the
// values accumulated while generating the data.
//
// Once the table has been created, the table is dropped and the driver
// stopped on the normal exit path and on a write failure alike, so a
// failed write does not leave the table behind.
TEST(Integration, BulkUpsert) {
    constexpr uint32_t batchCount = 1000;

    uint32_t correctSumApp = 0;
    uint32_t correctSumHost = 0;
    uint32_t correctRowCount = 0;

    auto [driver, path] = GetRunArgs();

    TTableClient client(driver);

    TStatus statusCreate = CreateTable(client, path);
    if (!statusCreate.IsSuccess()) {
        // Nothing was created, so only the driver needs to be shut down.
        driver.Stop(true);
        FAIL() << "Create table failed with status: " << statusCreate << std::endl;
    }

    TRetryOperationSettings writeRetrySettings;
    writeRetrySettings
        .Idempotent(true)
        .MaxRetries(20);

    bool allWritten = true;
    std::vector<TLogMessage> logBatch;
    for (uint32_t offset = 0; offset < batchCount; ++offset) {
        auto [batchSumApp, batchSumHost, batchRowCount] = GetLogBatch(offset, logBatch, correctRowCount);
        correctSumApp += batchSumApp;
        correctSumHost += batchSumHost;
        correctRowCount += batchRowCount;

        TStatus statusWrite = WriteLogBatch(client, path, logBatch, writeRetrySettings);
        if (!statusWrite.IsSuccess()) {
            // Record the failure without returning, so the cleanup below runs.
            ADD_FAILURE() << "Write failed with status: " << statusWrite << std::endl;
            allWritten = false;
            break;
        }
    }

    // Comparing aggregates is only meaningful when every batch was written.
    if (allWritten) {
        auto [sumApp, sumHost, rowCount] = Select(client, path);

        EXPECT_EQ(rowCount, correctRowCount);
        EXPECT_EQ(sumApp, correctSumApp);
        EXPECT_EQ(sumHost, correctSumHost);
    }

    DropTable(client, path);
    driver.Stop(true);
}
Loading