Skip to content

Commit 259585e

Browse files
bulk upsert simple test
1 parent 99108f7 commit 259585e

File tree

3 files changed

+156
-0
lines changed

3 files changed

+156
-0
lines changed

tests/integration/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
add_subdirectory(basic_example_it)
2+
add_subdirectory(bulk_upsert_simple_it)
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
add_ydb_test(NAME bulk_upsert_simple_it
2+
SOURCES
3+
main.cpp
4+
LINK_LIBRARIES
5+
yutil
6+
YDB-CPP-SDK::Table
7+
GTest::gtest_main
8+
public-lib-json_value
9+
LABELS
10+
integration
11+
)
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
#include <ydb-cpp-sdk/client/table/table.h>
2+
#include <src/library/getopt/last_getopt.h>
3+
4+
#include <gtest/gtest.h>
5+
6+
#include <filesystem>
7+
8+
constexpr size_t BATCH_SIZE = 1000;
9+
10+
struct TLogMessage {
11+
std::string App;
12+
std::string Host;
13+
TInstant Timestamp;
14+
uint32_t HttpCode;
15+
std::string Message;
16+
};
17+
18+
void GetLogBatch(uint64_t logOffset, std::vector<TLogMessage>& logBatch) {
19+
logBatch.clear();
20+
for (size_t i = 0; i < BATCH_SIZE; ++i) {
21+
TLogMessage message;
22+
message.App = "App_" + ToString(logOffset % 10);
23+
message.Host = "192.168.0." + ToString(logOffset % 11);
24+
message.Timestamp = TInstant::Now() + TDuration::MilliSeconds(i % 1000);
25+
message.HttpCode = 200;
26+
message.Message = i % 2 ? "GET / HTTP/1.1" : "GET /images/logo.png HTTP/1.1";
27+
logBatch.emplace_back(message);
28+
}
29+
}
30+
31+
bool WriteLogBatch(NYdb::NTable::TTableClient& tableClient, const std::string& table, const std::vector<TLogMessage>& logBatch,
32+
const NYdb::NTable::TRetryOperationSettings& retrySettings)
33+
{
34+
NYdb::TValueBuilder rows;
35+
rows.BeginList();
36+
for (const auto& message : logBatch) {
37+
rows.AddListItem()
38+
.BeginStruct()
39+
.AddMember("App").Utf8(message.App)
40+
.AddMember("Host").Utf8(message.Host)
41+
.AddMember("Timestamp").Timestamp(message.Timestamp)
42+
.AddMember("HttpCode").Uint32(message.HttpCode)
43+
.AddMember("Message").Utf8(message.Message)
44+
.EndStruct();
45+
}
46+
rows.EndList();
47+
48+
auto bulkUpsertOperation = [table, rowsValue = rows.Build()](NYdb::NTable::TTableClient& tableClient) {
49+
NYdb::TValue r = rowsValue;
50+
auto status = tableClient.BulkUpsert(table, std::move(r));
51+
return status.GetValueSync();
52+
};
53+
54+
auto status = tableClient.RetryOperationSync(bulkUpsertOperation, retrySettings);
55+
56+
if (!status.IsSuccess()) {
57+
std::cerr << std::endl << "Write failed with status: " << (const NYdb::TStatus&)status << std::endl;
58+
return false;
59+
}
60+
return true;
61+
}
62+
63+
bool CreateLogTable(NYdb::NTable::TTableClient& client, const std::string& table) {
64+
std::cerr << "Create table " << table << "\n";
65+
66+
NYdb::NTable::TRetryOperationSettings settings;
67+
auto status = client.RetryOperationSync([&table](NYdb::NTable::TSession session) {
68+
auto tableDesc = NYdb::NTable::TTableBuilder()
69+
.AddNullableColumn("App", NYdb::EPrimitiveType::Utf8)
70+
.AddNullableColumn("Timestamp", NYdb::EPrimitiveType::Timestamp)
71+
.AddNullableColumn("Host", NYdb::EPrimitiveType::Utf8)
72+
.AddNullableColumn("HttpCode", NYdb::EPrimitiveType::Uint32)
73+
.AddNullableColumn("Message", NYdb::EPrimitiveType::Utf8)
74+
.SetPrimaryKeyColumns({"App", "Timestamp", "Host"})
75+
.Build();
76+
77+
return session.CreateTable(table, std::move(tableDesc)).GetValueSync();
78+
}, settings);
79+
80+
if (!status.IsSuccess()) {
81+
std::cerr << "Create table failed with status: " << status << std::endl;
82+
return false;
83+
}
84+
return true;
85+
}
86+
87+
bool Run(const NYdb::TDriver &driver, const std::string &table, uint32_t batchCount) {
88+
NYdb::NTable::TTableClient client(driver);
89+
if (!CreateLogTable(client, table)) {
90+
return false;
91+
}
92+
93+
NYdb::NTable::TRetryOperationSettings writeRetrySettings;
94+
writeRetrySettings
95+
.Idempotent(true)
96+
.MaxRetries(20);
97+
98+
std::vector<TLogMessage> logBatch;
99+
for (uint32_t offset = 0; offset < batchCount; ++offset) {
100+
GetLogBatch(offset, logBatch);
101+
if (!WriteLogBatch(client, table, logBatch, writeRetrySettings)) {
102+
return false;
103+
}
104+
std::cerr << ".";
105+
}
106+
107+
std::cerr << std::endl << "Done." << std::endl;
108+
return true;
109+
}
110+
111+
std::string JoinPath(const std::string& basePath, const std::string& path) {
112+
if (basePath.empty()) {
113+
return path;
114+
}
115+
116+
std::filesystem::path prefixPathSplit(basePath);
117+
prefixPathSplit /= path;
118+
119+
return prefixPathSplit;
120+
}
121+
122+
TEST(Integration, BulkUpsert) {
123+
124+
std::string database = std::getenv("YDB_DATABASE");
125+
std::string endpoint = std::getenv("YDB_ENDPOINT");
126+
std::string table = "bulk_upsert_example";
127+
uint32_t count = 1000;
128+
129+
table = JoinPath(database, table);
130+
131+
auto driverConfig = NYdb::TDriverConfig()
132+
.SetEndpoint(endpoint)
133+
.SetDatabase(database)
134+
.SetAuthToken(std::getenv("YDB_TOKEN") ? std::getenv("YDB_TOKEN") : "");
135+
136+
NYdb::TDriver driver(driverConfig);
137+
138+
if (!::Run(driver, table, count)) {
139+
driver.Stop(true);
140+
FAIL();
141+
}
142+
143+
driver.Stop(true);
144+
}

0 commit comments

Comments
 (0)