
Commit 35588e7

apply review suggestions
1 parent e82fb05 commit 35588e7

File tree

5 files changed: +226 -152 lines

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default.

src/query/service/Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -69,7 +69,9 @@ databend-common-management = { workspace = true }
 databend-common-meta-api = { workspace = true }
 databend-common-meta-app = { workspace = true }
 databend-common-meta-app-types = { workspace = true }
+databend-common-meta-client = { workspace = true }
 databend-common-meta-kvapi = { workspace = true }
+databend-common-meta-semaphore = { workspace = true }
 databend-common-meta-store = { workspace = true }
 databend-common-meta-types = { workspace = true }
 databend-common-metrics = { workspace = true }

src/query/service/src/persistent_log/global_persistent_log.rs

Lines changed: 113 additions & 77 deletions
@@ -18,30 +18,29 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use databend_common_base::base::GlobalInstance;
+use databend_common_base::runtime::spawn;
 use databend_common_base::runtime::GlobalIORuntime;
 use databend_common_base::runtime::MemStat;
 use databend_common_base::runtime::ThreadTracker;
 use databend_common_base::runtime::TrySpawn;
 use databend_common_catalog::catalog::CATALOG_DEFAULT;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_config::InnerConfig;
+use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_license::license::Feature;
 use databend_common_license::license_manager::LicenseManagerSwitch;
-use databend_common_meta_kvapi::kvapi::KVApi;
-use databend_common_meta_store::MetaStore;
-use databend_common_meta_store::MetaStoreProvider;
-use databend_common_meta_types::txn_condition;
-use databend_common_meta_types::ConditionResult;
-use databend_common_meta_types::TxnCondition;
-use databend_common_meta_types::TxnOp;
-use databend_common_meta_types::TxnRequest;
+use databend_common_meta_client::ClientHandle;
+use databend_common_meta_client::MetaGrpcClient;
+use databend_common_meta_semaphore::acquirer::Permit;
+use databend_common_meta_semaphore::Semaphore;
 use databend_common_sql::Planner;
 use databend_common_storage::DataOperator;
 use databend_common_tracing::GlobalLogger;
 use log::error;
 use log::info;
 use rand::random;
+use tokio::time::sleep;
 
 use crate::interpreters::InterpreterFactory;
 use crate::persistent_log::session::create_session;
@@ -52,7 +51,7 @@ use crate::persistent_log::table_schemas::QueryProfileTable;
 use crate::sessions::QueryContext;
 
 pub struct GlobalPersistentLog {
-    meta_store: MetaStore,
+    meta_client: Option<Arc<ClientHandle>>,
     interval: usize,
     tenant_id: String,
     node_id: String,
@@ -61,16 +60,17 @@ pub struct GlobalPersistentLog {
     initialized: AtomicBool,
     stopped: AtomicBool,
     tables: Vec<Box<dyn PersistentLogTable>>,
-    #[allow(dead_code)]
     retention: usize,
 }
 
 impl GlobalPersistentLog {
     pub async fn init(cfg: &InnerConfig) -> Result<()> {
         setup_operator().await?;
 
-        let provider = MetaStoreProvider::new(cfg.meta.to_meta_grpc_client_conf());
-        let meta_store = provider.create_meta_store().await?;
+        let meta_client =
+            MetaGrpcClient::try_new(&cfg.meta.to_meta_grpc_client_conf()).map_err(|_e| {
+                ErrorCode::Internal("Create MetaClient failed for GlobalPersistentLog")
+            })?;
 
         let mut tables: Vec<Box<dyn PersistentLogTable>> = vec![];
 
@@ -100,7 +100,7 @@ impl GlobalPersistentLog {
         tables.push(Box::new(query_log));
 
         let instance = Arc::new(Self {
-            meta_store,
+            meta_client: Some(meta_client),
             interval: cfg.log.persistentlog.interval,
             tenant_id: cfg.query.tenant_id.tenant_name().to_string(),
             node_id: cfg.query.node_id.clone(),
@@ -112,14 +112,37 @@ impl GlobalPersistentLog {
             retention: cfg.log.persistentlog.retention,
         });
         GlobalInstance::set(instance);
-        GlobalIORuntime::instance().spawn(async move {
-            if let Err(e) = GlobalPersistentLog::instance().work().await {
-                error!("persistent log exit {}", e);
-            }
-        });
+        GlobalIORuntime::instance().try_spawn(
+            async move {
+                if let Err(e) = GlobalPersistentLog::instance().work().await {
+                    error!("persistent log exit {}", e);
+                }
+            },
+            Some("persistent-log-worker".to_string()),
+        )?;
         Ok(())
     }
 
+    pub async fn create_dummy(cfg: &InnerConfig) -> Result<Self> {
+        setup_operator().await?;
+        Ok(Self {
+            meta_client: None,
+            interval: cfg.log.persistentlog.interval,
+            tenant_id: cfg.query.tenant_id.tenant_name().to_string(),
+            node_id: cfg.query.node_id.clone(),
+            cluster_id: cfg.query.cluster_id.clone(),
+            stage_name: cfg.log.persistentlog.stage_name.clone(),
+            initialized: AtomicBool::new(false),
+            stopped: AtomicBool::new(false),
+            tables: vec![
+                Box::new(QueryDetailsTable),
+                Box::new(QueryProfileTable),
+                Box::new(QueryLogTable),
+            ],
+            retention: cfg.log.persistentlog.retention,
+        })
+    }
+
     pub fn instance() -> Arc<GlobalPersistentLog> {
         GlobalInstance::get()
     }
@@ -130,29 +153,32 @@ impl GlobalPersistentLog {
 
     pub async fn work(&self) -> Result<()> {
         let mut prepared = false;
-
-        // // Use a counter rather than a time interval to trigger cleanup operations.
-        // // because in cluster environment, a time-based interval would cause cleanup frequency
-        // // to scale with the number of nodes in the cluster, whereas this count-based
-        // // approach ensures consistent cleanup frequency regardless of cluster size.
-        // let thirty_minutes_in_seconds = 30 * 60;
-        // let copy_into_threshold = thirty_minutes_in_seconds / self.interval;
-        // let mut copy_into_count = 0;
-
+        // Wait all services to be initialized
+        loop {
+            if !self.initialized.load(Ordering::SeqCst) {
+                tokio::time::sleep(Duration::from_secs(1)).await;
+            } else {
+                break;
+            }
+        }
+        spawn(async move {
+            if let Err(e) = GlobalPersistentLog::instance().clean_work().await {
+                error!("Persistent log clean_work exit {}", e);
+            }
+        });
         loop {
-            // add a random sleep time to avoid always one node doing the work
-            let sleep_time = self.interval as u64 * 1000 + random::<u64>() % 1000;
-            tokio::time::sleep(Duration::from_millis(sleep_time)).await;
             if self.stopped.load(Ordering::SeqCst) {
                 return Ok(());
             }
-            // Wait all services to be initialized
-            if !self.initialized.load(Ordering::SeqCst) {
-                continue;
-            }
             // create the stage, database and table if not exists
-            // only execute once, it is ok to do this in multiple nodes without lock
+            // alter the table if schema is changed
             if !prepared {
+                let prepare_guard = self
+                    .acquire(
+                        format!("{}/persistent_log_prepare", self.tenant_id),
+                        self.interval as u64,
+                    )
+                    .await?;
                 match self.prepare().await {
                     Ok(_) => {
                         info!("Persistent log prepared successfully");
@@ -162,43 +188,41 @@
                         error!("Persistent log prepare failed: {:?}", e);
                     }
                 }
+                drop(prepare_guard);
             }
-            if let Ok(acquired_lock) = self.try_acquire().await {
-                if acquired_lock {
-                    if let Err(e) = self.do_copy_into().await {
-                        error!("Persistent log copy into failed: {:?}", e);
-                    }
-                    // copy_into_count += 1;
-                    // if copy_into_count > copy_into_threshold {
-                    //     if let Err(e) = self.clean().await {
-                    //         error!("Persistent log delete failed: {:?}", e);
-                    //     }
-                    //     copy_into_count = 0;
-                    // }
-                }
+
+            let guard = self
+                .acquire(
+                    format!("{}/persistent_log_work", self.tenant_id),
+                    self.interval as u64,
+                )
+                .await?;
+            // add a random sleep time to avoid always one node doing the work
+            let sleep_time = self.interval as u64 * 1000 + random::<u64>() % 1000;
+            tokio::time::sleep(Duration::from_millis(sleep_time)).await;
+
+            if let Err(e) = self.do_copy_into().await {
+                error!("Persistent log copy into failed: {:?}", e);
             }
+
+            drop(guard)
         }
     }
 
     /// Multiple nodes doing the work may make commit conflict.
-    pub async fn try_acquire(&self) -> Result<bool> {
-        let meta_key = format!("{}/persistent_log_lock", self.tenant_id);
-        let condition = vec![TxnCondition {
-            key: meta_key.clone(),
-            expected: ConditionResult::Eq as i32,
-            target: Some(txn_condition::Target::Seq(0)),
-        }];
-
-        let if_then = vec![TxnOp::put_with_ttl(
-            &meta_key,
-            self.node_id.clone().into(),
-            Some(Duration::from_secs(self.interval as u64)),
-        )];
+    /// acquire the semaphore to avoid this.
+    pub async fn acquire(&self, meta_key: String, lease: u64) -> Result<Permit> {
+        let acquired_guard = Semaphore::new_acquired(
+            self.meta_client.clone().unwrap(),
+            meta_key,
+            1,
+            self.node_id.clone(),
+            Duration::from_secs(lease),
+        )
+        .await
+        .map_err(|_e| "acquire semaphore failed from GlobalPersistentLog")?;
 
-        let txn = TxnRequest::new(condition, if_then);
-        let resp = self.meta_store.transaction(txn).await?;
-
-        Ok(resp.success)
+        Ok(acquired_guard)
     }
 
     async fn execute_sql(&self, sql: &str) -> Result<()> {
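
This hunk replaces the hand-rolled TxnCondition/TxnOp lock with the capacity-1 distributed semaphore from databend-common-meta-semaphore. Below is a minimal sketch of that acquire/hold/drop pattern, assuming an already-constructed Arc<ClientHandle>; the key name and lease value are illustrative, not the ones this commit uses.

use std::sync::Arc;
use std::time::Duration;

use databend_common_meta_client::ClientHandle;
use databend_common_meta_semaphore::acquirer::Permit;
use databend_common_meta_semaphore::Semaphore;

/// Sketch: let at most one node run a job at a time by treating a
/// capacity-1 semaphore in meta-service as a distributed mutex.
async fn do_work_exclusively(meta_client: Arc<ClientHandle>, node_id: String) -> Result<(), String> {
    let permit: Permit = Semaphore::new_acquired(
        meta_client,
        "example_tenant/example_job".to_string(), // hypothetical key all nodes contend on
        1,                                        // capacity 1 => mutual exclusion
        node_id,                                  // identifies this acquirer
        Duration::from_secs(30),                  // lease: a crashed holder eventually frees the slot
    )
    .await
    .map_err(|_e| "acquire semaphore failed".to_string())?;

    // ... do the work that must not run concurrently on other nodes ...

    // Dropping the permit releases the semaphore so another node can proceed.
    drop(permit);
    Ok(())
}
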
@@ -256,8 +280,8 @@
         Ok(())
     }
 
-    async fn do_copy_into(&self) -> Result<()> {
-        let stage_name = GlobalPersistentLog::instance().stage_name.clone();
+    pub async fn do_copy_into(&self) -> Result<()> {
+        let stage_name = self.stage_name.clone();
         let operator = GlobalLogger::instance().get_operator().await;
         if let Some(op) = operator {
             let path = format!("stage/internal/{}/", stage_name);
@@ -284,23 +308,35 @@
         Ok(())
     }
 
-    /// Do retention and vacuum
-    #[allow(dead_code)]
-    async fn clean(&self) -> Result<()> {
-        let delete = format!(
-            "DELETE FROM persistent_system.query_log WHERE timestamp < subtract_hours(NOW(), {})",
-            self.retention
-        );
-        self.execute_sql(&delete).await?;
+    async fn clean_work(&self) -> Result<()> {
+        loop {
+            let guard = self
+                .acquire(format!("{}/persistent_log_clean", self.tenant_id), 60)
+                .await?;
+            sleep(Duration::from_mins(60)).await;
+            if let Err(e) = self.do_clean().await {
+                error!("persistent log clean failed: {}", e);
+            }
+            drop(guard);
+        }
+    }
+
+    pub async fn do_clean(&self) -> Result<()> {
+        for table in &self.tables {
+            let clean_sql = table.clean_sql(self.retention);
+            self.execute_sql(&clean_sql).await?;
+        }
 
         let session = create_session(&self.tenant_id, &self.cluster_id).await?;
         let context = session.create_query_context().await?;
         if LicenseManagerSwitch::instance()
             .check_enterprise_enabled(context.get_license_key(), Feature::Vacuum)
             .is_ok()
         {
-            let vacuum = "VACUUM TABLE persistent_system.query_log";
-            self.execute_sql(vacuum).await?
+            for table in &self.tables {
+                let vacuum = format!("VACUUM TABLE persistent_system.{}", table.table_name());
+                self.execute_sql(&vacuum).await?
+            }
        }
         Ok(())
     }
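
do_clean now derives the retention DELETE from each table via clean_sql instead of hard-coding persistent_system.query_log. The trait method itself is not part of this diff; a hypothetical sketch, modeled on the DELETE statement the removed clean() issued:

/// Hypothetical stand-in for the per-table retention hook that do_clean calls;
/// the real PersistentLogTable trait lives in table_schemas.rs and is not shown in this commit.
trait PersistentLogTableExample {
    fn table_name(&self) -> String;

    /// Build a retention DELETE in the spirit of the removed clean().
    fn clean_sql(&self, retention: usize) -> String {
        format!(
            "DELETE FROM persistent_system.{} WHERE timestamp < subtract_hours(NOW(), {})",
            self.table_name(),
            retention
        )
    }
}
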
