
Commit a6e4807

chore(query): add iceberg table metadata cache (#17780)
* feat(query): add iceberg table metadata cache
1 parent 3dc7840 commit a6e4807

31 files changed: +356, -87 lines changed


Cargo.lock

Lines changed: 2 additions & 4 deletions
Some generated files are not rendered by default.

src/meta/app/src/schema/catalog.rs

Lines changed: 1 addition & 1 deletion
@@ -129,7 +129,7 @@ pub struct IcebergGlueCatalogOption {
 /// Same as `CatalogNameIdent`, but with `serde` support,
 /// and can be used a s part of a value.
 // #[derive(Clone, Debug, PartialEq, Eq)]
-#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
+#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Hash, Eq)]
 pub struct CatalogName {
     pub tenant: String,
     pub catalog_name: String,
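
For context, a minimal standalone sketch (not part of this commit) of what the added `Hash` derive enables: a `CatalogName`-shaped value can now serve as the key of a `HashMap` or a hash-based cache. The `CatalogKey` struct below is a local stand-in, not the real `CatalogName` from `databend_common_meta_app`.

use std::collections::HashMap;

// Local stand-in for CatalogName; deriving Hash (together with Eq) lets it key a map/cache.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct CatalogKey {
    tenant: String,
    catalog_name: String,
}

fn main() {
    let key = CatalogKey {
        tenant: "tenant1".to_string(),
        catalog_name: "my_iceberg".to_string(),
    };
    let mut cache: HashMap<CatalogKey, String> = HashMap::new();
    cache.insert(key.clone(), "cached catalog metadata".to_string());
    assert!(cache.contains_key(&key));
}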

src/query/catalog/src/catalog/interface.rs

Lines changed: 1 addition & 8 deletions
@@ -16,7 +16,6 @@ use std::any::Any;
 use std::fmt::Debug;
 use std::sync::Arc;
 
-use databend_common_config::InnerConfig;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
@@ -106,7 +105,6 @@ use databend_common_meta_app::schema::UpsertTableOptionReply;
 use databend_common_meta_app::schema::UpsertTableOptionReq;
 use databend_common_meta_app::schema::VirtualColumnMeta;
 use databend_common_meta_app::tenant::Tenant;
-use databend_common_meta_store::MetaStore;
 use databend_common_meta_types::MetaId;
 use databend_common_meta_types::SeqV;
 use databend_storages_common_session::SessionState;
@@ -127,12 +125,7 @@ pub struct StorageDescription {
 }
 
 pub trait CatalogCreator: Send + Sync + Debug {
-    fn try_create(
-        &self,
-        info: Arc<CatalogInfo>,
-        conf: InnerConfig,
-        meta: &MetaStore,
-    ) -> Result<Arc<dyn Catalog>>;
+    fn try_create(&self, info: Arc<CatalogInfo>) -> Result<Arc<dyn Catalog>>;
 }
 
 #[async_trait::async_trait]
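
As a rough illustration of the slimmed-down trait (a sketch, not code from this commit): a creator is now built from the `CatalogInfo` alone, with the `InnerConfig` and `MetaStore` arguments gone. The types below are simplified stand-ins, since the real ones live in the databend crates.

use std::fmt::Debug;
use std::sync::Arc;

// Simplified stand-ins; the point is the reduced constructor signature.
#[derive(Debug)]
struct CatalogInfo {
    name: String,
}
trait Catalog: Debug {}
type Result<T> = std::result::Result<T, String>;

trait CatalogCreator: Send + Sync + Debug {
    // After this commit, a catalog is created from its CatalogInfo only.
    fn try_create(&self, info: Arc<CatalogInfo>) -> Result<Arc<dyn Catalog>>;
}

#[derive(Debug)]
struct DemoCatalog {
    info: Arc<CatalogInfo>,
}
impl Catalog for DemoCatalog {}

#[derive(Debug)]
struct DemoCreator;
impl CatalogCreator for DemoCreator {
    fn try_create(&self, info: Arc<CatalogInfo>) -> Result<Arc<dyn Catalog>> {
        Ok(Arc::new(DemoCatalog { info }))
    }
}

fn main() -> Result<()> {
    let creator = DemoCreator;
    let catalog = creator.try_create(Arc::new(CatalogInfo { name: "iceberg".into() }))?;
    println!("{catalog:?}");
    Ok(())
}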

src/query/catalog/src/catalog/manager.rs

Lines changed: 2 additions & 7 deletions
@@ -54,8 +54,6 @@ pub struct CatalogManager {
 
     /// catalog_creators is the catalog creators that registered.
     pub catalog_creators: HashMap<CatalogType, Arc<dyn CatalogCreator>>,
-
-    conf: InnerConfig,
 }
 
 impl CatalogManager {
@@ -114,7 +112,7 @@ impl CatalogManager {
                     created_on: Utc::now(),
                 },
             };
-            let ctl = creator.try_create(Arc::new(ctl_info), conf.to_owned(), &meta)?;
+            let ctl = creator.try_create(Arc::new(ctl_info))?;
             external_catalogs.insert(name.clone(), ctl);
         }
 
@@ -123,7 +121,6 @@ impl CatalogManager {
             default_catalog,
             external_catalogs,
             catalog_creators,
-            conf: conf.to_owned(),
         };
 
         Ok(Arc::new(catalog_manager))
@@ -153,8 +150,7 @@ impl CatalogManager {
             .catalog_creators
             .get(&typ)
             .ok_or_else(|| ErrorCode::BadArguments(format!("unknown catalog type: {:?}", typ)))?;
-
-        creator.try_create(info, self.conf.clone(), &self.meta)
+        creator.try_create(info)
     }
 
     /// Get a catalog from manager.
@@ -184,7 +180,6 @@ impl CatalogManager {
 
         // Get catalog from metasrv.
        let info = self.meta.get_catalog(&ident).await?;
-
        self.build_catalog(info, session_state)
    }
 

src/query/catalog/src/database.rs

Lines changed: 5 additions & 0 deletions
@@ -108,6 +108,11 @@ pub trait Database: DynClone + Sync + Send {
         Ok(vec![])
     }
 
+    #[async_backtrace::framed]
+    async fn trigger_use(&self) -> Result<()> {
+        Ok(())
+    }
+
     #[async_backtrace::framed]
     async fn list_tables_names(&self) -> Result<Vec<String>> {
         Ok(vec![])
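
A hedged sketch of what an implementation might do with the new hook (the `DemoIcebergDatabase` name and the warm-up behaviour are illustrative assumptions, not code from this commit): `trigger_use` gives a database implementation a place to do work when `USE <db>` runs, for example pre-warming table metadata, while the default stays a no-op. The trait below is a simplified, non-async stand-in for the real `Database` trait.

type Result<T> = std::result::Result<T, String>;

trait Database {
    // Default: do nothing, mirroring the upstream default method.
    fn trigger_use(&self) -> Result<()> {
        Ok(())
    }
}

struct DemoIcebergDatabase;

impl Database for DemoIcebergDatabase {
    fn trigger_use(&self) -> Result<()> {
        // Hypothetical: an Iceberg database could pre-fetch or warm table metadata here.
        println!("warming iceberg table metadata cache");
        Ok(())
    }
}

fn main() -> Result<()> {
    DemoIcebergDatabase.trigger_use()
}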

src/query/config/src/config.rs

Lines changed: 9 additions & 0 deletions
@@ -3070,6 +3070,13 @@ pub struct CacheConfig {
     )]
     pub table_data_deserialized_memory_ratio: u64,
 
+    #[clap(
+        long = "cache-iceberg-table-meta-count",
+        value_name = "VALUE",
+        default_value = "1024"
+    )]
+    pub iceberg_table_meta_count: u64,
+
     // ----- the following options/args are all deprecated ----
     /// Max number of cached table segment
     #[clap(long = "cache-table-meta-segment-count", value_name = "VALUE")]
@@ -3297,6 +3304,7 @@ mod cache_config_converters {
             data_cache_key_reload_policy: value.data_cache_key_reload_policy.try_into()?,
             table_data_deserialized_data_bytes: value.table_data_deserialized_data_bytes,
             table_data_deserialized_memory_ratio: value.table_data_deserialized_memory_ratio,
+            iceberg_table_meta_count: value.iceberg_table_meta_count,
             disk_cache_table_bloom_index_meta_size: value
                 .disk_cache_table_bloom_index_meta_size,
         })
@@ -3330,6 +3338,7 @@ mod cache_config_converters {
             disk_cache_config: value.disk_cache_config.into(),
             table_data_deserialized_data_bytes: value.table_data_deserialized_data_bytes,
             table_data_deserialized_memory_ratio: value.table_data_deserialized_memory_ratio,
+            iceberg_table_meta_count: value.iceberg_table_meta_count,
             table_meta_segment_count: None,
             segment_block_metas_count: value.segment_block_metas_count,
         }
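
For reference, a minimal sketch (assuming the clap derive API that this config file already uses) of how the new flag surfaces on the command line; the `CacheArgs` struct is illustrative, not the real `CacheConfig`.

use clap::Parser;

#[derive(Parser, Debug)]
struct CacheArgs {
    /// Maximum number of cached Iceberg table metadata entries (a count, not bytes).
    #[clap(
        long = "cache-iceberg-table-meta-count",
        value_name = "VALUE",
        default_value = "1024"
    )]
    iceberg_table_meta_count: u64,
}

fn main() {
    // e.g. `my-binary --cache-iceberg-table-meta-count 4096`
    let args = CacheArgs::parse();
    println!("iceberg_table_meta_count = {}", args.iceberg_table_meta_count);
}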

src/query/config/src/inner.rs

Lines changed: 4 additions & 0 deletions
@@ -636,6 +636,9 @@ pub struct CacheConfig {
     /// Only if query nodes have plenty of un-utilized memory, the working set can be fitted into,
     /// and the access pattern will benefit from caching, consider enabled this cache.
     pub table_data_deserialized_memory_ratio: u64,
+
+    /// Max number of cached table count of iceberg tables
+    pub iceberg_table_meta_count: u64,
 }
 
 #[derive(Clone, Debug, PartialEq, Eq)]
@@ -733,6 +736,7 @@ impl Default for CacheConfig {
             data_cache_key_reload_policy: Default::default(),
             table_data_deserialized_data_bytes: 0,
             table_data_deserialized_memory_ratio: 0,
+            iceberg_table_meta_count: 1024,
         }
     }
 }

src/query/service/src/interpreters/interpreter_use_database.rs

Lines changed: 4 additions & 1 deletion
@@ -50,9 +50,12 @@ impl Interpreter for UseDatabaseInterpreter {
         if self.plan.database.trim().is_empty() {
             return Err(ErrorCode::UnknownDatabase("No database selected"));
         }
-        self.ctx
+        let db = self
+            .ctx
             .set_current_database(self.plan.database.clone())
             .await?;
+        db.trigger_use().await?;
+
         self.ctx.set_affect(QueryAffect::UseDB {
             name: self.plan.database.clone(),
         });

src/query/service/src/sessions/query_ctx.rs

Lines changed: 10 additions & 5 deletions
@@ -41,6 +41,7 @@ use databend_common_base::runtime::MemStat;
 use databend_common_base::runtime::TrySpawn;
 use databend_common_base::JoinHandle;
 use databend_common_catalog::catalog::CATALOG_DEFAULT;
+use databend_common_catalog::database::Database;
 use databend_common_catalog::lock::LockTableOption;
 use databend_common_catalog::merge_into_join::MergeIntoJoin;
 use databend_common_catalog::plan::DataSourceInfo;
@@ -257,22 +258,26 @@ impl QueryContext {
     }
 
     #[async_backtrace::framed]
-    pub async fn set_current_database(&self, new_database_name: String) -> Result<()> {
+    pub async fn set_current_database(
+        &self,
+        new_database_name: String,
+    ) -> Result<Arc<dyn Database>> {
         let tenant_id = self.get_tenant();
         let catalog = self
             .get_catalog(self.get_current_catalog().as_str())
             .await?;
         match catalog.get_database(&tenant_id, &new_database_name).await {
-            Ok(_) => self.shared.set_current_database(new_database_name),
+            Ok(db) => {
+                self.shared.set_current_database(new_database_name);
+                Ok(db)
+            }
             Err(_) => {
                 return Err(ErrorCode::UnknownDatabase(format!(
                     "Cannot use database '{}': It does not exist.",
                     new_database_name
                 )));
             }
-        };
-
-        Ok(())
+        }
     }
 
     pub fn attach_table(&self, catalog: &str, database: &str, name: &str, table: Arc<dyn Table>) {

src/query/service/tests/it/storages/testdata/caches_table.txt

Lines changed: 1 addition & 0 deletions
@@ -8,6 +8,7 @@ DB.Table: 'system'.'caches', Table: caches-table_id:1, ver:0, Engine: SystemCach
 | 'test-node' | 'memory_cache_bloom_index_filter' | 0 | 0 | 2147483648 | 'bytes' | 0 | 0 | 0 |
 | 'test-node' | 'memory_cache_column_oriented_segment_info' | 0 | 0 | 1073741824 | 'bytes' | 0 | 0 | 0 |
 | 'test-node' | 'memory_cache_compact_segment_info' | 0 | 0 | 1073741824 | 'bytes' | 0 | 0 | 0 |
+| 'test-node' | 'memory_cache_iceberg_table' | 0 | 0 | 1024 | 'count' | 0 | 0 | 0 |
 | 'test-node' | 'memory_cache_inverted_index_file' | 0 | 0 | 2147483648 | 'bytes' | 0 | 0 | 0 |
 | 'test-node' | 'memory_cache_inverted_index_file_meta_data' | 0 | 0 | 3000 | 'count' | 0 | 0 | 0 |
 | 'test-node' | 'memory_cache_parquet_meta_data' | 0 | 0 | 3000 | 'count' | 0 | 0 | 0 |

src/query/service/tests/it/storages/testdata/configs_table_basic.txt

Lines changed: 1 addition & 0 deletions
@@ -14,6 +14,7 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo
 | 'cache' | 'disk_cache_table_bloom_index_meta_size' | '0' | '' |
 | 'cache' | 'enable_table_bloom_index_cache' | 'true' | '' |
 | 'cache' | 'enable_table_meta_cache' | 'true' | '' |
+| 'cache' | 'iceberg_table_meta_count' | '1024' | '' |
 | 'cache' | 'inverted_index_filter_memory_ratio' | '0' | '' |
 | 'cache' | 'inverted_index_filter_size' | '2147483648' | '' |
 | 'cache' | 'inverted_index_meta_count' | '3000' | '' |

src/query/storages/common/cache/src/caches.rs

Lines changed: 23 additions & 0 deletions
@@ -12,12 +12,15 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::sync::atomic::AtomicBool;
 use std::sync::Arc;
+use std::time::Instant;
 
 use arrow::array::ArrayRef;
 use databend_common_cache::MemSized;
 use databend_common_catalog::plan::PartStatistics;
 use databend_common_catalog::plan::Partitions;
+use databend_common_catalog::table::Table;
 use databend_storages_common_index::filters::Xor8Filter;
 use databend_storages_common_index::BloomIndexMeta;
 use databend_storages_common_index::InvertedIndexFile;
@@ -67,6 +70,8 @@ pub type ParquetMetaDataCache = InMemoryLruCache<ParquetMetaData>;
 
 pub type PrunePartitionsCache = InMemoryLruCache<(PartStatistics, Partitions)>;
 
+pub type IcebergTableCache = InMemoryLruCache<(Arc<dyn Table>, AtomicBool, Instant)>;
+
 /// In memory object cache of table column array
 pub type ColumnArrayCache = InMemoryLruCache<SizedColumnArray>;
 pub type ArrayRawDataUncompressedSize = usize;
@@ -103,6 +108,13 @@ impl CachedObject<Vec<Arc<BlockMeta>>> for Vec<Arc<BlockMeta>> {
     }
 }
 
+impl CachedObject<(Arc<dyn Table>, AtomicBool, Instant)> for (Arc<dyn Table>, AtomicBool, Instant) {
+    type Cache = IcebergTableCache;
+    fn cache() -> Option<Self::Cache> {
+        CacheManager::instance().get_iceberg_table_cache()
+    }
+}
+
 impl CachedObject<TableSnapshotStatistics> for TableSnapshotStatistics {
     type Cache = TableSnapshotStatisticCache;
     fn cache() -> Option<Self::Cache> {
@@ -207,6 +219,17 @@ impl From<BlockMeta> for CacheValue<BlockMeta> {
     }
 }
 
+impl From<(Arc<dyn Table>, AtomicBool, Instant)>
+    for CacheValue<(Arc<dyn Table>, AtomicBool, Instant)>
+{
+    fn from(value: (Arc<dyn Table>, AtomicBool, Instant)) -> Self {
+        CacheValue {
+            inner: Arc::new(value),
+            mem_bytes: 0,
+        }
+    }
+}
+
 impl From<TableSnapshot> for CacheValue<TableSnapshot> {
     fn from(value: TableSnapshot) -> Self {
         CacheValue {
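
The cached value is an `(Arc<dyn Table>, AtomicBool, Instant)` tuple. The commit does not spell out the semantics here, but a plausible reading (an assumption, stated as such) is that the `Instant` records when the entry was cached and the `AtomicBool` lets exactly one caller refresh a stale entry. A standalone sketch of that pattern, using only the standard library:

use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};

// Hypothetical helper: an entry is fresh while its age is below the TTL.
fn is_fresh(cached_at: Instant, ttl: Duration) -> bool {
    cached_at.elapsed() < ttl
}

// Hypothetical helper: the first caller flips the flag and wins the right to refresh;
// later callers see it already set and skip the refresh.
fn try_claim_refresh(refreshing: &AtomicBool) -> bool {
    refreshing
        .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
        .is_ok()
}

fn main() {
    let cached_at = Instant::now();
    let refreshing = AtomicBool::new(false);

    assert!(is_fresh(cached_at, Duration::from_secs(600)));
    assert!(try_claim_refresh(&refreshing));
    assert!(!try_claim_refresh(&refreshing)); // someone else is already refreshing
}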

src/query/storages/common/cache/src/lib.rs

Lines changed: 1 addition & 1 deletion
@@ -48,7 +48,7 @@ pub use read::CacheKey;
 pub use read::CachedReader;
 pub use read::HybridCacheReader;
 pub use read::InMemoryCacheReader;
-pub use read::InMemoryItemCacheReader;
+pub use read::InMemoryCacheTTLReader;
 pub use read::LoadParams;
 pub use read::Loader;
 pub use temp_dir::*;

src/query/storages/common/cache/src/manager.rs

Lines changed: 17 additions & 0 deletions
@@ -33,6 +33,7 @@ use crate::caches::CacheValue;
 use crate::caches::ColumnArrayCache;
 use crate::caches::ColumnOrientedSegmentInfoCache;
 use crate::caches::CompactSegmentInfoCache;
+use crate::caches::IcebergTableCache;
 use crate::caches::InvertedIndexFileCache;
 use crate::caches::InvertedIndexMetaCache;
 use crate::caches::ParquetMetaDataCache;
@@ -88,6 +89,9 @@ pub struct CacheManager {
     segment_block_metas_cache: CacheSlot<SegmentBlockMetasCache>,
     block_meta_cache: CacheSlot<BlockMetaCache>,
 
+    // icebergs
+    iceberg_table_meta_cache: CacheSlot<IcebergTableCache>,
+
     /// Determines whether disk caches can be used at runtime.
     ///
     /// This flag allows or disallows disk caches, but the cache configuration takes precedence:
@@ -199,6 +203,7 @@ impl CacheManager {
                 in_memory_table_data_cache,
                 segment_block_metas_cache: CacheSlot::new(None),
                 block_meta_cache: CacheSlot::new(None),
+                iceberg_table_meta_cache: CacheSlot::new(None),
                 allows_on_disk_cache,
             }
         } else {
@@ -291,6 +296,12 @@ impl CacheManager {
                 config.block_meta_count as usize,
             );
 
+            // TODO ADD CONFIG
+            let iceberg_table_meta_cache = Self::new_items_cache_slot(
+                MEMORY_CACHE_ICEBERG_TABLE,
+                config.iceberg_table_meta_count as usize,
+            );
+
            Self {
                table_snapshot_cache,
                compact_segment_info_cache,
@@ -306,6 +317,7 @@ impl CacheManager {
                segment_block_metas_cache,
                parquet_meta_data_cache,
                block_meta_cache,
+               iceberg_table_meta_cache,
                allows_on_disk_cache,
            }
        };
@@ -468,6 +480,10 @@ impl CacheManager {
         self.block_meta_cache.get()
     }
 
+    pub fn get_iceberg_table_cache(&self) -> Option<IcebergTableCache> {
+        self.iceberg_table_meta_cache.get()
+    }
+
     pub fn get_table_snapshot_statistics_cache(&self) -> Option<TableSnapshotStatisticCache> {
         self.table_statistic_cache.get()
     }
@@ -658,6 +674,7 @@ const MEMORY_CACHE_COLUMN_ORIENTED_SEGMENT_INFO: &str = "memory_cache_column_ori
 const MEMORY_CACHE_TABLE_STATISTICS: &str = "memory_cache_table_statistics";
 const MEMORY_CACHE_TABLE_SNAPSHOT: &str = "memory_cache_table_snapshot";
 const MEMORY_CACHE_SEGMENT_BLOCK_METAS: &str = "memory_cache_segment_block_metas";
+const MEMORY_CACHE_ICEBERG_TABLE: &str = "memory_cache_iceberg_table";
 
 const MEMORY_CACHE_BLOCK_META: &str = "memory_cache_block_meta";
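
The new cache is count-bounded (1024 entries by default) rather than byte-bounded, and `get_iceberg_table_cache()` returns `Option<IcebergTableCache>` because the slot may hold no cache when caching is disabled. A standalone conceptual sketch of that "count-bounded, possibly absent cache slot" idea, using only the standard library (not Databend's `CacheSlot`/`InMemoryLruCache` implementation):

use std::collections::VecDeque;

// Toy count-bounded cache: evicts the least recently inserted entry, not a true LRU.
struct CountBoundedCache<T> {
    capacity: usize,
    items: VecDeque<(String, T)>,
}

impl<T> CountBoundedCache<T> {
    fn new(capacity: usize) -> Self {
        Self { capacity, items: VecDeque::new() }
    }

    fn insert(&mut self, key: String, value: T) {
        if self.items.len() == self.capacity {
            self.items.pop_front(); // evict the oldest entry once the count limit is hit
        }
        self.items.push_back((key, value));
    }

    fn get(&self, key: &str) -> Option<&T> {
        self.items.iter().find(|(k, _)| k == key).map(|(_, v)| v)
    }
}

// Slot that is None when the cache is disabled (capacity 0), mirroring Option<IcebergTableCache>.
struct CacheSlotSketch<T>(Option<CountBoundedCache<T>>);

fn new_items_cache_slot<T>(capacity: usize) -> CacheSlotSketch<T> {
    if capacity == 0 {
        CacheSlotSketch(None)
    } else {
        CacheSlotSketch(Some(CountBoundedCache::new(capacity)))
    }
}

fn main() {
    let mut slot: CacheSlotSketch<&'static str> = new_items_cache_slot(1024);
    if let Some(cache) = slot.0.as_mut() {
        cache.insert("tenant1/iceberg/db1/t1".to_string(), "table metadata");
        assert!(cache.get("tenant1/iceberg/db1/t1").is_some());
    }
}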
