Skip to content

Commit 5013b5e

Browse files
authored
feat(ctl): support pause/resume hummock version checkpoint (#10069)
1 parent f15c315 commit 5013b5e

File tree

10 files changed

+207
-36
lines changed

10 files changed

+207
-36
lines changed

proto/hummock.proto

+17
Original file line numberDiff line numberDiff line change
@@ -579,6 +579,20 @@ message SplitCompactionGroupResponse {
579579
uint64 new_group_id = 1;
580580
}
581581

582+
// Request of the `RiseCtlPauseVersionCheckpoint` rpc. Carries no fields.
message RiseCtlPauseVersionCheckpointRequest {}
583+
584+
// Response of the `RiseCtlPauseVersionCheckpoint` rpc. Carries no fields.
message RiseCtlPauseVersionCheckpointResponse {}
585+
586+
// Request of the `RiseCtlResumeVersionCheckpoint` rpc. Carries no fields.
message RiseCtlResumeVersionCheckpointRequest {}
587+
588+
// Response of the `RiseCtlResumeVersionCheckpoint` rpc. Carries no fields.
message RiseCtlResumeVersionCheckpointResponse {}
589+
590+
// Request of the `RiseCtlGetCheckpointVersion` rpc. Carries no fields.
message RiseCtlGetCheckpointVersionRequest {}
591+
592+
// Response of the `RiseCtlGetCheckpointVersion` rpc.
message RiseCtlGetCheckpointVersionResponse {
593+
// The hummock version held by the meta node's current checkpoint.
HummockVersion checkpoint_version = 1;
594+
}
595+
582596
service HummockManagerService {
583597
rpc UnpinVersionBefore(UnpinVersionBeforeRequest) returns (UnpinVersionBeforeResponse);
584598
rpc GetCurrentVersion(GetCurrentVersionRequest) returns (GetCurrentVersionResponse);
@@ -604,6 +618,9 @@ service HummockManagerService {
604618
rpc RiseCtlGetPinnedSnapshotsSummary(RiseCtlGetPinnedSnapshotsSummaryRequest) returns (RiseCtlGetPinnedSnapshotsSummaryResponse);
605619
rpc RiseCtlListCompactionGroup(RiseCtlListCompactionGroupRequest) returns (RiseCtlListCompactionGroupResponse);
606620
rpc RiseCtlUpdateCompactionConfig(RiseCtlUpdateCompactionConfigRequest) returns (RiseCtlUpdateCompactionConfigResponse);
621+
rpc RiseCtlPauseVersionCheckpoint(RiseCtlPauseVersionCheckpointRequest) returns (RiseCtlPauseVersionCheckpointResponse);
622+
rpc RiseCtlResumeVersionCheckpoint(RiseCtlResumeVersionCheckpointRequest) returns (RiseCtlResumeVersionCheckpointResponse);
623+
rpc RiseCtlGetCheckpointVersion(RiseCtlGetCheckpointVersionRequest) returns (RiseCtlGetCheckpointVersionResponse);
607624
rpc InitMetadataForReplay(InitMetadataForReplayRequest) returns (InitMetadataForReplayResponse);
608625
rpc GetScaleCompactor(GetScaleCompactorRequest) returns (GetScaleCompactorResponse);
609626
rpc PinVersion(PinVersionRequest) returns (PinVersionResponse);

src/ctl/src/cmd_impl/hummock.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@ pub use list_kv::*;
1919
mod sst_dump;
2020
pub use sst_dump::*;
2121
mod compaction_group;
22-
mod disable_commit_epoch;
2322
mod list_version_deltas;
23+
mod pause_resume;
2424
mod trigger_full_gc;
2525
mod trigger_manual_compaction;
2626

2727
pub use compaction_group::*;
28-
pub use disable_commit_epoch::*;
2928
pub use list_version_deltas::*;
29+
pub use pause_resume::*;
3030
pub use trigger_full_gc::*;
3131
pub use trigger_manual_compaction::*;

src/ctl/src/cmd_impl/hummock/disable_commit_epoch.rs

-26
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Copyright 2023 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionUpdateExt;
16+
use risingwave_hummock_sdk::HummockEpoch;
17+
18+
use crate::CtlContext;
19+
20+
pub async fn disable_commit_epoch(context: &CtlContext) -> anyhow::Result<()> {
21+
let meta_client = context.meta_client().await?;
22+
let version = meta_client.disable_commit_epoch().await?;
23+
println!(
24+
"Disabled.\
25+
Current version: id {}, max_committed_epoch {}",
26+
version.id, version.max_committed_epoch
27+
);
28+
Ok(())
29+
}
30+
31+
/// Sends the "pause hummock version checkpoint" request to the meta node and reports success
/// on stdout.
///
/// # Errors
/// Propagates any failure from obtaining the meta client or from the RPC.
pub async fn pause_version_checkpoint(context: &CtlContext) -> anyhow::Result<()> {
    context
        .meta_client()
        .await?
        .risectl_pause_hummock_version_checkpoint()
        .await?;
    println!("Hummock version checkpoint is paused");
    Ok(())
}
39+
40+
/// Sends the "resume hummock version checkpoint" request to the meta node and reports success
/// on stdout.
///
/// # Errors
/// Propagates any failure from obtaining the meta client or from the RPC.
pub async fn resume_version_checkpoint(context: &CtlContext) -> anyhow::Result<()> {
    context
        .meta_client()
        .await?
        .risectl_resume_hummock_version_checkpoint()
        .await?;
    println!("Hummock version checkpoint is resumed");
    Ok(())
}
48+
49+
/// For now this function itself doesn't provide useful info.
50+
/// We can extend it to reveal interested info, e.g. at which hummock version is a user key
51+
/// added/removed for what reason (row deletion/compaction/etc.).
52+
pub async fn replay_version(context: &CtlContext) -> anyhow::Result<()> {
53+
let meta_client = context.meta_client().await?;
54+
let mut base_version = meta_client
55+
.risectl_get_checkpoint_hummock_version()
56+
.await?
57+
.checkpoint_version
58+
.unwrap();
59+
println!("replay starts");
60+
println!("base version {}", base_version.id);
61+
let delta_fetch_size = 100;
62+
let mut current_delta_id = base_version.id + 1;
63+
loop {
64+
let deltas = meta_client
65+
.list_version_deltas(current_delta_id, delta_fetch_size, HummockEpoch::MAX)
66+
.await
67+
.unwrap();
68+
if deltas.version_deltas.is_empty() {
69+
break;
70+
}
71+
for delta in deltas.version_deltas {
72+
if delta.prev_id != base_version.id {
73+
eprintln!("missing delta log for version {}", base_version.id);
74+
break;
75+
}
76+
base_version.apply_version_delta(&delta);
77+
println!("replayed version {}", base_version.id);
78+
}
79+
current_delta_id = base_version.id + 1;
80+
}
81+
println!("replay ends");
82+
Ok(())
83+
}

src/ctl/src/lib.rs

+15
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,12 @@ enum HummockCommands {
158158
#[clap(long)]
159159
table_ids: Vec<u32>,
160160
},
161+
/// Pause version checkpoint, which subsequently pauses GC of delta log and SST object.
162+
PauseVersionCheckpoint,
163+
/// Resume version checkpoint, which subsequently resumes GC of delta log and SST object.
164+
ResumeVersionCheckpoint,
165+
/// Replay version from the checkpoint one to the latest one.
166+
ReplayVersion,
161167
}
162168

163169
#[derive(Subcommand)]
@@ -319,6 +325,15 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
319325
cmd_impl::hummock::split_compaction_group(context, compaction_group_id, &table_ids)
320326
.await?;
321327
}
328+
Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
329+
cmd_impl::hummock::pause_version_checkpoint(context).await?;
330+
}
331+
Commands::Hummock(HummockCommands::ResumeVersionCheckpoint) => {
332+
cmd_impl::hummock::resume_version_checkpoint(context).await?;
333+
}
334+
Commands::Hummock(HummockCommands::ReplayVersion) => {
335+
cmd_impl::hummock::replay_version(context).await?;
336+
}
322337
Commands::Table(TableCommands::Scan { mv_name, data_dir }) => {
323338
cmd_impl::table::scan(context, mv_name, data_dir).await?
324339
}

src/meta/src/hummock/manager/checkpoint.rs

+28-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use std::ops::Bound::{Excluded, Included};
1616
use std::ops::{Deref, DerefMut};
17+
use std::sync::atomic::Ordering;
1718

1819
use function_name::named;
1920
use itertools::Itertools;
@@ -22,7 +23,7 @@ use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
2223
};
2324
use risingwave_hummock_sdk::version_checkpoint_dir;
2425
use risingwave_pb::hummock::hummock_version_checkpoint::StaleObjects;
25-
use risingwave_pb::hummock::HummockVersionCheckpoint;
26+
use risingwave_pb::hummock::{HummockVersion, HummockVersionCheckpoint};
2627

2728
use crate::hummock::error::Result;
2829
use crate::hummock::manager::{read_lock, write_lock};
@@ -175,4 +176,30 @@ where
175176
.await
176177
.map_err(Into::into)
177178
}
179+
180+
/// Raises the `pause_version_checkpoint` flag and logs that checkpointing is paused.
pub(crate) fn pause_version_checkpoint(&self) {
    let paused = true;
    self.pause_version_checkpoint
        .store(paused, Ordering::Relaxed);
    tracing::info!("hummock version checkpoint is paused.");
}
184+
185+
pub(crate) fn resume_version_checkpoint(&self) {
186+
self.pause_version_checkpoint
187+
.store(false, Ordering::Relaxed);
188+
tracing::info!("hummock version checkpoint is resumed.");
189+
}
190+
191+
pub(crate) fn is_version_checkpoint_paused(&self) -> bool {
192+
self.pause_version_checkpoint.load(Ordering::Relaxed)
193+
}
194+
195+
#[named]
196+
pub(crate) async fn get_checkpoint_version(&self) -> HummockVersion {
197+
let versioning_guard = read_lock!(self, versioning).await;
198+
versioning_guard
199+
.checkpoint
200+
.version
201+
.as_ref()
202+
.unwrap()
203+
.clone()
204+
}
178205
}

src/meta/src/hummock/manager/mod.rs

+3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use core::panic;
1616
use std::borrow::BorrowMut;
1717
use std::collections::{BTreeMap, HashMap, HashSet};
1818
use std::ops::{Deref, DerefMut};
19+
use std::sync::atomic::AtomicBool;
1920
use std::sync::{Arc, LazyLock};
2021
use std::time::{Duration, Instant};
2122

@@ -119,6 +120,7 @@ pub struct HummockManager<S: MetaStore> {
119120

120121
object_store: ObjectStoreRef,
121122
version_checkpoint_path: String,
123+
pause_version_checkpoint: AtomicBool,
122124
}
123125

124126
pub type HummockManagerRef<S> = Arc<HummockManager<S>>;
@@ -342,6 +344,7 @@ where
342344
event_sender: tx,
343345
object_store,
344346
version_checkpoint_path: checkpoint_path,
347+
pause_version_checkpoint: AtomicBool::new(false),
345348
};
346349
let instance = Arc::new(instance);
347350
instance.start_worker(rx).await;

src/meta/src/hummock/mod.rs

+9-7
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,10 @@ where
6464
meta_opts.min_delta_log_num_for_hummock_version_checkpoint,
6565
),
6666
];
67-
// Start vacuum in non-deterministic compaction test
68-
if !meta_opts.compaction_deterministic_test {
69-
workers.push(start_vacuum_scheduler(
70-
vacuum_manager,
71-
Duration::from_secs(meta_opts.vacuum_interval_sec),
72-
));
73-
}
67+
workers.push(start_vacuum_scheduler(
68+
vacuum_manager,
69+
Duration::from_secs(meta_opts.vacuum_interval_sec),
70+
));
7471
workers
7572
}
7673

@@ -145,6 +142,11 @@ pub fn start_checkpoint_loop<S: MetaStore>(
145142
return;
146143
}
147144
}
145+
if hummock_manager.is_version_checkpoint_paused()
146+
|| hummock_manager.env.opts.compaction_deterministic_test
147+
{
148+
continue;
149+
}
148150
if let Err(err) = hummock_manager
149151
.create_version_checkpoint(min_delta_log_num)
150152
.await

src/meta/src/rpc/service/hummock_service.rs

+26
Original file line numberDiff line numberDiff line change
@@ -515,4 +515,30 @@ where
515515
resp.suggest_cores = scale_out_cores;
516516
Ok(Response::new(resp))
517517
}
518+
519+
async fn rise_ctl_pause_version_checkpoint(
520+
&self,
521+
_request: Request<RiseCtlPauseVersionCheckpointRequest>,
522+
) -> Result<Response<RiseCtlPauseVersionCheckpointResponse>, Status> {
523+
self.hummock_manager.pause_version_checkpoint();
524+
Ok(Response::new(RiseCtlPauseVersionCheckpointResponse {}))
525+
}
526+
527+
async fn rise_ctl_resume_version_checkpoint(
528+
&self,
529+
_request: Request<RiseCtlResumeVersionCheckpointRequest>,
530+
) -> Result<Response<RiseCtlResumeVersionCheckpointResponse>, Status> {
531+
self.hummock_manager.resume_version_checkpoint();
532+
Ok(Response::new(RiseCtlResumeVersionCheckpointResponse {}))
533+
}
534+
535+
async fn rise_ctl_get_checkpoint_version(
536+
&self,
537+
_request: Request<RiseCtlGetCheckpointVersionRequest>,
538+
) -> Result<Response<RiseCtlGetCheckpointVersionResponse>, Status> {
539+
let checkpoint_version = self.hummock_manager.get_checkpoint_version().await;
540+
Ok(Response::new(RiseCtlGetCheckpointVersionResponse {
541+
checkpoint_version: Some(checkpoint_version),
542+
}))
543+
}
518544
}

src/rpc_client/src/meta_client.rs

+24
Original file line numberDiff line numberDiff line change
@@ -691,6 +691,27 @@ impl MetaClient {
691691
.await
692692
}
693693

694+
pub async fn risectl_get_checkpoint_hummock_version(
695+
&self,
696+
) -> Result<RiseCtlGetCheckpointVersionResponse> {
697+
let request = RiseCtlGetCheckpointVersionRequest {};
698+
self.inner.rise_ctl_get_checkpoint_version(request).await
699+
}
700+
701+
pub async fn risectl_pause_hummock_version_checkpoint(
702+
&self,
703+
) -> Result<RiseCtlPauseVersionCheckpointResponse> {
704+
let request = RiseCtlPauseVersionCheckpointRequest {};
705+
self.inner.rise_ctl_pause_version_checkpoint(request).await
706+
}
707+
708+
pub async fn risectl_resume_hummock_version_checkpoint(
709+
&self,
710+
) -> Result<RiseCtlResumeVersionCheckpointResponse> {
711+
let request = RiseCtlResumeVersionCheckpointRequest {};
712+
self.inner.rise_ctl_resume_version_checkpoint(request).await
713+
}
714+
694715
pub async fn init_metadata_for_replay(
695716
&self,
696717
tables: Vec<PbTable>,
@@ -1466,6 +1487,9 @@ macro_rules! for_all_meta_rpc {
14661487
,{ hummock_client, rise_ctl_get_pinned_snapshots_summary, RiseCtlGetPinnedSnapshotsSummaryRequest, RiseCtlGetPinnedSnapshotsSummaryResponse }
14671488
,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse }
14681489
,{ hummock_client, rise_ctl_update_compaction_config, RiseCtlUpdateCompactionConfigRequest, RiseCtlUpdateCompactionConfigResponse }
1490+
,{ hummock_client, rise_ctl_get_checkpoint_version, RiseCtlGetCheckpointVersionRequest, RiseCtlGetCheckpointVersionResponse }
1491+
,{ hummock_client, rise_ctl_pause_version_checkpoint, RiseCtlPauseVersionCheckpointRequest, RiseCtlPauseVersionCheckpointResponse }
1492+
,{ hummock_client, rise_ctl_resume_version_checkpoint, RiseCtlResumeVersionCheckpointRequest, RiseCtlResumeVersionCheckpointResponse }
14691493
,{ hummock_client, init_metadata_for_replay, InitMetadataForReplayRequest, InitMetadataForReplayResponse }
14701494
,{ hummock_client, split_compaction_group, SplitCompactionGroupRequest, SplitCompactionGroupResponse }
14711495
,{ user_client, create_user, CreateUserRequest, CreateUserResponse }

0 commit comments

Comments
 (0)