Skip to content

Commit 0772aff

Browse files
authored
Implement safe compaction. (#16744)
1 parent ef19d60 commit 0772aff

File tree

3 files changed

+68
-3
lines changed

3 files changed

+68
-3
lines changed

Diff for: ydb/apps/etcd_proxy/readme.txt

+1
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,4 @@ And other todo's:
1212
- Implement retries on "Transaction lock invslideted" error.
1313
- Implement compaction with control of a requested revision.
1414
- Implement the watches for ranges. (Now the watches work only with a single key or a prefix.)
15+
- Add unit tests for watches.

Diff for: ydb/apps/etcd_proxy/service/etcd_impl.cpp

+13-3
Original file line numberDiff line numberDiff line change
@@ -1067,13 +1067,23 @@ class TCompactRequest
10671067
}
10681068

10691069
void MakeQueryWithParams(std::ostream& sql, NYdb::TParamsBuilder& params) final {
1070-
sql << "delete from `content` where `modified` < " << AddParam("Revision", params, KeyRevision) << ';' << std::endl;
1070+
sql << "$Trash = select c.key as key, c.modified as modified from `content` as c inner join (" << std::endl;
1071+
sql << "select max_by((`key`, `modified`), `modified`) as pair from `content`" << std::endl;
1072+
sql << "where `modified` < " << AddParam("Revision", params, KeyRevision) << " and 0L = `version` group by `key`" << std::endl;
1073+
sql << ") as keys on keys.pair.0 = c.key where c.modified <= keys.pair.1;" << std::endl;
1074+
sql << "select count(*) from $Trash;" << std::endl;
1075+
sql << "delete from `content` on select * from $Trash;" << std::endl;
10711076
}
10721077

1073-
void ReplyWith(const NYdb::TResultSets&, const TActorContext& ctx) final {
1078+
void ReplyWith(const NYdb::TResultSets& results, const TActorContext& ctx) final {
1079+
auto parser = NYdb::TResultSetParser(results.front());
1080+
const auto erased = parser.TryNextRow() ? NYdb::TValueParser(parser.GetValue(0)).GetUint64() : 0ULL;
1081+
if (!erased)
1082+
TryToRollbackRevision();
1083+
10741084
etcdserverpb::CompactionResponse response;
10751085
response.mutable_header()->set_revision(Revision);
1076-
Dump(std::cout) << std::endl;
1086+
Dump(std::cout) << '=' << erased << std::endl;
10771087
return Reply(response, ctx);
10781088
}
10791089

Diff for: ydb/apps/etcd_proxy/service/ut/etcd_service_ut.cpp

+54
Original file line numberDiff line numberDiff line change
@@ -1020,6 +1020,60 @@ Y_UNIT_TEST_SUITE(Etcd_KV) {
10201020
}
10211021
});
10221022
}
1023+
1024+
Y_UNIT_TEST(Compact) {
1025+
MakeSimpleTest([](const std::unique_ptr<etcdserverpb::KV::Stub> &etcd) {
1026+
Put("key0", "value0", etcd);
1027+
Put("key3", "value1", etcd);
1028+
Put("key2", "value2", etcd);
1029+
Put("key0", "value3", etcd);
1030+
Put("key1", "value4", etcd);
1031+
Delete("key2", etcd);
1032+
const auto revForCompact = Put("key3", "value5", etcd);
1033+
Delete("key1", etcd);
1034+
const auto revForRequest = Put("key3", "value6", etcd);
1035+
Delete("key0", etcd);
1036+
Delete("key3", etcd);
1037+
1038+
{
1039+
grpc::ClientContext readRangeCtx;
1040+
etcdserverpb::RangeRequest rangeRequest;
1041+
rangeRequest.set_key("key");
1042+
rangeRequest.set_range_end("kez");
1043+
rangeRequest.set_keys_only(true);
1044+
etcdserverpb::RangeResponse rangeResponse;
1045+
UNIT_ASSERT(etcd->Range(&readRangeCtx, rangeRequest, &rangeResponse).ok());
1046+
UNIT_ASSERT_VALUES_EQUAL(rangeResponse.count(), 0LL);
1047+
}
1048+
1049+
{
1050+
grpc::ClientContext compactCtx;
1051+
etcdserverpb::CompactionRequest compactionRequest;
1052+
compactionRequest.set_revision(revForCompact);
1053+
etcdserverpb::CompactionResponse compactionResponse;
1054+
UNIT_ASSERT(etcd->Compact(&compactCtx, compactionRequest, &compactionResponse).ok());
1055+
}
1056+
1057+
{
1058+
grpc::ClientContext readRangeCtx;
1059+
etcdserverpb::RangeRequest rangeRequest;
1060+
rangeRequest.set_key("key");
1061+
rangeRequest.set_range_end("kez");
1062+
rangeRequest.set_revision(revForRequest);
1063+
rangeRequest.set_sort_target(etcdserverpb::RangeRequest_SortTarget_VALUE);
1064+
rangeRequest.set_sort_order(etcdserverpb::RangeRequest_SortOrder_ASCEND);
1065+
etcdserverpb::RangeResponse rangeResponse;
1066+
UNIT_ASSERT(etcd->Range(&readRangeCtx, rangeRequest, &rangeResponse).ok());
1067+
UNIT_ASSERT_VALUES_EQUAL(rangeResponse.count(), 2LL);
1068+
UNIT_ASSERT_VALUES_EQUAL(rangeResponse.kvs().size(), 2U);
1069+
UNIT_ASSERT_VALUES_EQUAL(rangeResponse.kvs(0).key(), "key0");
1070+
UNIT_ASSERT_VALUES_EQUAL(rangeResponse.kvs(1).key(), "key3");
1071+
UNIT_ASSERT_VALUES_EQUAL(rangeResponse.kvs(0).value(), "value3");
1072+
UNIT_ASSERT_VALUES_EQUAL(rangeResponse.kvs(1).value(), "value6");
1073+
}
1074+
1075+
});
1076+
}
10231077
} // Y_UNIT_TEST_SUITE(Etcd_KV)
10241078

10251079
Y_UNIT_TEST_SUITE(Etcd_Lease) {

0 commit comments

Comments
 (0)