Skip to content

feature(base): supports memory usage limits. #7290

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ members = [
"src/common/hashtable",
"src/common/http",
"src/common/io",
"src/common/macros",
"src/common/metrics",
"src/common/tracing",
"src/common/storage",
Expand Down
1 change: 0 additions & 1 deletion src/binaries/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ io-uring = [
common-base = { path = "../common/base" }
common-exception = { path = "../common/exception" }
common-grpc = { path = "../common/grpc" }
common-macros = { path = "../common/macros" }
common-meta-api = { path = "../meta/api" }
common-meta-app = { path = "../meta/app" }
common-meta-embedded = { path = "../meta/embedded" }
Expand Down
7 changes: 3 additions & 4 deletions src/binaries/meta/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ use std::env;
use std::ops::Deref;
use std::sync::Arc;

use common_base::base::RuntimeTracker;
use common_base::base::tokio;
use common_base::base::StopHandle;
use common_base::base::Stoppable;
use common_grpc::RpcClientConf;
use common_macros::databend_main;
use common_meta_sled_store::init_sled_db;
use common_meta_store::MetaStoreProvider;
use common_tracing::init_logging;
Expand All @@ -40,8 +39,8 @@ pub use kvapi::KvApiCommand;

const CMD_KVAPI_PREFIX: &str = "kvapi::";

#[databend_main]
async fn main(_global_tracker: Arc<RuntimeTracker>) -> common_exception::Result<()> {
#[tokio::main]
async fn main() -> common_exception::Result<()> {
let conf = Config::load()?;

if run_cmd(&conf).await {
Expand Down
21 changes: 15 additions & 6 deletions src/binaries/query/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ use std::env;
use std::ops::Deref;
use std::sync::Arc;

use common_base::base::RuntimeTracker;
use common_macros::databend_main;
use common_base::base::{GlobalTracker, Runtime, ThreadTracker};
use common_meta_embedded::MetaEmbedded;
use common_meta_grpc::MIN_METASRV_SEMVER;
use common_metrics::init_default_metrics_recorder;
Expand All @@ -36,9 +35,19 @@ use databend_query::Config;
use databend_query::GlobalServices;
use databend_query::QUERY_SEMVER;
use tracing::info;
use common_base::base::tokio;

#[databend_main]
async fn main(_global_tracker: Arc<RuntimeTracker>) -> common_exception::Result<()> {
fn main() -> common_exception::Result<()> {
// init thread tracker, because block_on use this thread;
let global = GlobalTracker::create();
ThreadTracker::init(global, None);

return Runtime::with_default_worker_threads()
.unwrap()
.block_on(main_entrypoint());
}

async fn main_entrypoint() -> common_exception::Result<()> {
let conf: Config = Config::load()?;

if run_cmd(&conf) {
Expand Down Expand Up @@ -214,7 +223,7 @@ async fn main(_global_tracker: Arc<RuntimeTracker>) -> common_exception::Result<
"{}:{}",
conf.query.clickhouse_http_handler_host, conf.query.clickhouse_http_handler_port
)
.parse()?
.parse()?
)
);
println!("Databend HTTP");
Expand All @@ -229,7 +238,7 @@ async fn main(_global_tracker: Arc<RuntimeTracker>) -> common_exception::Result<
"{}:{}",
conf.query.http_handler_host, conf.query.http_handler_port
)
.parse()?
.parse()?
)
);

Expand Down
7 changes: 4 additions & 3 deletions src/common/base/src/base/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod net;
mod profiling;
mod progress;
mod runtime;
mod runtime_tracker;
mod thread_tracker;
mod shutdown_signal;
mod singleton_instance;
mod stop_handle;
Expand All @@ -35,8 +35,7 @@ pub use progress::ProgressValues;
pub use runtime::Dropper;
pub use runtime::Runtime;
pub use runtime::TrySpawn;
pub use runtime_tracker::RuntimeTracker;
pub use runtime_tracker::ThreadTracker;
pub use thread_tracker::ThreadTracker;
pub use shutdown_signal::signal_stream;
pub use shutdown_signal::DummySignalStream;
pub use shutdown_signal::SignalStream;
Expand All @@ -54,3 +53,5 @@ pub use tokio;
pub use uniq_id::GlobalSequence;
pub use uniq_id::GlobalUniqName;
pub use uuid;
pub use thread_tracker::GlobalTracker;
pub use thread_tracker::QueryTracker;
64 changes: 29 additions & 35 deletions src/common/base/src/base/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::alloc::{GlobalAlloc, Layout};
use std::future::Future;
use std::sync::Arc;
use std::thread;
Expand All @@ -21,8 +22,8 @@ use common_exception::Result;
use tokio::runtime::Handle;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;

use super::runtime_tracker::RuntimeTracker;
use crate::base::{GlobalTracker, QueryTracker, ThreadTracker};
use crate::mem_allocator::ALLOC;

/// Methods to spawn tasks.
pub trait TrySpawn {
Expand All @@ -31,18 +32,18 @@ pub trait TrySpawn {
/// It allows to return an error before spawning the task.
#[track_caller]
fn try_spawn<T>(&self, task: T) -> Result<JoinHandle<T::Output>>
where
T: Future + Send + 'static,
T::Output: Send + 'static;
where
T: Future + Send + 'static,
T::Output: Send + 'static;

/// Spawns a new asynchronous task, returning a tokio::JoinHandle for it.
///
/// A default impl of this method just calls `try_spawn` and just panics if there is an error.
#[track_caller]
fn spawn<T>(&self, task: T) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
self.try_spawn(task).unwrap()
}
Expand All @@ -51,18 +52,18 @@ pub trait TrySpawn {
impl<S: TrySpawn> TrySpawn for Arc<S> {
#[track_caller]
fn try_spawn<T>(&self, task: T) -> Result<JoinHandle<T::Output>>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
self.as_ref().try_spawn(task)
}

#[track_caller]
fn spawn<T>(&self, task: T) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
self.as_ref().spawn(task)
}
Expand All @@ -73,14 +74,12 @@ impl<S: TrySpawn> TrySpawn for Arc<S> {
pub struct Runtime {
// Handle to runtime.
handle: Handle,
// Runtime tracker
tracker: Arc<RuntimeTracker>,
// Use to receive a drop signal when dropper is dropped.
_dropper: Dropper,
}

impl Runtime {
fn create(tracker: Arc<RuntimeTracker>, builder: &mut tokio::runtime::Builder) -> Result<Self> {
fn create(builder: &mut tokio::runtime::Builder) -> Result<Self> {
let runtime = builder
.build()
.map_err(|tokio_error| ErrorCode::TokioError(tokio_error.to_string()))?;
Expand All @@ -94,33 +93,29 @@ impl Runtime {

Ok(Runtime {
handle,
tracker,
_dropper: Dropper {
close: Some(send_stop),
},
})
}

fn tracker_builder(rt_tracker: Arc<RuntimeTracker>) -> tokio::runtime::Builder {
let mut builder = tokio::runtime::Builder::new_multi_thread();
fn init_tracker(mut builder: tokio::runtime::Builder) -> tokio::runtime::Builder {
let global = GlobalTracker::current();
let query = QueryTracker::current();

builder
.enable_all()
.on_thread_stop(rt_tracker.on_stop_thread())
.on_thread_start(rt_tracker.on_start_thread());
.on_thread_stop(|| { ThreadTracker::destroy(); })
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For better understanding, how about:

.on_thread_start(xxx)
.on_thread_stop(xxx)

.on_thread_start(move || { ThreadTracker::init(global.clone(), query.clone()); });

builder
}

pub fn get_tracker(&self) -> Arc<RuntimeTracker> {
self.tracker.clone()
}

/// Spawns a new tokio runtime with a default thread count on a background
/// thread and returns a `Handle` which can be used to spawn tasks via
/// its executor.
pub fn with_default_worker_threads() -> Result<Self> {
let tracker = RuntimeTracker::create();
let mut runtime_builder = Self::tracker_builder(tracker.clone());
let mut runtime_builder = Self::init_tracker(tokio::runtime::Builder::new_multi_thread());

#[cfg(debug_assertions)]
{
Expand All @@ -132,13 +127,12 @@ impl Runtime {
}
}

Self::create(tracker, &mut runtime_builder)
Self::create(&mut runtime_builder)
}

#[allow(unused_mut)]
pub fn with_worker_threads(workers: usize, mut thread_name: Option<String>) -> Result<Self> {
let tracker = RuntimeTracker::create();
let mut runtime_builder = Self::tracker_builder(tracker.clone());
let mut runtime_builder = Self::init_tracker(tokio::runtime::Builder::new_multi_thread());

#[cfg(debug_assertions)]
{
Expand All @@ -154,7 +148,7 @@ impl Runtime {
runtime_builder.thread_name(thread_name);
}

Self::create(tracker, runtime_builder.worker_threads(workers))
Self::create(runtime_builder.worker_threads(workers))
}

pub fn inner(&self) -> tokio::runtime::Handle {
Expand All @@ -169,9 +163,9 @@ impl Runtime {
impl TrySpawn for Runtime {
#[track_caller]
fn try_spawn<T>(&self, task: T) -> Result<JoinHandle<T::Output>>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
Ok(self.handle.spawn(task))
}
Expand Down
Loading