Skip to content

Commit bf5ae3b

Browse files
committed
apply review suggestion
1 parent 137cff0 commit bf5ae3b

File tree

1 file changed

+82
-31
lines changed

1 file changed

+82
-31
lines changed

src/query/service/src/persistent_log/global_persistent_log.rs

Lines changed: 82 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,12 @@ use databend_common_license::license::Feature;
3232
use databend_common_license::license_manager::LicenseManagerSwitch;
3333
use databend_common_meta_client::ClientHandle;
3434
use databend_common_meta_client::MetaGrpcClient;
35+
use databend_common_meta_kvapi::kvapi::KVApi;
3536
use databend_common_meta_semaphore::acquirer::Permit;
3637
use databend_common_meta_semaphore::Semaphore;
38+
use databend_common_meta_types::MatchSeq;
39+
use databend_common_meta_types::Operation;
40+
use databend_common_meta_types::UpsertKV;
3741
use databend_common_sql::Planner;
3842
use databend_common_storage::DataOperator;
3943
use databend_common_tracing::GlobalLogger;
@@ -52,7 +56,7 @@ use crate::sessions::QueryContext;
5256

5357
pub struct GlobalPersistentLog {
5458
meta_client: Option<Arc<ClientHandle>>,
55-
interval: usize,
59+
interval: u64,
5660
tenant_id: String,
5761
node_id: String,
5862
cluster_id: String,
@@ -101,7 +105,7 @@ impl GlobalPersistentLog {
101105

102106
let instance = Arc::new(Self {
103107
meta_client: Some(meta_client),
104-
interval: cfg.log.persistentlog.interval,
108+
interval: cfg.log.persistentlog.interval as u64,
105109
tenant_id: cfg.query.tenant_id.tenant_name().to_string(),
106110
node_id: cfg.query.node_id.clone(),
107111
cluster_id: cfg.query.cluster_id.clone(),
@@ -127,7 +131,7 @@ impl GlobalPersistentLog {
127131
setup_operator().await?;
128132
Ok(Self {
129133
meta_client: None,
130-
interval: cfg.log.persistentlog.interval,
134+
interval: cfg.log.persistentlog.interval as u64,
131135
tenant_id: cfg.query.tenant_id.tenant_name().to_string(),
132136
node_id: cfg.query.node_id.clone(),
133137
cluster_id: cfg.query.cluster_id.clone(),
@@ -175,8 +179,9 @@ impl GlobalPersistentLog {
175179
if !prepared {
176180
let prepare_guard = self
177181
.acquire(
178-
format!("{}/persistent_log_prepare", self.tenant_id),
179-
self.interval as u64,
182+
&format!("{}/persistent_log_prepare", self.tenant_id),
183+
self.interval,
184+
0,
180185
)
181186
.await?;
182187
match self.prepare().await {
@@ -190,28 +195,33 @@ impl GlobalPersistentLog {
190195
}
191196
drop(prepare_guard);
192197
}
193-
194-
let guard = self
195-
.acquire(
196-
format!("{}/persistent_log_work", self.tenant_id),
197-
self.interval as u64,
198-
)
198+
let meta_key = format!("{}/persistent_log_work", self.tenant_id);
199+
let may_permit = self
200+
.acquire(&meta_key, self.interval, self.interval)
199201
.await?;
200-
// add a random sleep time to avoid always one node doing the work
201-
let sleep_time = self.interval as u64 * 1000 + random::<u64>() % 1000;
202-
tokio::time::sleep(Duration::from_millis(sleep_time)).await;
203-
204-
if let Err(e) = self.do_copy_into().await {
205-
error!("Persistent log copy into failed: {:?}", e);
202+
if let Some(guard) = may_permit {
203+
if let Err(e) = self.do_copy_into().await {
204+
error!("Persistent log copy into failed: {:?}", e);
205+
}
206+
self.finish_hook(&meta_key).await?;
207+
drop(guard);
206208
}
207-
208-
drop(guard)
209+
// add a random sleep time (from 0.5*interval to 1.5*interval) to avoid always one node doing the work
210+
let sleep_time = self.interval * 500 + random::<u64>() % (self.interval * 1000);
211+
tokio::time::sleep(Duration::from_millis(sleep_time)).await;
209212
}
210213
}
211214

212-
/// Multiple nodes doing the work may make commit conflict.
213-
/// acquire the semaphore to avoid this.
214-
pub async fn acquire(&self, meta_key: String, lease: u64) -> Result<Permit> {
215+
/// Acquires a permit from a distributed semaphore with timestamp-based rate limiting.
216+
///
217+
/// This function attempts to acquire a permit from a distributed semaphore identified by `meta_key`.
218+
/// It also implements a rate limiting mechanism based on the last execution timestamp.
219+
pub async fn acquire(
220+
&self,
221+
meta_key: &str,
222+
lease: u64,
223+
interval: u64,
224+
) -> Result<Option<Permit>> {
215225
let acquired_guard = Semaphore::new_acquired(
216226
self.meta_client.clone().unwrap(),
217227
meta_key,
@@ -221,8 +231,45 @@ impl GlobalPersistentLog {
221231
)
222232
.await
223233
.map_err(|_e| "acquire semaphore failed from GlobalPersistentLog")?;
234+
if interval == 0 {
235+
return Ok(Some(acquired_guard));
236+
}
237+
if match self
238+
.meta_client
239+
.clone()
240+
.unwrap()
241+
.get_kv(&format!("{}/last_timestamp", meta_key))
242+
.await?
243+
{
244+
Some(v) => {
245+
let last: u64 = serde_json::from_slice(&v.data)?;
246+
chrono::Local::now().timestamp_millis() as u64
247+
- Duration::from_secs(interval).as_millis() as u64
248+
> last
249+
}
250+
None => true,
251+
} {
252+
Ok(Some(acquired_guard))
253+
} else {
254+
drop(acquired_guard);
255+
Ok(None)
256+
}
257+
}
224258

225-
Ok(acquired_guard)
259+
pub async fn finish_hook(&self, meta_key: &str) -> Result<()> {
260+
self.meta_client
261+
.clone()
262+
.unwrap()
263+
.upsert_kv(UpsertKV::new(
264+
format!("{}/last_timestamp", meta_key),
265+
MatchSeq::Any,
266+
Operation::Update(serde_json::to_vec(
267+
&chrono::Local::now().timestamp_millis(),
268+
)?),
269+
None,
270+
))
271+
.await?;
272+
Ok(())
226273
}
227274

228275
async fn execute_sql(&self, sql: &str) -> Result<()> {
@@ -263,7 +310,7 @@ impl GlobalPersistentLog {
263310
let old_schema = old_table?.schema();
264311
if !table.schema_equal(old_schema) {
265312
let rename_target =
266-
format!("`{}_old_{}`", table_name, chrono::Utc::now().timestamp());
313+
format!("`{}_old_{}`", table_name, chrono::Local::now().timestamp());
267314
let rename = format!(
268315
"ALTER TABLE persistent_system.{} RENAME TO {}",
269316
table_name, rename_target
@@ -310,14 +357,18 @@ impl GlobalPersistentLog {
310357

311358
async fn clean_work(&self) -> Result<()> {
312359
loop {
313-
let guard = self
314-
.acquire(format!("{}/persistent_log_clean", self.tenant_id), 60)
315-
.await?;
316-
sleep(Duration::from_mins(60)).await;
317-
if let Err(e) = self.do_clean().await {
318-
error!("persistent log clean failed: {}", e);
360+
let meta_key = format!("{}/persistent_log_clean", self.tenant_id);
361+
let may_permit = self.acquire(&meta_key, 60, 60 * 60).await?;
362+
if let Some(guard) = may_permit {
363+
if let Err(e) = self.do_clean().await {
364+
error!("persistent log clean failed: {}", e);
365+
}
366+
self.finish_hook(&meta_key).await?;
367+
drop(guard);
319368
}
320-
drop(guard);
369+
370+
// sleep for a random time between 30 and 90 minutes
371+
sleep(Duration::from_mins(30 + random::<u64>() % 60)).await;
321372
}
322373
}
323374

0 commit comments

Comments
 (0)