Skip to content

Commit 3412195

Browse files
jaykorean authored and facebook-github-bot committed
Introduce MultiCfIterator (facebook#12153)
Summary: This PR introduces a new implementation of `Iterator` via a new public API called `NewMultiCfIterator()`. The new API takes a vector of column family handles to build a cross-column-family iterator, which internally maintains multiple `DBIter`s as child iterators from a consistent database state. When a key exists in multiple column families, the iterator selects the value (and wide columns) from the first column family containing the key, following the order provided in the `column_families` parameter. Similar to the merging iterator, a min heap is used to iterate across the child iterators. Backward iteration and direction change functionalities will be implemented in future PRs.

The comparator used to compare keys across different column families will be derived from the iterator of the first column family specified in `column_families`. This comparator will be checked against the comparators from all other column families that the iterator will traverse. If there's a mismatch with any of the comparators, the initialization of the iterator will fail.

Please note that this PR is not enough for users to start using `MultiCfIterator`. The `MultiCfIterator` and related APIs are still marked as "**DO NOT USE - UNDER CONSTRUCTION**". This PR is just the first of many PRs that will follow soon.

This PR includes the following:
- Introduction and partial implementation of the `MultiCfIterator`, which implements the generic `Iterator` interface. The implementation includes the construction of the iterator, `SeekToFirst()`, `Next()`, `Valid()`, `key()`, `value()`, and `columns()`.
- Unit tests to verify iteration across multiple column families in two distinct scenarios: (1) keys are unique across all column families, and (2) the same keys exist in multiple column families.

Pull Request resolved: facebook#12153

Reviewed By: pdillinger

Differential Revision: D52308697

Pulled By: jaykorean

fbshipit-source-id: b03e69f13b40af5a8f0598d0f43a0bec01ef8294
1 parent 3fff57f commit 3412195

14 files changed

+623
-1
lines changed

CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ else()
191191
endif()
192192
if(MINGW)
193193
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-format")
194+
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wa,-mbig-obj")
194195
add_definitions(-D_POSIX_C_SOURCE=1)
195196
endif()
196197
if(NOT CMAKE_BUILD_TYPE STREQUAL "Debug")
@@ -691,6 +692,7 @@ set(SOURCES
691692
db/memtable_list.cc
692693
db/merge_helper.cc
693694
db/merge_operator.cc
695+
db/multi_cf_iterator.cc
694696
db/output_validator.cc
695697
db/periodic_task_scheduler.cc
696698
db/range_del_aggregator.cc
@@ -1343,6 +1345,7 @@ if(WITH_TESTS)
13431345
db/memtable_list_test.cc
13441346
db/merge_helper_test.cc
13451347
db/merge_test.cc
1348+
db/multi_cf_iterator_test.cc
13461349
db/options_file_test.cc
13471350
db/perf_context_test.cc
13481351
db/periodic_task_scheduler_test.cc

Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1640,6 +1640,9 @@ wal_edit_test: $(OBJ_DIR)/db/wal_edit_test.o $(TEST_LIBRARY) $(LIBRARY)
16401640
dbformat_test: $(OBJ_DIR)/db/dbformat_test.o $(TEST_LIBRARY) $(LIBRARY)
16411641
$(AM_LINK)
16421642

1643+
multi_cf_iterator_test: $(OBJ_DIR)/db/multi_cf_iterator_test.o $(TEST_LIBRARY) $(LIBRARY)
1644+
$(AM_LINK)
1645+
16431646
env_basic_test: $(OBJ_DIR)/env/env_basic_test.o $(TEST_LIBRARY) $(LIBRARY)
16441647
$(AM_LINK)
16451648

TARGETS

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
8383
"db/memtable_list.cc",
8484
"db/merge_helper.cc",
8585
"db/merge_operator.cc",
86+
"db/multi_cf_iterator.cc",
8687
"db/output_validator.cc",
8788
"db/periodic_task_scheduler.cc",
8889
"db/range_del_aggregator.cc",
@@ -5226,6 +5227,12 @@ cpp_unittest_wrapper(name="mock_env_test",
52265227
extra_compiler_flags=[])
52275228

52285229

5230+
cpp_unittest_wrapper(name="multi_cf_iterator_test",
5231+
srcs=["db/multi_cf_iterator_test.cc"],
5232+
deps=[":rocksdb_test_lib"],
5233+
extra_compiler_flags=[])
5234+
5235+
52295236
cpp_unittest_wrapper(name="object_registry_test",
52305237
srcs=["utilities/object_registry_test.cc"],
52315238
deps=[":rocksdb_test_lib"],

db/db_impl/db_impl.cc

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
#include "db/memtable.h"
4545
#include "db/memtable_list.h"
4646
#include "db/merge_context.h"
47-
#include "db/merge_helper.h"
47+
#include "db/multi_cf_iterator.h"
4848
#include "db/periodic_task_scheduler.h"
4949
#include "db/range_tombstone_fragmenter.h"
5050
#include "db/table_cache.h"
@@ -3735,6 +3735,31 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(
37353735
return db_iter;
37363736
}
37373737

3738+
// Builds a cross-column-family iterator over `column_families`.
//
// Never returns a null pointer: every failure path returns an error iterator
// created by NewErrorIterator() that carries the failure Status.
//   - Empty `column_families`            -> InvalidArgument.
//   - Comparator mismatch between CFs    -> InvalidArgument.
//   - NewIterators() returns a non-OK    -> that Status is propagated.
// On success the result is a MultiCfIterator wrapping one child iterator per
// column family, as produced by NewIterators().
std::unique_ptr<Iterator> DBImpl::NewMultiCfIterator(
3739+
const ReadOptions& _read_options,
3740+
const std::vector<ColumnFamilyHandle*>& column_families) {
3741+
if (column_families.size() == 0) {
3742+
return std::unique_ptr<Iterator>(NewErrorIterator(
3743+
Status::InvalidArgument("No Column Family was provided")));
3744+
}
3745+
// The first column family's comparator is the reference: it is checked
// against every other column family below and then handed to the
// MultiCfIterator for cross-CF key comparison.
const Comparator* first_comparator = column_families[0]->GetComparator();
3746+
for (size_t i = 1; i < column_families.size(); ++i) {
3747+
const Comparator* cf_comparator = column_families[i]->GetComparator();
3748+
// Comparators are accepted as matching when they are the same object or,
// failing that, when their GetId() strings are equal.
if (first_comparator != cf_comparator &&
3749+
first_comparator->GetId().compare(cf_comparator->GetId()) != 0) {
3750+
return std::unique_ptr<Iterator>(NewErrorIterator(Status::InvalidArgument(
3751+
"Different comparators are being used across CFs")));
3752+
}
3753+
}
3754+
std::vector<Iterator*> child_iterators;
3755+
Status s = NewIterators(_read_options, column_families, &child_iterators);
3756+
if (s.ok()) {
3757+
// NOTE(review): the MultiCfIterator constructor in db/multi_cf_iterator.h
// takes `child_iterators` by const reference, so this std::move does not
// move the vector; the constructor wraps each raw pointer in a
// std::unique_ptr, which is where ownership of the children is taken.
return std::make_unique<MultiCfIterator>(first_comparator, column_families,
3758+
std::move(child_iterators));
3759+
}
3760+
return std::unique_ptr<Iterator>(NewErrorIterator(s));
3761+
}
3762+
37383763
Status DBImpl::NewIterators(
37393764
const ReadOptions& _read_options,
37403765
const std::vector<ColumnFamilyHandle*>& column_families,

db/db_impl/db_impl.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,13 @@ class DBImpl : public DB {
351351

352352
const Snapshot* GetSnapshot() override;
353353
void ReleaseSnapshot(const Snapshot* snapshot) override;
354+
355+
// UNDER CONSTRUCTION - DO NOT USE
356+
// Return a cross-column-family iterator from a consistent database state.
// `column_families` lists the column families to iterate over; the
// implementation in db_impl.cc returns an error iterator (not nullptr) when
// the list is empty, when the column families' comparators differ, or when
// the child iterators cannot be created.
357+
std::unique_ptr<Iterator> NewMultiCfIterator(
358+
const ReadOptions& options,
359+
const std::vector<ColumnFamilyHandle*>& column_families) override;
360+
354361
// Create a timestamped snapshot. This snapshot can be shared by multiple
355362
// readers. If any of them uses it for write conflict checking, then
356363
// is_write_conflict_boundary is true. For simplicity, set it to true by

db/db_test.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3199,6 +3199,14 @@ class ModelDB : public DB {
31993199
std::vector<Iterator*>* /*iterators*/) override {
32003200
return Status::NotSupported("Not supported yet");
32013201
}
3202+
3203+
// UNDER CONSTRUCTION - DO NOT USE
// Stub override for the test model DB: both arguments are ignored and a null
// iterator is returned, so callers must not dereference the result.
3204+
std::unique_ptr<Iterator> NewMultiCfIterator(
3205+
const ReadOptions& /*options*/,
3206+
const std::vector<ColumnFamilyHandle*>& /*column_families*/) override {
3207+
return nullptr;
3208+
}
3209+
32023210
const Snapshot* GetSnapshot() override {
32033211
ModelSnapshot* snapshot = new ModelSnapshot;
32043212
snapshot->map_ = map_;

db/multi_cf_iterator.cc

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright (c) Meta Platforms, Inc. and affiliates.
2+
// This source code is licensed under both the GPLv2 (found in the
3+
// COPYING file in the root directory) and Apache 2.0 License
4+
// (found in the LICENSE.Apache file in the root directory).
5+
6+
#include "db/multi_cf_iterator.h"
7+
8+
#include <cassert>
9+
10+
namespace ROCKSDB_NAMESPACE {
11+
12+
// Positions the iterator at the first key across all child iterators.
//
// Clears the heap and resets status_ to OK (via Reset()), seeks every child
// to its first entry, and pushes each child that is valid onto min_heap_.
// A child that is not valid after the seek is left out of the heap and its
// status is folded into status_ through considerStatus().
void MultiCfIterator::SeekToFirst() {
13+
Reset();
14+
// Position of the child within cfh_iter_pairs_; stored as `order` in the
// heap entry and used by the heap comparator to break ties on equal keys.
int i = 0;
15+
for (auto& cfh_iter_pair : cfh_iter_pairs_) {
16+
auto& cfh = cfh_iter_pair.first;
17+
auto& iter = cfh_iter_pair.second;
18+
iter->SeekToFirst();
19+
if (iter->Valid()) {
20+
assert(iter->status().ok());
21+
min_heap_.push(MultiCfIteratorInfo{iter.get(), cfh, i});
22+
} else {
23+
// Either the column family is empty or the seek failed; only a non-OK
// status has any effect inside considerStatus().
considerStatus(iter->status());
24+
}
25+
++i;
26+
}
27+
}
28+
29+
// Advances to the next distinct key across all child iterators.
//
// Precondition (asserted): Valid().
// Every child currently positioned on the same key as the heap top is
// advanced, so a key present in several column families is surfaced once.
// Children that become invalid are dropped from the heap and their status is
// folded into status_ through considerStatus().
void MultiCfIterator::Next() {
30+
assert(Valid());
31+
// 1. Keep the top iterator (by popping it from the heap)
32+
// 2. Make sure all others have iterated past the top iterator key slice
33+
// 3. Advance the top iterator, and add it back to the heap if valid
34+
// `top` is a copy of the heap entry; top.iterator is still positioned on the
// current key until step 3, so its key() can be compared against below.
auto top = min_heap_.top();
35+
min_heap_.pop();
36+
if (!min_heap_.empty()) {
37+
auto* current = min_heap_.top().iterator;
38+
// Loop while the new heap top sits on the same key as `top`. If the heap
// empties inside the loop, `current` still points at the child that just
// became invalid, so the Valid() check ends the loop.
while (current->Valid() &&
39+
comparator_->Compare(top.iterator->key(), current->key()) == 0) {
40+
assert(current->status().ok());
41+
current->Next();
42+
if (current->Valid()) {
43+
// The top entry's key changed underneath the heap; presumably
// replace_top() re-establishes heap order for it (see util/heap.h).
min_heap_.replace_top(min_heap_.top());
44+
} else {
45+
considerStatus(current->status());
46+
min_heap_.pop();
47+
}
48+
if (!min_heap_.empty()) {
49+
current = min_heap_.top().iterator;
50+
}
51+
}
52+
}
53+
top.iterator->Next();
54+
if (top.iterator->Valid()) {
55+
assert(top.iterator->status().ok());
56+
min_heap_.push(top);
57+
} else {
58+
considerStatus(top.iterator->status());
59+
}
60+
}
61+
62+
} // namespace ROCKSDB_NAMESPACE

db/multi_cf_iterator.h

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
// Copyright (c) Meta Platforms, Inc. and affiliates.
2+
// This source code is licensed under both the GPLv2 (found in the
3+
// COPYING file in the root directory) and Apache 2.0 License
4+
// (found in the LICENSE.Apache file in the root directory).
5+
6+
#pragma once
7+
8+
#include "rocksdb/comparator.h"
9+
#include "rocksdb/iterator.h"
10+
#include "rocksdb/options.h"
11+
#include "util/heap.h"
12+
13+
namespace ROCKSDB_NAMESPACE {
14+
15+
// UNDER CONSTRUCTION - DO NOT USE
16+
// A cross-column-family iterator from a consistent database state.
17+
// When the same key exists in more than one column family, the iterator
18+
// selects the value from the first column family containing the key, in the
19+
// order provided in the `column_families` parameter.
//
// Only forward iteration is implemented here: SeekToFirst(), Next(), Valid(),
// status(), key(), value() and columns(). Seek(), SeekForPrev() and
// SeekToLast() are empty bodies and Prev() asserts.
//
// NOTE(review): this header uses assert, std::vector, std::unique_ptr and
// std::pair but only includes rocksdb headers and util/heap.h directly; it
// presumably relies on transitive includes - confirm.
20+
class MultiCfIterator : public Iterator {
21+
public:
22+
// `comparator` orders keys across all children. `column_families` and
// `child_iterators` are parallel vectors (sizes asserted equal and
// non-zero); element i of each forms one (handle, iterator) pair. Each raw
// child pointer is wrapped in a std::unique_ptr, so this object takes
// ownership of the child iterators.
MultiCfIterator(const Comparator* comparator,
23+
const std::vector<ColumnFamilyHandle*>& column_families,
24+
const std::vector<Iterator*>& child_iterators)
25+
: comparator_(comparator),
26+
// Reads the member comparator_, which is declared before min_heap_ below
// and is therefore already initialized at this point.
min_heap_(MultiCfMinHeapItemComparator(comparator_)) {
27+
assert(column_families.size() > 0 &&
28+
column_families.size() == child_iterators.size());
29+
cfh_iter_pairs_.reserve(column_families.size());
30+
for (size_t i = 0; i < column_families.size(); ++i) {
31+
cfh_iter_pairs_.emplace_back(
32+
column_families[i], std::unique_ptr<Iterator>(child_iterators[i]));
33+
}
34+
}
35+
// Marks status_ as allowed to go unchecked before destruction.
~MultiCfIterator() override { status_.PermitUncheckedError(); }
36+
37+
// No copy allowed
38+
MultiCfIterator(const MultiCfIterator&) = delete;
39+
MultiCfIterator& operator=(const MultiCfIterator&) = delete;
40+
41+
// NOTE(review): every Iterator override below is declared under `private:`;
// callers presumably reach them through the base `Iterator` interface.
private:
42+
// One entry per column family, in the order given to the constructor.
std::vector<std::pair<ColumnFamilyHandle*, std::unique_ptr<Iterator>>>
43+
cfh_iter_pairs_;
44+
// NOTE(review): not referenced by any code visible in this header or in
// multi_cf_iterator.cc - presumably reserved for follow-up work.
ReadOptions read_options_;
45+
// First non-OK status observed from any child; see considerStatus().
Status status_;
46+
47+
// NOTE(review): not referenced by any code visible here - presumably
// reserved for follow-up work.
AttributeGroups attribute_groups_;
48+
49+
// Heap entry: a non-owning pointer to a child iterator, its column family
// handle, and the child's position in cfh_iter_pairs_ (`order`).
struct MultiCfIteratorInfo {
50+
Iterator* iterator;
51+
ColumnFamilyHandle* cfh;
52+
int order;
53+
};
54+
55+
// Ordering predicate for the heap. Returns true when `a` has the greater
// key, or when keys are equal and `a` has the greater `order`. Both
// iterators must be non-null and Valid() (asserted), and two entries with
// equal keys must not share the same `order` (asserted).
class MultiCfMinHeapItemComparator {
56+
public:
57+
explicit MultiCfMinHeapItemComparator(const Comparator* comparator)
58+
: comparator_(comparator) {}
59+
60+
bool operator()(const MultiCfIteratorInfo& a,
61+
const MultiCfIteratorInfo& b) const {
62+
assert(a.iterator);
63+
assert(b.iterator);
64+
assert(a.iterator->Valid());
65+
assert(b.iterator->Valid());
66+
int c = comparator_->Compare(a.iterator->key(), b.iterator->key());
67+
assert(c != 0 || a.order != b.order);
68+
// Equal keys: the entry from the earlier column family (smaller `order`)
// is treated as the smaller element.
return c == 0 ? a.order - b.order > 0 : c > 0;
69+
}
70+
71+
private:
72+
const Comparator* comparator_;
73+
};
74+
75+
// Must stay declared before min_heap_: the constructor's initializer for
// min_heap_ reads this member.
const Comparator* comparator_;
76+
using MultiCfMinHeap =
77+
BinaryHeap<MultiCfIteratorInfo, MultiCfMinHeapItemComparator>;
78+
// Holds one entry per child iterator that is currently valid.
MultiCfMinHeap min_heap_;
79+
// TODO: MaxHeap for Reverse Iteration
80+
// TODO: Lower and Upper bounds
81+
82+
// Key of the child iterator at the top of the heap. Requires Valid().
Slice key() const override {
83+
assert(Valid());
84+
return min_heap_.top().iterator->key();
85+
}
86+
// Valid only while at least one child is in the heap and no child has
// reported a non-OK status.
bool Valid() const override { return !min_heap_.empty() && status_.ok(); }
87+
Status status() const override { return status_; }
88+
// Records `s` only if it is non-OK and no earlier error has been recorded,
// so the first error wins.
void considerStatus(Status s) {
89+
if (!s.ok() && status_.ok()) {
90+
status_ = std::move(s);
91+
}
92+
}
93+
// Empties the heap and resets status_ to OK; child iterators are untouched.
void Reset() {
94+
min_heap_.clear();
95+
status_ = Status::OK();
96+
}
97+
98+
// Defined in db/multi_cf_iterator.cc.
void SeekToFirst() override;
99+
void Next() override;
100+
101+
// TODO - Implement these
// NOTE(review): the three seek variants below have empty bodies, so calling
// them leaves the heap and status_ exactly as they were.
102+
void Seek(const Slice& /*target*/) override {}
103+
void SeekForPrev(const Slice& /*target*/) override {}
104+
void SeekToLast() override {}
105+
void Prev() override { assert(false); }
106+
// Value of the child iterator at the top of the heap. Requires Valid().
Slice value() const override {
107+
assert(Valid());
108+
return min_heap_.top().iterator->value();
109+
}
110+
// Wide columns of the child iterator at the top of the heap. Requires
// Valid().
const WideColumns& columns() const override {
111+
assert(Valid());
112+
return min_heap_.top().iterator->columns();
113+
}
114+
};
115+
116+
} // namespace ROCKSDB_NAMESPACE

0 commit comments

Comments
 (0)