diff --git a/Cargo.lock b/Cargo.lock index 17bacc32c4572..38460890a83cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5131,7 +5131,9 @@ dependencies = [ "databend-common-meta-api", "databend-common-meta-app", "databend-common-meta-app-types", + "databend-common-meta-client", "databend-common-meta-kvapi", + "databend-common-meta-semaphore", "databend-common-meta-store", "databend-common-meta-types", "databend-common-metrics", diff --git a/src/common/tracing/src/init.rs b/src/common/tracing/src/init.rs index 029548b5edcf5..61888287b0e78 100644 --- a/src/common/tracing/src/init.rs +++ b/src/common/tracing/src/init.rs @@ -63,7 +63,7 @@ impl GlobalLogger { } // Get the operator for remote log when it is ready. - pub(crate) async fn get_operator(&self) -> Option { + pub async fn get_operator(&self) -> Option { let operator = self.remote_log_operator.read().await; if let Some(operator) = operator.as_ref() { return Some(operator.clone()); @@ -383,14 +383,14 @@ pub fn init_logging( let mut filter_builder = EnvFilterBuilder::new().filter(Some("databend::log::structlog"), LevelFilter::Off); - if cfg.profile.on && !cfg.profile.dir.is_empty() { + if cfg.profile.on { filter_builder = filter_builder.filter(Some("databend::log::profile"), LevelFilter::Trace); } else { filter_builder = filter_builder.filter(Some("databend::log::profile"), LevelFilter::Off); } - if cfg.query.on && !cfg.query.dir.is_empty() { + if cfg.query.on { filter_builder = filter_builder.filter(Some("databend::log::query"), LevelFilter::Trace); } else { diff --git a/src/common/tracing/src/lib.rs b/src/common/tracing/src/lib.rs index 3525e00b4020a..3a48ec1a7870b 100644 --- a/src/common/tracing/src/lib.rs +++ b/src/common/tracing/src/lib.rs @@ -53,6 +53,7 @@ pub use crate::remote_log::LogMessage; pub use crate::remote_log::RemoteLog; pub use crate::remote_log::RemoteLogElement; pub use crate::remote_log::RemoteLogGuard; +pub use crate::remote_log::PERSISTENT_LOG_SCHEMA_VERSION; pub use crate::structlog::DummyReporter; pub use crate::structlog::StructLogReporter; diff --git a/src/common/tracing/src/remote_log.rs b/src/common/tracing/src/remote_log.rs index e20cce12da57e..d1e98a156f343 100644 --- a/src/common/tracing/src/remote_log.rs +++ b/src/common/tracing/src/remote_log.rs @@ -53,6 +53,8 @@ use serde_json::Map; use crate::Config; use crate::GlobalLogger; +pub const PERSISTENT_LOG_SCHEMA_VERSION: usize = 1; + /// An appender that sends log records to persistent storage #[derive(Debug)] pub struct RemoteLog { @@ -178,8 +180,9 @@ impl RemoteLog { op.as_ref()?; let path = format!( - "stage/internal/{}/{}.parquet", + "stage/internal/{}_v{}/{}.parquet", stage_name, + PERSISTENT_LOG_SCHEMA_VERSION, uuid::Uuid::new_v4() ); diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index 9ce2612fe9bef..37fb41dde46f1 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -67,7 +67,9 @@ databend-common-management = { workspace = true } databend-common-meta-api = { workspace = true } databend-common-meta-app = { workspace = true } databend-common-meta-app-types = { workspace = true } +databend-common-meta-client = { workspace = true } databend-common-meta-kvapi = { workspace = true } +databend-common-meta-semaphore = { workspace = true } databend-common-meta-store = { workspace = true } databend-common-meta-types = { workspace = true } databend-common-metrics = { workspace = true } diff --git a/src/query/service/src/persistent_log/global_persistent_log.rs b/src/query/service/src/persistent_log/global_persistent_log.rs index 976a75522e2ff..c1b4842b7d870 100644 --- a/src/query/service/src/persistent_log/global_persistent_log.rs +++ b/src/query/service/src/persistent_log/global_persistent_log.rs @@ -18,45 +18,53 @@ use std::sync::Arc; use std::time::Duration; use databend_common_base::base::GlobalInstance; +use databend_common_base::runtime::spawn; use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::MemStat; use databend_common_base::runtime::ThreadTracker; use databend_common_base::runtime::TrySpawn; +use databend_common_catalog::catalog::CATALOG_DEFAULT; use databend_common_catalog::table_context::TableContext; use databend_common_config::InnerConfig; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_license::license::Feature; use databend_common_license::license_manager::LicenseManagerSwitch; +use databend_common_meta_client::MetaGrpcClient; use databend_common_meta_kvapi::kvapi::KVApi; +use databend_common_meta_semaphore::acquirer::Permit; +use databend_common_meta_semaphore::Semaphore; use databend_common_meta_store::MetaStore; use databend_common_meta_store::MetaStoreProvider; -use databend_common_meta_types::txn_condition; -use databend_common_meta_types::ConditionResult; -use databend_common_meta_types::TxnCondition; -use databend_common_meta_types::TxnOp; -use databend_common_meta_types::TxnRequest; +use databend_common_meta_types::MatchSeq; +use databend_common_meta_types::Operation; +use databend_common_meta_types::UpsertKV; use databend_common_sql::Planner; use databend_common_storage::DataOperator; use databend_common_tracing::GlobalLogger; +use databend_common_tracing::PERSISTENT_LOG_SCHEMA_VERSION; use log::error; use log::info; use rand::random; +use tokio::time::sleep; use crate::interpreters::InterpreterFactory; use crate::persistent_log::session::create_session; +use crate::persistent_log::table_schemas::PersistentLogTable; +use crate::persistent_log::table_schemas::QueryDetailsTable; +use crate::persistent_log::table_schemas::QueryLogTable; +use crate::persistent_log::table_schemas::QueryProfileTable; use crate::sessions::QueryContext; pub struct GlobalPersistentLog { meta_store: MetaStore, - interval: usize, + interval: u64, tenant_id: String, node_id: String, cluster_id: String, stage_name: String, initialized: AtomicBool, - stopped: AtomicBool, - #[allow(dead_code)] + tables: Vec>, retention: usize, } @@ -64,31 +72,91 @@ impl GlobalPersistentLog { pub async fn init(cfg: &InnerConfig) -> Result<()> { setup_operator().await?; - let provider = MetaStoreProvider::new(cfg.meta.to_meta_grpc_client_conf()); - let meta_store = provider.create_meta_store().await.map_err(|e| { - ErrorCode::MetaServiceError(format!("Failed to create meta store: {}", e)) - })?; + let meta_client = + MetaGrpcClient::try_new(&cfg.meta.to_meta_grpc_client_conf()).map_err(|_e| { + ErrorCode::Internal("Create MetaClient failed for GlobalPersistentLog") + })?; + + let mut tables: Vec> = vec![]; + + if cfg.log.query.on { + let query_details = QueryDetailsTable::new(); + info!( + "Persistent query details table is enabled, persistent_system.{}", + query_details.table_name() + ); + tables.push(Box::new(query_details)); + } + + if cfg.log.profile.on { + let profile = QueryProfileTable::new(); + info!( + "Persistent query profile table is enabled, persistent_system.{}", + profile.table_name() + ); + tables.push(Box::new(profile)); + } + + let query_log = QueryLogTable::new(); + info!( + "Persistent query log table is enabled, persistent_system.{}", + query_log.table_name() + ); + tables.push(Box::new(query_log)); + + let stage_name = format!( + "{}_v{}", + cfg.log.persistentlog.stage_name.clone(), + PERSISTENT_LOG_SCHEMA_VERSION + ); let instance = Arc::new(Self { - meta_store, - interval: cfg.log.persistentlog.interval, + meta_store: MetaStore::R(meta_client), + interval: cfg.log.persistentlog.interval as u64, tenant_id: cfg.query.tenant_id.tenant_name().to_string(), node_id: cfg.query.node_id.clone(), cluster_id: cfg.query.cluster_id.clone(), - stage_name: cfg.log.persistentlog.stage_name.clone(), + stage_name, initialized: AtomicBool::new(false), - stopped: AtomicBool::new(false), + tables, retention: cfg.log.persistentlog.retention, }); GlobalInstance::set(instance); - GlobalIORuntime::instance().spawn(async move { - if let Err(e) = GlobalPersistentLog::instance().work().await { - error!("persistent log exit {}", e); - } - }); + GlobalIORuntime::instance().try_spawn( + async move { + if let Err(e) = GlobalPersistentLog::instance().work().await { + error!("persistent log exit {}", e); + } + }, + Some("persistent-log-worker".to_string()), + )?; Ok(()) } + /// Create a dummy instance of GlobalPersistentLog for testing purposes. + pub async fn create_dummy(cfg: &InnerConfig) -> Result { + setup_operator().await?; + let meta_store = MetaStoreProvider::new(cfg.meta.to_meta_grpc_client_conf()) + .create_meta_store() + .await + .map_err(|_e| ErrorCode::Internal("create memory meta store failed"))?; + Ok(Self { + meta_store, + interval: cfg.log.persistentlog.interval as u64, + tenant_id: cfg.query.tenant_id.tenant_name().to_string(), + node_id: cfg.query.node_id.clone(), + cluster_id: cfg.query.cluster_id.clone(), + stage_name: cfg.log.persistentlog.stage_name.clone(), + initialized: AtomicBool::new(false), + tables: vec![ + Box::new(QueryDetailsTable::new()), + Box::new(QueryProfileTable::new()), + Box::new(QueryLogTable::new()), + ], + retention: cfg.log.persistentlog.retention, + }) + } + pub fn instance() -> Arc { GlobalInstance::get() } @@ -99,30 +167,25 @@ impl GlobalPersistentLog { pub async fn work(&self) -> Result<()> { let mut prepared = false; - - // // Use a counter rather than a time interval to trigger cleanup operations. - // // because in cluster environment, a time-based interval would cause cleanup frequency - // // to scale with the number of nodes in the cluster, whereas this count-based - // // approach ensures consistent cleanup frequency regardless of cluster size. - // let thirty_minutes_in_seconds = 30 * 60; - // let copy_into_threshold = thirty_minutes_in_seconds / self.interval; - // let mut copy_into_count = 0; - + let meta_key = format!("{}/persistent_log_work", self.tenant_id); + // Wait all services to be initialized loop { - // add a random sleep time to avoid always one node doing the work - let sleep_time = self.interval as u64 * 1000 + random::() % 1000; - - tokio::time::sleep(Duration::from_millis(sleep_time)).await; - if self.stopped.load(Ordering::SeqCst) { - return Ok(()); - } - // Wait all services to be initialized if !self.initialized.load(Ordering::SeqCst) { - continue; + tokio::time::sleep(Duration::from_secs(1)).await; + } else { + break; } + } + spawn(async move { + if let Err(e) = GlobalPersistentLog::instance().clean_work().await { + error!("Persistent log clean_work exit {}", e); + } + }); + loop { // create the stage, database and table if not exists - // only execute once, it is ok to do this in multiple nodes without lock + // alter the table if schema is changed if !prepared { + let prepare_guard = self.acquire(&meta_key, self.interval, 0).await?; match self.prepare().await { Ok(_) => { info!("Persistent log prepared successfully"); @@ -132,43 +195,91 @@ impl GlobalPersistentLog { error!("Persistent log prepare failed: {:?}", e); } } + drop(prepare_guard); } - if let Ok(acquired_lock) = self.try_acquire().await { - if acquired_lock { - if let Err(e) = self.do_copy_into().await { - error!("Persistent log copy into failed: {:?}", e); + let may_permit = self + .acquire(&meta_key, self.interval, self.interval) + .await?; + if let Some(guard) = may_permit { + if let Err(e) = self.do_copy_into().await { + error!("Persistent log copy into failed: {:?}", e); + let latest_version = self.get_version_from_meta().await?; + if let Some(version) = latest_version { + if version > PERSISTENT_LOG_SCHEMA_VERSION as u64 { + info!("Persistent log tables enable version suffix"); + for table in &self.tables { + table.enable_version_suffix(); + } + } } - // copy_into_count += 1; - // if copy_into_count > copy_into_threshold { - // if let Err(e) = self.clean().await { - // error!("Persistent log delete failed: {:?}", e); - // } - // copy_into_count = 0; - // } } + self.finish_hook(&meta_key).await?; + drop(guard); } + // add a random sleep time (from 0.5*interval to 1.5*interval) to avoid always one node doing the work + let sleep_time = self.interval * 500 + random::() % (self.interval * 1000); + tokio::time::sleep(Duration::from_millis(sleep_time)).await; } } - /// Multiple nodes doing the work may make commit conflict. - pub async fn try_acquire(&self) -> Result { - let meta_key = format!("{}/persistent_log_lock", self.tenant_id); - let condition = vec![TxnCondition { - key: meta_key.clone(), - expected: ConditionResult::Eq as i32, - target: Some(txn_condition::Target::Seq(0)), - }]; - - let if_then = vec![TxnOp::put_with_ttl( - &meta_key, - self.node_id.clone().into(), - Some(Duration::from_secs(self.interval as u64)), - )]; - - let txn = TxnRequest::new(condition, if_then); - let resp = self.meta_store.transaction(txn).await?; + /// Acquires a permit from a distributed semaphore with timestamp-based rate limiting. + /// + /// This function attempts to acquire a permit from a distributed semaphore identified by `meta_key`. + /// It also implements a rate limiting mechanism based on the last execution timestamp. + pub async fn acquire( + &self, + meta_key: &str, + lease: u64, + interval: u64, + ) -> Result> { + let meta_client = match &self.meta_store { + MetaStore::R(handle) => handle.clone(), + _ => unreachable!("Metastore::L should only used for testing"), + }; + let acquired_guard = Semaphore::new_acquired( + meta_client, + meta_key, + 1, + self.node_id.clone(), + Duration::from_secs(lease), + ) + .await + .map_err(|_e| "acquire semaphore failed from GlobalPersistentLog")?; + if interval == 0 { + return Ok(Some(acquired_guard)); + } + if match self + .meta_store + .get_kv(&format!("{}/last_timestamp", meta_key)) + .await? + { + Some(v) => { + let last: u64 = serde_json::from_slice(&v.data)?; + chrono::Local::now().timestamp_millis() as u64 + - Duration::from_secs(interval).as_millis() as u64 + > last + } + None => true, + } { + Ok(Some(acquired_guard)) + } else { + drop(acquired_guard); + Ok(None) + } + } - Ok(resp.success) + pub async fn finish_hook(&self, meta_key: &str) -> Result<()> { + self.meta_store + .upsert_kv(UpsertKV::new( + format!("{}/last_timestamp", meta_key), + MatchSeq::Any, + Operation::Update(serde_json::to_vec( + &chrono::Local::now().timestamp_millis(), + )?), + None, + )) + .await?; + Ok(()) } async fn execute_sql(&self, sql: &str) -> Result<()> { @@ -191,48 +302,107 @@ impl GlobalPersistentLog { Ok(()) } - async fn prepare(&self) -> Result<()> { + pub async fn prepare(&self) -> Result<()> { let stage_name = self.stage_name.clone(); let create_stage = format!("CREATE STAGE IF NOT EXISTS {}", stage_name); self.execute_sql(&create_stage).await?; let create_db = "CREATE DATABASE IF NOT EXISTS persistent_system"; self.execute_sql(create_db).await?; - let create_table = " - CREATE TABLE IF NOT EXISTS persistent_system.query_log ( - timestamp TIMESTAMP, - path VARCHAR, - target VARCHAR, - log_level VARCHAR, - cluster_id VARCHAR, - node_id VARCHAR, - warehouse_id VARCHAR, - query_id VARCHAR, - message VARCHAR, - fields VARIANT - ) CLUSTER BY (timestamp, query_id)"; - self.execute_sql(create_table).await?; + + let session = create_session(&self.tenant_id, &self.cluster_id).await?; + let context = session.create_query_context().await?; + if let Some(version) = self.get_version_from_meta().await? { + if version > PERSISTENT_LOG_SCHEMA_VERSION as u64 { + // older version node need put the logs into the table has version suffix + for table in &self.tables { + table.enable_version_suffix(); + } + return Ok(()); + } + let mut need_rename = false; + for table in &self.tables { + let old_table = context + .get_table(CATALOG_DEFAULT, "persistent_system", &table.table_name()) + .await; + if old_table.is_ok() { + let old_schema = old_table?.schema(); + if !table.schema_equal(old_schema) { + need_rename = true; + } + } + } + if need_rename { + for table in &self.tables { + let old_table_name = format!("`{}_v{}`", table.table_name(), version); + let rename_sql = format!( + "ALTER TABLE IF EXISTS persistent_system.{} RENAME TO {}", + table.table_name(), + old_table_name + ); + self.execute_sql(&rename_sql).await?; + } + } + } + self.set_version_to_meta(PERSISTENT_LOG_SCHEMA_VERSION) + .await?; + for table in &self.tables { + let create_table = table.create_table_sql(); + self.execute_sql(&create_table).await?; + } + Ok(()) } - async fn do_copy_into(&self) -> Result<()> { - let stage_name = GlobalPersistentLog::instance().stage_name.clone(); - let sql = format!( - "COPY INTO persistent_system.query_log - FROM @{} PATTERN = '.*[.]parquet' file_format = (TYPE = PARQUET) - PURGE = TRUE", - stage_name - ); - self.execute_sql(&sql).await + pub async fn do_copy_into(&self) -> Result<()> { + let stage_name = self.stage_name.clone(); + let operator = GlobalLogger::instance().get_operator().await; + if let Some(op) = operator { + let path = format!("stage/internal/{}/", stage_name); + // Why we need to list the files first? + // Consider this case: + // After executing the two insert statements, a new file is created in the stage. + // Copy into enable the `PURGE` option, which will delete all files in the stage. + // the new file will be deleted and not inserted into the tables. + let files: Vec = op + .list(&path) + .await? + .into_iter() + .filter(|f| f.name().ends_with(".parquet")) + .map(|f| f.name().to_string()) + .collect(); + if files.is_empty() { + return Ok(()); + } + for table in &self.tables { + self.execute_sql(&table.copy_into_sql(&stage_name, &files)) + .await?; + } + } + Ok(()) } - /// Do retention and vacuum - #[allow(dead_code)] - async fn clean(&self) -> Result<()> { - let delete = format!( - "DELETE FROM persistent_system.query_log WHERE timestamp < subtract_hours(NOW(), {})", - self.retention - ); - self.execute_sql(&delete).await?; + async fn clean_work(&self) -> Result<()> { + loop { + let meta_key = format!("{}/persistent_log_clean", self.tenant_id); + let may_permit = self.acquire(&meta_key, 60, 60 * 60).await?; + if let Some(guard) = may_permit { + if let Err(e) = self.do_clean().await { + error!("persistent log clean failed: {}", e); + } + self.finish_hook(&meta_key).await?; + drop(guard); + } + + // sleep for a random time between 30 and 90 minutes + sleep(Duration::from_mins(30 + random::() % 60)).await; + } + } + + pub async fn do_clean(&self) -> Result<()> { + for table in &self.tables { + let clean_sql = table.clean_sql(self.retention); + self.execute_sql(&clean_sql).await?; + } let session = create_session(&self.tenant_id, &self.cluster_id).await?; let context = session.create_query_context().await?; @@ -240,14 +410,38 @@ impl GlobalPersistentLog { .check_enterprise_enabled(context.get_license_key(), Feature::Vacuum) .is_ok() { - let vacuum = "VACUUM TABLE persistent_system.query_log"; - self.execute_sql(vacuum).await? + for table in &self.tables { + let vacuum = format!("VACUUM TABLE persistent_system.{}", table.table_name()); + self.execute_sql(&vacuum).await? + } } Ok(()) } - pub fn stop(&self) { - self.stopped.store(true, Ordering::SeqCst); + pub async fn get_version_from_meta(&self) -> Result> { + match self + .meta_store + .get_kv(&format!("{}/persistent_log_work/version", self.tenant_id)) + .await? + { + Some(v) => { + let version: u64 = serde_json::from_slice(&v.data)?; + Ok(Some(version)) + } + None => Ok(None), + } + } + + pub async fn set_version_to_meta(&self, version: usize) -> Result<()> { + self.meta_store + .upsert_kv(UpsertKV::new( + format!("{}/persistent_log_work/version", self.tenant_id), + MatchSeq::Any, + Operation::Update(serde_json::to_vec(&version)?), + None, + )) + .await?; + Ok(()) } } diff --git a/src/query/service/src/persistent_log/mod.rs b/src/query/service/src/persistent_log/mod.rs index 8821cd0ae4bcb..66379173fd2fc 100644 --- a/src/query/service/src/persistent_log/mod.rs +++ b/src/query/service/src/persistent_log/mod.rs @@ -14,5 +14,10 @@ mod global_persistent_log; mod session; +mod table_schemas; pub use global_persistent_log::GlobalPersistentLog; +pub use table_schemas::PersistentLogTable; +pub use table_schemas::QueryDetailsTable; +pub use table_schemas::QueryLogTable; +pub use table_schemas::QueryProfileTable; diff --git a/src/query/service/src/persistent_log/table_schemas.rs b/src/query/service/src/persistent_log/table_schemas.rs new file mode 100644 index 0000000000000..165aa159097f8 --- /dev/null +++ b/src/query/service/src/persistent_log/table_schemas.rs @@ -0,0 +1,512 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; + +use databend_common_expression::types::NumberDataType; +use databend_common_expression::TableDataType; +use databend_common_expression::TableField; +use databend_common_expression::TableSchemaRef; +use databend_common_expression::TableSchemaRefExt; +use databend_common_tracing::PERSISTENT_LOG_SCHEMA_VERSION; +use itertools::Itertools; + +pub trait PersistentLogTable: Send + Sync + 'static { + fn table_name(&self) -> String; + fn enable_version_suffix(&self); + fn schema(&self) -> TableSchemaRef; + fn cluster_by(&self) -> Vec; + fn create_table_sql(&self) -> String { + let table_name = self.table_name(); + let schema = self.schema(); + let fields = schema + .fields() + .iter() + .map(|f| format!("{} {}", f.name(), f.data_type().sql_name())) + .collect::>() + .join(", "); + let cluster_by = self.cluster_by().join(", "); + format!( + "CREATE TABLE IF NOT EXISTS persistent_system.{} ({}) CLUSTER BY ({})", + table_name, fields, cluster_by + ) + } + + fn copy_into_sql(&self, stage_name: &str, files: &[String]) -> String; + fn schema_equal(&self, other: TableSchemaRef) -> bool { + self.schema().fields().len() == other.fields().len() + && self + .schema() + .fields() + .iter() + .zip(other.fields().iter()) + .all(|(a, b)| a.name() == b.name() && a.data_type() == b.data_type()) + } + fn clean_sql(&self, retention: usize) -> String; +} + +pub struct QueryLogTable { + need_version_suffix: AtomicBool, +} + +impl QueryLogTable { + pub fn new() -> Self { + Self { + need_version_suffix: AtomicBool::new(false), + } + } +} +impl Default for QueryLogTable { + fn default() -> Self { + Self::new() + } +} + +impl PersistentLogTable for QueryLogTable { + fn table_name(&self) -> String { + if self.need_version_suffix.load(Ordering::Relaxed) { + format!("query_log_v{}", PERSISTENT_LOG_SCHEMA_VERSION) + } else { + "query_log".to_string() + } + } + + fn enable_version_suffix(&self) { + self.need_version_suffix.store(true, Ordering::Relaxed); + } + + fn schema(&self) -> TableSchemaRef { + TableSchemaRefExt::create(vec![ + TableField::new( + "timestamp", + TableDataType::Nullable(Box::new(TableDataType::Timestamp)), + ), + TableField::new( + "path", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + TableField::new( + "target", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + TableField::new( + "log_level", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + TableField::new( + "cluster_id", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + TableField::new( + "node_id", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + TableField::new( + "warehouse_id", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + TableField::new( + "query_id", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + TableField::new( + "message", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + TableField::new( + "fields", + TableDataType::Nullable(Box::new(TableDataType::Variant)), + ), + ]) + } + + fn cluster_by(&self) -> Vec { + vec!["timestamp".to_string(), "query_id".to_string()] + } + + fn copy_into_sql(&self, stage_name: &str, files: &[String]) -> String { + let file_names = files.iter().map(|f| format!("'{}'", f)).join(","); + format!( + "COPY INTO persistent_system.{} + FROM @{} FILES=({}) file_format = (TYPE = PARQUET) + PURGE = TRUE", + self.table_name(), + stage_name, + file_names + ) + } + + fn clean_sql(&self, retention: usize) -> String { + let table_name = self.table_name(); + format!( + "DELETE FROM persistent_system.{} WHERE timestamp < subtract_hours(NOW(), {})", + table_name, retention + ) + } +} + +pub struct QueryDetailsTable { + need_version_suffix: AtomicBool, +} + +impl QueryDetailsTable { + pub fn new() -> Self { + Self { + need_version_suffix: AtomicBool::new(false), + } + } +} +impl Default for QueryDetailsTable { + fn default() -> Self { + Self::new() + } +} + +impl PersistentLogTable for QueryDetailsTable { + fn table_name(&self) -> String { + if self.need_version_suffix.load(Ordering::Relaxed) { + format!("query_details_v{}", PERSISTENT_LOG_SCHEMA_VERSION) + } else { + "query_details".to_string() + } + } + + fn enable_version_suffix(&self) { + self.need_version_suffix.store(true, Ordering::Relaxed); + } + + fn schema(&self) -> TableSchemaRef { + TableSchemaRefExt::create(vec![ + // Type. + TableField::new( + "log_type", + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::Int8))), + ), + TableField::new( + "log_type_name", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + TableField::new( + "handler_type", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + // User. + TableField::new( + "tenant_id", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + TableField::new( + "cluster_id", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + TableField::new( + "node_id", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + TableField::new( + "sql_user", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + TableField::new( + "sql_user_quota", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + TableField::new( + "sql_user_privileges", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + // Query. + TableField::new( + "query_id", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + TableField::new( + "query_kind", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + TableField::new( + "query_text", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + TableField::new( + "query_hash", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + TableField::new( + "query_parameterized_hash", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + TableField::new( + "event_date", + TableDataType::Nullable(Box::new(TableDataType::Date)), + ), + TableField::new( + "event_time", + TableDataType::Nullable(Box::new(TableDataType::Timestamp)), + ), + TableField::new( + "query_start_time", + TableDataType::Nullable(Box::new(TableDataType::Timestamp)), + ), + TableField::new( + "query_duration_ms", + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::Int64))), + ), + TableField::new( + "query_queued_duration_ms", + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::Int64))), + ), + // Schema. + TableField::new( + "current_database", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + // Stats. + TableField::new( + "written_rows", + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))), + ), + TableField::new( + "written_bytes", + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))), + ), + TableField::new( + "join_spilled_rows", + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))), + ), + TableField::new( + "join_spilled_bytes", + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))), + ), + TableField::new( + "agg_spilled_rows", + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))), + ), + TableField::new( + "agg_spilled_bytes", + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))), + ), + TableField::new( + "group_by_spilled_rows", + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))), + ), + TableField::new( + "group_by_spilled_bytes", + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))), + ), + TableField::new( + "written_io_bytes", + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))), + ), + TableField::new( + "written_io_bytes_cost_ms", + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))), + ), + TableField::new( + "scan_rows", + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))), + ), + TableField::new( + "scan_bytes", + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))), + ), + TableField::new( + "scan_io_bytes", + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))), + ), + TableField::new( + "scan_io_bytes_cost_ms", + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))), + ), + TableField::new( + "scan_partitions", + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))), + ), + TableField::new( + "total_partitions", + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))), + ), + TableField::new( + "result_rows", + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))), + ), + TableField::new( + "result_bytes", + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))), + ), + TableField::new( + "bytes_from_remote_disk", + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))), + ), + TableField::new( + "bytes_from_local_disk", + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))), + ), + TableField::new( + "bytes_from_memory", + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))), + ), + // Client. + TableField::new( + "client_address", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + TableField::new( + "user_agent", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + // Exception. + TableField::new( + "exception_code", + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::Int32))), + ), + TableField::new( + "exception_text", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + // Server. + TableField::new( + "server_version", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + // Session + TableField::new( + "query_tag", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + TableField::new( + "has_profile", + TableDataType::Nullable(Box::new(TableDataType::Boolean)), + ), + TableField::new( + "peek_memory_usage", + TableDataType::Nullable(Box::new(TableDataType::Variant)), + ), + ]) + } + + fn cluster_by(&self) -> Vec { + vec!["event_time".to_string(), "query_id".to_string()] + } + + fn copy_into_sql(&self, stage_name: &str, files: &[String]) -> String { + let fields = self + .schema() + .fields() + .iter() + .map(|f| format!("m['{}']", f.name())) + .collect::>() + .join(", "); + let file_names = files.iter().map(|f| format!("'{}'", f)).join(","); + format!( + "INSERT INTO persistent_system.{} FROM (SELECT {} FROM (SELECT parse_json(message) as m FROM @{} (FILES=>({})) WHERE target='databend::log::query'))", + self.table_name(), + fields, + stage_name, + file_names + ) + } + + fn clean_sql(&self, retention: usize) -> String { + let table_name = self.table_name(); + format!( + "DELETE FROM persistent_system.{} WHERE event_time < subtract_hours(NOW(), {})", + table_name, retention + ) + } +} + +pub struct QueryProfileTable { + need_version_suffix: AtomicBool, +} + +impl QueryProfileTable { + pub fn new() -> Self { + Self { + need_version_suffix: AtomicBool::new(false), + } + } +} +impl Default for QueryProfileTable { + fn default() -> Self { + Self::new() + } +} + +impl PersistentLogTable for QueryProfileTable { + fn table_name(&self) -> String { + if self.need_version_suffix.load(Ordering::Relaxed) { + format!("query_profile_v{}", PERSISTENT_LOG_SCHEMA_VERSION) + } else { + "query_profile".to_string() + } + } + + fn enable_version_suffix(&self) { + self.need_version_suffix.store(true, Ordering::Relaxed); + } + + fn schema(&self) -> TableSchemaRef { + TableSchemaRefExt::create(vec![ + TableField::new( + "timestamp", + TableDataType::Nullable(Box::new(TableDataType::Timestamp)), + ), + TableField::new( + "query_id", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + TableField::new( + "profiles", + TableDataType::Nullable(Box::new(TableDataType::Variant)), + ), + TableField::new( + "statistics_desc", + TableDataType::Nullable(Box::new(TableDataType::Variant)), + ), + ]) + } + + fn cluster_by(&self) -> Vec { + vec!["timestamp".to_string(), "query_id".to_string()] + } + + fn copy_into_sql(&self, stage_name: &str, files: &[String]) -> String { + let fields = self + .schema() + .fields() + .iter() + .filter(|f| f.name() != "timestamp") + .map(|f| format!("m['{}']", f.name())) + .collect::>() + .join(", "); + let file_names = files.iter().map(|f| format!("'{}'", f)).join(","); + format!( + "INSERT INTO persistent_system.{} FROM (SELECT timestamp, {} FROM (SELECT timestamp, parse_json(message) as m FROM @{} (FILES=>({})) WHERE target='databend::log::profile'))", + self.table_name(), + fields, + stage_name, + file_names + ) + } + + fn clean_sql(&self, retention: usize) -> String { + let table_name = self.table_name(); + format!( + "DELETE FROM persistent_system.{} WHERE timestamp < subtract_hours(NOW(), {})", + table_name, retention + ) + } +} diff --git a/src/query/service/tests/it/persistent_log/global_persistent_log.rs b/src/query/service/tests/it/persistent_log/global_persistent_log.rs index 07a2d966c731d..a4e6aee98c21b 100644 --- a/src/query/service/tests/it/persistent_log/global_persistent_log.rs +++ b/src/query/service/tests/it/persistent_log/global_persistent_log.rs @@ -11,68 +11,92 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +use std::io::Write; -use databend_common_base::runtime::GlobalIORuntime; -use databend_common_base::runtime::TrySpawn; +use databend_common_catalog::catalog::CATALOG_DEFAULT; +use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; use databend_common_expression::DataBlock; use databend_common_storage::DataOperator; use databend_common_tracing::RemoteLog; use databend_common_tracing::RemoteLogElement; use databend_query::persistent_log::GlobalPersistentLog; +use databend_query::persistent_log::PersistentLogTable; +use databend_query::persistent_log::QueryDetailsTable; +use databend_query::persistent_log::QueryLogTable; +use databend_query::persistent_log::QueryProfileTable; use databend_query::test_kits::ConfigBuilder; use databend_query::test_kits::TestFixture; use futures_util::TryStreamExt; +use goldenfile::Mint; #[tokio::test(flavor = "multi_thread")] -pub async fn test_global_persistent_log_acquire_lock() -> Result<()> { +pub async fn test_persistent_log_write() -> Result<()> { let mut config = ConfigBuilder::create().config(); - config.log.persistentlog.on = true; - let _guard = TestFixture::setup_with_config(&config).await?; - let res = GlobalPersistentLog::instance().try_acquire().await?; - assert!(res, "should acquire lock"); + config.log.persistentlog.retention = 0; + let fixture = TestFixture::setup_with_config(&config).await?; + write_remote_log(&config.log.persistentlog.stage_name).await?; - let res = GlobalPersistentLog::instance().try_acquire().await?; - assert!(!res, "should not acquire lock before expire"); - tokio::time::sleep(std::time::Duration::from_secs(10)).await; + let log_instance = GlobalPersistentLog::create_dummy(&config).await?; - let res = GlobalPersistentLog::instance().try_acquire().await?; - assert!(res, "should acquire lock after expire"); + log_instance.prepare().await?; + log_instance.do_copy_into().await?; + check_count(&fixture, vec![3, 1, 1]).await?; + // Extreme case: retention is 0, so all logs should be deleted + log_instance.do_clean().await?; + check_count(&fixture, vec![0, 0, 0]).await?; Ok(()) } #[tokio::test(flavor = "multi_thread")] -pub async fn test_persistent_log_write() -> Result<()> { - let mut config = ConfigBuilder::create().config(); - config.log.persistentlog.on = true; - config.log.persistentlog.interval = 1; - config.log.file.on = false; - +pub async fn test_persistent_log_alter_if_schema_change() -> Result<()> { + let config = ConfigBuilder::create().config(); + // Test the schema change of persistent log table let fixture = TestFixture::setup_with_config(&config).await?; - // Add more workers to test acquire lock and copy into - GlobalIORuntime::instance().spawn(async move { - let _ = GlobalPersistentLog::instance().work().await; - }); + let _ = fixture + .execute_query("create database persistent_system") + .await?; + // create a table with the same name + let _ = fixture + .execute_query( + "CREATE TABLE IF NOT EXISTS persistent_system.query_log (timestamp TIMESTAMP)", + ) + .await?; - GlobalIORuntime::instance().spawn(async move { - let _ = GlobalPersistentLog::instance().work().await; - }); + let _ = fixture + .execute_query("INSERT INTO persistent_system.query_log (timestamp) VALUES (now())") + .await?; - GlobalPersistentLog::instance().initialized(); + let log_instance = GlobalPersistentLog::create_dummy(&config).await?; - let random_sleep = rand::random::() % 3 + 1; - for _i in 0..3 { - write_remote_log(&config.log.persistentlog.stage_name).await?; - tokio::time::sleep(std::time::Duration::from_secs(random_sleep)).await; - } - tokio::time::sleep(std::time::Duration::from_secs(5)).await; + log_instance.set_version_to_meta(0).await?; + + log_instance.prepare().await?; + + let context = fixture.new_query_ctx().await?; + + assert!(context + .get_table(CATALOG_DEFAULT, "persistent_system", "query_log_v0") + .await + .is_ok()); + assert!(context + .get_table(CATALOG_DEFAULT, "persistent_system", "query_profile") + .await + .is_ok()); + assert!(context + .get_table(CATALOG_DEFAULT, "persistent_system", "query_details") + .await + .is_ok()); + assert!(context + .get_table(CATALOG_DEFAULT, "persistent_system", "query_log") + .await + .is_ok()); let res = fixture - .execute_query("select count(*) from persistent_system.query_log") + .execute_query("select count(*) from persistent_system.query_log_v0") .await?; - let data_blocks: Vec = res.try_collect().await?; let cnt = data_blocks[0].clone().take_columns()[0] .clone() @@ -80,11 +104,67 @@ pub async fn test_persistent_log_write() -> Result<()> { .into_scalar() .unwrap() .get_i64(); + assert_eq!(cnt, Some(1)); + + Ok(()) +} - assert_eq!(cnt, Some(2 * 3)); +#[test] +pub fn log_table_schema_change_must_increase_version_number() -> Result<()> { + // If this test failed, please increase version number for PERSISTENT_LOG_SCHEMA_VERSION + // and rerun this test with `UPDATE_GOLDENFILES=1` + let mut mint = Mint::new("tests/it/persistent_log/testdata"); + let file = &mut mint.new_goldenfile("persistent_log_tables_schema.txt")?; + let tables: Vec> = vec![ + Box::new(QueryDetailsTable::new()), + Box::new(QueryProfileTable::new()), + Box::new(QueryLogTable::new()), + ]; + for table in tables.iter() { + writeln!(file, "{}", table.table_name())?; + let schema = table.schema(); + for field in schema.fields().iter() { + writeln!(file, "{}:{}", field.name(), field.data_type())?; + } + } + Ok(()) +} - GlobalPersistentLog::instance().stop(); +async fn check_count(fixture: &TestFixture, expected: Vec) -> Result<()> { + let res = fixture + .execute_query("select count(*) from persistent_system.query_log") + .await?; + let data_blocks: Vec = res.try_collect().await?; + let cnt = data_blocks[0].clone().take_columns()[0] + .clone() + .value + .into_scalar() + .unwrap() + .get_i64(); + assert_eq!(cnt, Some(expected[0] as i64)); + let res = fixture + .execute_query("select count(*) from persistent_system.query_details") + .await?; + let data_blocks: Vec = res.try_collect().await?; + let cnt = data_blocks[0].clone().take_columns()[0] + .clone() + .value + .into_scalar() + .unwrap() + .get_i64(); + assert_eq!(cnt, Some(expected[1] as i64)); + let res = fixture + .execute_query("select count(*) from persistent_system.query_profile") + .await?; + let data_blocks: Vec = res.try_collect().await?; + let cnt = data_blocks[0].clone().take_columns()[0] + .clone() + .value + .into_scalar() + .unwrap() + .get_i64(); + assert_eq!(cnt, Some(expected[2] as i64)); Ok(()) } @@ -103,15 +183,14 @@ fn get_remote_log() -> Vec { vec![ RemoteLogElement { timestamp: chrono::Local::now().timestamp_micros(), - path: "databend_query::interpreters::common::query_log: query_log.rs:71".to_string(), - target: "databend::log::query".to_string(), + path: "databend_common_meta_semaphore::meta_event_subscriber::subscriber: subscriber.rs:52".to_string(), + target: "databend_common_meta_semaphore::meta_event_subscriber::subscriber".to_string(), cluster_id: "test_cluster".to_string(), node_id: "izs9przqhAN4n5hbJanJm2".to_string(), query_id: Some("89ad07ad-83fe-4424-8005-4c5b318a7212".to_string()), warehouse_id: None, + log_level: "WARN".to_string(), message: "test".to_string(), - log_level: "INFO".to_string(), - fields: r#"{"message":"test"}"#.to_string(), }, RemoteLogElement { @@ -123,8 +202,21 @@ fn get_remote_log() -> Vec { query_id: Some("89ad07ad-83fe-4424-8005-4c5b318a7212".to_string()), warehouse_id: None, log_level: "INFO".to_string(), - message: "test2".to_string(), - fields: r#"{"message":"test"}"#.to_string(), + message: r#"{"log_type":1,"log_type_name":"Start","handler_type":"Dummy","tenant_id":"test_tenant","cluster_id":"test_cluster","node_id":"E5q3SYqfNRUF6iRBEU2Cw5","sql_user":"test_tenant-test_cluster-persistent-log","query_id":"5cf4e80b-a169-46ee-8b4e-57cd9a675557","query_kind":"Insert","query_text":"INSERT INTO persistent_system.query_details SELECT * FROM (SELECT m['log_type'], m['log_type_name'], m['handler_type'], m['tenan...[1166 more characters]","query_hash":"","query_parameterized_hash":"","event_date":"2025-04-09","event_time":"2025-04-09 11:20:47.795743","query_start_time":"2025-04-09 11:20:47.714015","query_duration_ms":0,"query_queued_duration_ms":0,"current_database":"default","databases":"","tables":"","columns":"","projections":"","written_rows":0,"written_bytes":0,"written_io_bytes":0,"written_io_bytes_cost_ms":0,"scan_rows":0,"scan_bytes":0,"scan_io_bytes":0,"scan_io_bytes_cost_ms":0,"scan_partitions":0,"total_partitions":0,"result_rows":0,"result_bytes":0,"cpu_usage":8,"memory_usage":0,"join_spilled_bytes":0,"join_spilled_rows":0,"agg_spilled_bytes":0,"agg_spilled_rows":0,"group_by_spilled_bytes":0,"group_by_spilled_rows":0,"bytes_from_remote_disk":0,"bytes_from_local_disk":0,"bytes_from_memory":0,"client_info":"","client_address":"","user_agent":"null","exception_code":0,"exception_text":"","stack_trace":"","server_version":"v1.2.718-nightly-e82fb056d0(rust-1.85.0-nightly-2025-04-09T10:02:05.358810000Z)","query_tag":"","extra":"","has_profiles":false,"txn_state":"AutoCommit","txn_id":"","peek_memory_usage":{}}"#.to_string(), + fields: "{}".to_string() }, + RemoteLogElement { + timestamp: chrono::Local::now().timestamp_micros(), + path: "databend_query::interpreters::interpreter: interpreter.rs:315".to_string(), + target: "databend::log::profile".to_string(), + log_level: "INFO".to_string(), + cluster_id: "test_cluster".to_string(), + node_id: "Io95Mk1ULcoWkZ8FIFUog1".to_string(), + warehouse_id: None, + query_id: Some("992fa371-4636-43a8-9879-90f7e75b15e8".to_string()), + message: r#"{"query_id":"992fa371-4636-43a8-9879-90f7e75b15e8","profiles":[{"id":0,"name":"EvalScalar","parent_id":null,"title":"123","labels":[{"name":"List of Expressions","value":["123"]}],"statistics":[29875,0,0,0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"errors":[]},{"id":1,"name":"TableScan","parent_id":0,"title":"default.'system'.'one'","labels":[{"name":"Columns (1 / 1)","value":["dummy"]},{"name":"Total partitions","value":["1"]},{"name":"Full table name","value":["default.'system'.'one'"]}],"statistics":[19459,0,0,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"errors":[]}],"statistics_desc":{"CpuTime":{"desc":"The time spent to process in nanoseconds","display_name":"cpu time","index":0,"unit":"NanoSeconds","plain_statistics":false},"WaitTime":{"desc":"The time spent to wait in nanoseconds, usually used to measure the time spent on waiting for I/O","display_name":"wait time","index":1,"unit":"NanoSeconds","plain_statistics":false},"ExchangeRows":{"desc":"The number of data rows exchange between nodes in cluster mode","display_name":"exchange rows","index":2,"unit":"Rows","plain_statistics":true},"ExchangeBytes":{"desc":"The number of data bytes exchange between nodes in cluster mode","display_name":"exchange bytes","index":3,"unit":"Bytes","plain_statistics":true},"OutputRows":{"desc":"The number of rows from the physical plan output to the next physical plan","display_name":"output rows","index":4,"unit":"Rows","plain_statistics":true},"OutputBytes":{"desc":"The number of bytes from the physical plan output to the next physical plan","display_name":"output bytes","index":5,"unit":"Bytes","plain_statistics":true},"ScanBytes":{"desc":"The bytes scanned of query","display_name":"bytes scanned","index":6,"unit":"Bytes","plain_statistics":true},"ScanCacheBytes":{"desc":"The bytes scanned from cache of query","display_name":"bytes scanned from cache","index":7,"unit":"Bytes","plain_statistics":true},"ScanPartitions":{"desc":"The partitions scanned of query","display_name":"partitions scanned","index":8,"unit":"Count","plain_statistics":true},"RemoteSpillWriteCount":{"desc":"The number of remote spilled by write","display_name":"numbers remote spilled by write","index":9,"unit":"Count","plain_statistics":true},"RemoteSpillWriteBytes":{"desc":"The bytes remote spilled by write","display_name":"bytes remote spilled by write","index":10,"unit":"Bytes","plain_statistics":true},"RemoteSpillWriteTime":{"desc":"The time spent to write remote spill in millisecond","display_name":"remote spilled time by write","index":11,"unit":"MillisSeconds","plain_statistics":false},"RemoteSpillReadCount":{"desc":"The number of remote spilled by read","display_name":"numbers remote spilled by read","index":12,"unit":"Count","plain_statistics":true},"RemoteSpillReadBytes":{"desc":"The bytes remote spilled by read","display_name":"bytes remote spilled by read","index":13,"unit":"Bytes","plain_statistics":true},"RemoteSpillReadTime":{"desc":"The time spent to read remote spill in millisecond","display_name":"remote spilled time by read","index":14,"unit":"MillisSeconds","plain_statistics":false},"LocalSpillWriteCount":{"desc":"The number of local spilled by write","display_name":"numbers local spilled by write","index":15,"unit":"Count","plain_statistics":true},"LocalSpillWriteBytes":{"desc":"The bytes local spilled by write","display_name":"bytes local spilled by write","index":16,"unit":"Bytes","plain_statistics":true},"LocalSpillWriteTime":{"desc":"The time spent to write local spill in millisecond","display_name":"local spilled time by write","index":17,"unit":"MillisSeconds","plain_statistics":false},"LocalSpillReadCount":{"desc":"The number of local spilled by read","display_name":"numbers local spilled by read","index":18,"unit":"Count","plain_statistics":true},"LocalSpillReadBytes":{"desc":"The bytes local spilled by read","display_name":"bytes local spilled by read","index":19,"unit":"Bytes","plain_statistics":true},"LocalSpillReadTime":{"desc":"The time spent to read local spill in millisecond","display_name":"local spilled time by read","index":20,"unit":"MillisSeconds","plain_statistics":false},"RuntimeFilterPruneParts":{"desc":"The partitions pruned by runtime filter","display_name":"parts pruned by runtime filter","index":21,"unit":"Count","plain_statistics":true},"MemoryUsage":{"desc":"The real time memory usage","display_name":"memory usage","index":22,"unit":"Bytes","plain_statistics":false},"ExternalServerRetryCount":{"desc":"The count of external server retry times","display_name":"external server retry count","index":23,"unit":"Count","plain_statistics":true},"ExternalServerRequestCount":{"desc":"The count of external server request times","display_name":"external server request count","index":24,"unit":"Count","plain_statistics":true}}}"#.to_string(), + fields: "{}".to_string(), + + } ] } diff --git a/src/query/service/tests/it/persistent_log/testdata/persistent_log_tables_schema.txt b/src/query/service/tests/it/persistent_log/testdata/persistent_log_tables_schema.txt new file mode 100644 index 0000000000000..bf93f748011ef --- /dev/null +++ b/src/query/service/tests/it/persistent_log/testdata/persistent_log_tables_schema.txt @@ -0,0 +1,66 @@ +query_details +log_type:Int8 NULL +log_type_name:String NULL +handler_type:String NULL +tenant_id:String NULL +cluster_id:String NULL +node_id:String NULL +sql_user:String NULL +sql_user_quota:String NULL +sql_user_privileges:String NULL +query_id:String NULL +query_kind:String NULL +query_text:String NULL +query_hash:String NULL +query_parameterized_hash:String NULL +event_date:Date NULL +event_time:Timestamp NULL +query_start_time:Timestamp NULL +query_duration_ms:Int64 NULL +query_queued_duration_ms:Int64 NULL +current_database:String NULL +written_rows:UInt64 NULL +written_bytes:UInt64 NULL +join_spilled_rows:UInt64 NULL +join_spilled_bytes:UInt64 NULL +agg_spilled_rows:UInt64 NULL +agg_spilled_bytes:UInt64 NULL +group_by_spilled_rows:UInt64 NULL +group_by_spilled_bytes:UInt64 NULL +written_io_bytes:UInt64 NULL +written_io_bytes_cost_ms:UInt64 NULL +scan_rows:UInt64 NULL +scan_bytes:UInt64 NULL +scan_io_bytes:UInt64 NULL +scan_io_bytes_cost_ms:UInt64 NULL +scan_partitions:UInt64 NULL +total_partitions:UInt64 NULL +result_rows:UInt64 NULL +result_bytes:UInt64 NULL +bytes_from_remote_disk:UInt64 NULL +bytes_from_local_disk:UInt64 NULL +bytes_from_memory:UInt64 NULL +client_address:String NULL +user_agent:String NULL +exception_code:Int32 NULL +exception_text:String NULL +server_version:String NULL +query_tag:String NULL +has_profile:Boolean NULL +peek_memory_usage:Variant NULL +query_profile +timestamp:Timestamp NULL +query_id:String NULL +profiles:Variant NULL +statistics_desc:Variant NULL +query_log +timestamp:Timestamp NULL +path:String NULL +target:String NULL +log_level:String NULL +cluster_id:String NULL +node_id:String NULL +warehouse_id:String NULL +query_id:String NULL +message:String NULL +fields:Variant NULL diff --git a/tests/logging/check_logs_table.sh b/tests/logging/check_logs_table.sh index 5e6e8f720ea87..ea7c53bb28995 100755 --- a/tests/logging/check_logs_table.sh +++ b/tests/logging/check_logs_table.sh @@ -2,49 +2,40 @@ set -e +function check_query_log() { + local count=$1 + local query_id=$2 + local check_query=$3 + local expected_result=$4 + + response=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" \ + -H 'Content-Type: application/json' \ + -d "{\"sql\": \"$check_query query_id = '$query_id'\"}") + + result=$(echo $response | jq -r '.data[0][0]') + if [ "$result" != "$expected_result" ]; then + echo "Log table test #$count failed, Result: $result, Expected: $expected_result" + exit 1 + else + echo "Log table test #$count passed, Result: $result, Expected: $expected_result" + fi +} + +# Execute the initial query and get the query_id response=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d '{"sql": "select 123"}') - query_id=$(echo $response | jq -r '.id') +echo "Query ID: $query_id" +# Wait for the query to be logged +sleep 15 -sleep 10 - -response=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" \ - -H 'Content-Type: application/json' \ - -d "{\"sql\": \"SELECT COUNT(*) FROM persistent_system.query_log WHERE query_id = '$query_id'\"}") - -count=$(echo $response | jq -r '.data[0][0]') - -if [ $count != "0" ]; then - echo "Log table test1 passed" -else - echo "Log table test1 failed" - exit 1 -fi - -response=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" \ - -H 'Content-Type: application/json' \ - -d "{\"sql\": \"SELECT check_json(message) FROM persistent_system.query_log WHERE target = 'databend::log::profile' and query_id = '$query_id'\"}") - -result=$(echo $response | jq -r '.data[0][0]') - -if [ result != "NULL" ]; then - echo "Log table test2 passed" -else - echo "Log table test2 failed" - exit 1 -fi - -response=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" \ - -H 'Content-Type: application/json' \ - -d "{\"sql\": \"SELECT check_json(message) FROM persistent_system.query_log WHERE target = 'databend::log::query' and query_id = '$query_id'\"}") - -result=$(echo $response | jq -r '.data[0][0]') +# Test 1 +check_query_log "1" "$query_id" "SELECT count(*) FROM persistent_system.query_log WHERE target = 'databend::log::profile' and" "1" -if [ result != "NULL" ]; then - echo "Log table test3 passed" -else - echo "Log table test3 failed" - exit 1 -fi +# Test 2 +check_query_log "2" "$query_id" "SELECT count(*) FROM persistent_system.query_profile WHERE" "1" +# Test 3 +check_query_log "3" "$query_id" "SELECT count(*) FROM persistent_system.query_log WHERE target = 'databend::log::query' and" "2" +# Test 4 +check_query_log "4" "$query_id" "SELECT count(*) FROM persistent_system.query_details WHERE" "2"