Skip to content

Commit 1e72156

Browse files
authored
Merge pull request #7418 from dantengsky/feat-keep-last-snapshot-hint
feat: keep a hint of last snapshot location while committing new snapshot
2 parents 265e2c7 + d9540b0 commit 1e72156

File tree

13 files changed

+175
-108
lines changed

13 files changed

+175
-108
lines changed

src/query/service/tests/it/storages/fuse/operations/commit.rs

+40
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414
use common_base::base::tokio;
15+
use common_catalog::table_context::TableContext;
1516
use common_datablocks::DataBlock;
1617
use common_exception::Result;
18+
use common_storages_fuse::FuseTable;
1719
use futures::TryStreamExt;
1820

1921
use crate::storages::fuse::table_test_fixture::execute_query;
@@ -76,3 +78,41 @@ async fn test_fuse_occ_retry() -> Result<()> {
7678

7779
Ok(())
7880
}
81+
82+
#[tokio::test]
83+
async fn test_last_snapshot_hint() -> Result<()> {
84+
let fixture = TestFixture::new().await;
85+
let ctx = fixture.ctx();
86+
fixture.create_default_table().await?;
87+
88+
let table = fixture.latest_default_table().await?;
89+
90+
let num_blocks = 1;
91+
let rows_per_block = 1;
92+
let value_start_from = 1;
93+
let stream =
94+
TestFixture::gen_sample_blocks_stream_ex(num_blocks, rows_per_block, value_start_from);
95+
96+
let blocks = stream.try_collect().await?;
97+
fixture
98+
.append_commit_blocks(table.clone(), blocks, false, true)
99+
.await?;
100+
101+
// check last snapshot hint file
102+
let table = fixture.latest_default_table().await?;
103+
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
104+
let last_snapshot_location = fuse_table.snapshot_loc().unwrap();
105+
let operator = ctx.get_storage_operator()?;
106+
let location = fuse_table
107+
.meta_location_generator()
108+
.gen_last_snapshot_hint_location();
109+
let storage_meta_data = operator.metadata();
110+
let storage_prefix = storage_meta_data.root();
111+
112+
let expected = format!("{}{}", storage_prefix, last_snapshot_location);
113+
let content = operator.object(location.as_str()).read().await?;
114+
115+
assert_eq!(content.as_slice(), expected.as_bytes());
116+
117+
Ok(())
118+
}

src/query/service/tests/it/storages/fuse/operations/mutation/deletion_mutator.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ async fn test_deletion_mutator_multiple_empty_segments() -> Result<()> {
110110
}
111111
}
112112

113-
let (new_snapshot, _) = mutator.into_new_snapshot().await?;
113+
let new_snapshot = mutator.into_new_snapshot().await?;
114114

115115
// half segments left after deletion
116116
assert_eq!(new_snapshot.segments.len(), 50);

src/query/storages/fuse-meta/src/meta/versions.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use crate::meta::Versioned;
3030
// impl Versioned<0> for v0::SegmentInfo {}
3131
// impl Versioned<1> for v0::SegmentInfo {}
3232
//
33-
// Fortunately, since v0::SegmentInfo::VESION is used in
33+
// Fortunately, since v0::SegmentInfo::VERSION is used in
3434
// several places, compiler will report compile error if it
3535
// cannot deduce a unique value for the constant expression.
3636

src/query/storages/fuse/src/constants.rs

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ pub const FUSE_TBL_BLOCK_PREFIX: &str = "_b";
2121
pub const FUSE_TBL_BLOCK_INDEX_PREFIX: &str = "_i";
2222
pub const FUSE_TBL_SEGMENT_PREFIX: &str = "_sg";
2323
pub const FUSE_TBL_SNAPSHOT_PREFIX: &str = "_ss";
24+
pub const FUSE_TBL_LAST_SNAPSHOT_HINT: &str = "last_snapshot_location_hint";
2425

2526
pub const DEFAULT_BLOCK_PER_SEGMENT: usize = 1000;
2627
pub const DEFAULT_BLOCK_SIZE_IN_MEM_SIZE_THRESHOLD: usize = 100 * 1024 * 1024;

src/query/storages/fuse/src/fuse_table.rs

+14-59
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,18 @@ use std::any::Any;
1616
use std::convert::TryFrom;
1717
use std::sync::Arc;
1818

19-
use common_cache::Cache;
2019
use common_catalog::catalog::StorageDescription;
2120
use common_catalog::table_context::TableContext;
2221
use common_catalog::table_mutator::TableMutator;
2322
use common_datablocks::DataBlock;
2423
use common_exception::ErrorCode;
2524
use common_exception::Result;
26-
use common_fuse_meta::caches::CacheManager;
2725
use common_fuse_meta::meta::ClusterKey;
2826
use common_fuse_meta::meta::Statistics as FuseStatistics;
2927
use common_fuse_meta::meta::TableSnapshot;
3028
use common_fuse_meta::meta::Versioned;
3129
use common_legacy_parser::ExpressionParser;
3230
use common_meta_app::schema::TableInfo;
33-
use common_meta_app::schema::TableMeta;
34-
use common_meta_app::schema::UpdateTableMetaReq;
35-
use common_meta_types::MatchSeq;
3631
use common_planners::DeletePlan;
3732
use common_planners::Expression;
3833
use common_planners::Extras;
@@ -43,7 +38,6 @@ use common_planners::TruncateTablePlan;
4338
use common_storages_util::storage_context::StorageContext;
4439
use uuid::Uuid;
4540

46-
use crate::io::write_meta;
4741
use crate::io::BlockCompactor;
4842
use crate::io::MetaReaders;
4943
use crate::io::TableMetaLocationGenerator;
@@ -174,53 +168,6 @@ impl FuseTable {
174168
}
175169
}
176170

177-
pub async fn update_table_meta(
178-
&self,
179-
ctx: &dyn TableContext,
180-
catalog_name: &str,
181-
snapshot: &TableSnapshot,
182-
meta: &mut TableMeta,
183-
) -> Result<()> {
184-
let uuid = snapshot.snapshot_id;
185-
let snapshot_loc = self
186-
.meta_location_generator()
187-
.snapshot_location_from_uuid(&uuid, TableSnapshot::VERSION)?;
188-
let operator = ctx.get_storage_operator()?;
189-
write_meta(&operator, &snapshot_loc, snapshot).await?;
190-
191-
// set new snapshot location
192-
meta.options
193-
.insert(OPT_KEY_SNAPSHOT_LOCATION.to_owned(), snapshot_loc.clone());
194-
// remove legacy options
195-
meta.options.remove(OPT_KEY_LEGACY_SNAPSHOT_LOC);
196-
197-
let table_id = self.table_info.ident.table_id;
198-
let table_version = self.table_info.ident.seq;
199-
let req = UpdateTableMetaReq {
200-
table_id,
201-
seq: MatchSeq::Exact(table_version),
202-
new_table_meta: meta.clone(),
203-
};
204-
205-
let catalog = ctx.get_catalog(catalog_name)?;
206-
let result = catalog.update_table_meta(req).await;
207-
match result {
208-
Ok(_) => {
209-
if let Some(snapshot_cache) = CacheManager::instance().get_table_snapshot_cache() {
210-
let cache = &mut snapshot_cache.write().await;
211-
cache.put(snapshot_loc, Arc::new(snapshot.clone()));
212-
}
213-
Ok(())
214-
}
215-
Err(e) => {
216-
// commit snapshot to meta server failed, try to delete it.
217-
// "major GC" will collect this if the deletion fails (even after DAL retries)
218-
let _ = operator.object(&snapshot_loc).delete().await;
219-
Err(e)
220-
}
221-
}
222-
}
223-
224171
pub fn transient(&self) -> bool {
225172
self.table_info.meta.options.contains_key("TRANSIENT")
226173
}
@@ -297,11 +244,15 @@ impl Table for FuseTable {
297244
cluster_key_meta,
298245
);
299246

300-
self.update_table_meta(
247+
let mut table_info = self.table_info.clone();
248+
table_info.meta = new_table_meta;
249+
250+
FuseTable::commit_to_meta_server(
301251
ctx.as_ref(),
302252
catalog_name,
303-
&new_snapshot,
304-
&mut new_table_meta,
253+
&table_info,
254+
&self.meta_location_generator,
255+
new_snapshot,
305256
)
306257
.await
307258
}
@@ -341,11 +292,15 @@ impl Table for FuseTable {
341292
None,
342293
);
343294

344-
self.update_table_meta(
295+
let mut table_info = self.table_info.clone();
296+
table_info.meta = new_table_meta;
297+
298+
FuseTable::commit_to_meta_server(
345299
ctx.as_ref(),
346300
catalog_name,
347-
&new_snapshot,
348-
&mut new_table_meta,
301+
&table_info,
302+
&self.meta_location_generator,
303+
new_snapshot,
349304
)
350305
.await
351306
}

src/query/storages/fuse/src/io/locations.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::constants::FUSE_TBL_BLOCK_PREFIX;
2727
use crate::constants::FUSE_TBL_SEGMENT_PREFIX;
2828
use crate::constants::FUSE_TBL_SNAPSHOT_PREFIX;
2929
use crate::FUSE_TBL_BLOCK_INDEX_PREFIX;
30+
use crate::FUSE_TBL_LAST_SNAPSHOT_HINT;
3031

3132
static SNAPSHOT_V0: SnapshotVersion = SnapshotVersion::V0(PhantomData);
3233
static SNAPSHOT_V1: SnapshotVersion = SnapshotVersion::V1(PhantomData);
@@ -75,7 +76,7 @@ impl TableMetaLocationGenerator {
7576
)
7677
}
7778

78-
pub fn gen_segment_info_location(&self) -> String where {
79+
pub fn gen_segment_info_location(&self) -> String {
7980
let segment_uuid = Uuid::new_v4().simple().to_string();
8081
format!(
8182
"{}/{}/{}_v{}.json",
@@ -98,6 +99,10 @@ impl TableMetaLocationGenerator {
9899
SNAPSHOT_V0.version()
99100
}
100101
}
102+
103+
pub fn gen_last_snapshot_hint_location(&self) -> String {
104+
format!("{}/{}", &self.prefix, FUSE_TBL_LAST_SNAPSHOT_HINT)
105+
}
101106
}
102107

103108
trait SnapshotLocationCreator {

src/query/storages/fuse/src/io/write/meta_writer.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use tracing::warn;
2222
use crate::io::retry;
2323
use crate::io::retry::Retryable;
2424

25-
pub async fn write_meta<T>(data_accessor: &Operator, location: &str, meta: &T) -> Result<()>
25+
pub async fn write_meta<T>(data_accessor: &Operator, location: &str, meta: T) -> Result<()>
2626
where T: Serialize {
2727
let op = || async {
2828
let bs = serde_json::to_vec(&meta).map_err(Error::other)?;

0 commit comments

Comments
 (0)