Skip to content

Commit 9740b81

Browse files
committed
refactor: unify table commit path
1 parent 1741c7b commit 9740b81

File tree

6 files changed

+135
-107
lines changed

6 files changed

+135
-107
lines changed

src/query/storages/fuse/src/fuse_table.rs

+72-61
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,18 @@ use std::any::Any;
1616
use std::convert::TryFrom;
1717
use std::sync::Arc;
1818

19-
use common_cache::Cache;
2019
use common_catalog::catalog::StorageDescription;
2120
use common_catalog::table_context::TableContext;
2221
use common_catalog::table_mutator::TableMutator;
2322
use common_datablocks::DataBlock;
2423
use common_exception::ErrorCode;
2524
use common_exception::Result;
26-
use common_fuse_meta::caches::CacheManager;
2725
use common_fuse_meta::meta::ClusterKey;
2826
use common_fuse_meta::meta::Statistics as FuseStatistics;
2927
use common_fuse_meta::meta::TableSnapshot;
3028
use common_fuse_meta::meta::Versioned;
3129
use common_legacy_parser::ExpressionParser;
3230
use common_meta_app::schema::TableInfo;
33-
use common_meta_app::schema::TableMeta;
34-
use common_meta_app::schema::UpdateTableMetaReq;
35-
use common_meta_types::MatchSeq;
3631
use common_planners::DeletePlan;
3732
use common_planners::Expression;
3833
use common_planners::Extras;
@@ -43,7 +38,6 @@ use common_planners::TruncateTablePlan;
4338
use common_storages_util::storage_context::StorageContext;
4439
use uuid::Uuid;
4540

46-
use crate::io::write_meta;
4741
use crate::io::BlockCompactor;
4842
use crate::io::MetaReaders;
4943
use crate::io::TableMetaLocationGenerator;
@@ -174,52 +168,52 @@ impl FuseTable {
174168
}
175169
}
176170

177-
pub async fn update_table_meta(
178-
&self,
179-
ctx: &dyn TableContext,
180-
catalog_name: &str,
181-
snapshot: &TableSnapshot,
182-
meta: &mut TableMeta,
183-
) -> Result<()> {
184-
let uuid = snapshot.snapshot_id;
185-
let snapshot_loc = self
186-
.meta_location_generator()
187-
.snapshot_location_from_uuid(&uuid, TableSnapshot::VERSION)?;
188-
let operator = ctx.get_storage_operator()?;
189-
write_meta(&operator, &snapshot_loc, snapshot).await?;
190-
191-
// set new snapshot location
192-
meta.options
193-
.insert(OPT_KEY_SNAPSHOT_LOCATION.to_owned(), snapshot_loc.clone());
194-
// remove legacy options
195-
meta.options.remove(OPT_KEY_LEGACY_SNAPSHOT_LOC);
196-
197-
let table_id = self.table_info.ident.table_id;
198-
let table_version = self.table_info.ident.seq;
199-
let req = UpdateTableMetaReq {
200-
table_id,
201-
seq: MatchSeq::Exact(table_version),
202-
new_table_meta: meta.clone(),
203-
};
204-
205-
let catalog = ctx.get_catalog(catalog_name)?;
206-
let result = catalog.update_table_meta(req).await;
207-
match result {
208-
Ok(_) => {
209-
if let Some(snapshot_cache) = CacheManager::instance().get_table_snapshot_cache() {
210-
let cache = &mut snapshot_cache.write().await;
211-
cache.put(snapshot_loc, Arc::new(snapshot.clone()));
212-
}
213-
Ok(())
214-
}
215-
Err(e) => {
216-
// commit snapshot to meta server failed, try to delete it.
217-
// "major GC" will collect this, if deletion failure (even after DAL retried)
218-
let _ = operator.object(&snapshot_loc).delete().await;
219-
Err(e)
220-
}
221-
}
222-
}
171+
// pub async fn update_table_meta(
172+
// &self,
173+
// ctx: &dyn TableContext,
174+
// catalog_name: &str,
175+
// snapshot: &TableSnapshot,
176+
// meta: &mut TableMeta,
177+
// ) -> Result<()> {
178+
// let uuid = snapshot.snapshot_id;
179+
// let snapshot_loc = self
180+
// .meta_location_generator()
181+
// .snapshot_location_from_uuid(&uuid, TableSnapshot::VERSION)?;
182+
// let operator = ctx.get_storage_operator()?;
183+
// write_meta(&operator, &snapshot_loc, snapshot).await?;
184+
//
185+
// // set new snapshot location
186+
// meta.options
187+
// .insert(OPT_KEY_SNAPSHOT_LOCATION.to_owned(), snapshot_loc.clone());
188+
// // remove legacy options
189+
// meta.options.remove(OPT_KEY_LEGACY_SNAPSHOT_LOC);
190+
//
191+
// let table_id = self.table_info.ident.table_id;
192+
// let table_version = self.table_info.ident.seq;
193+
// let req = UpdateTableMetaReq {
194+
// table_id,
195+
// seq: MatchSeq::Exact(table_version),
196+
// new_table_meta: meta.clone(),
197+
// };
198+
//
199+
// let catalog = ctx.get_catalog(catalog_name)?;
200+
// let result = catalog.update_table_meta(req).await;
201+
// match result {
202+
// Ok(_) => {
203+
// if let Some(snapshot_cache) = CacheManager::instance().get_table_snapshot_cache() {
204+
// let cache = &mut snapshot_cache.write().await;
205+
// cache.put(snapshot_loc, Arc::new(snapshot.clone()));
206+
// }
207+
// Ok(())
208+
// }
209+
// Err(e) => {
210+
// // commit snapshot to meta server failed, try to delete it.
211+
// // "major GC" will collect this, if deletion failure (even after DAL retried)
212+
// let _ = operator.object(&snapshot_loc).delete().await;
213+
// Err(e)
214+
// }
215+
// }
216+
// }
223217

224218
pub fn transient(&self) -> bool {
225219
self.table_info.meta.options.contains_key("TRANSIENT")
@@ -287,8 +281,9 @@ impl Table for FuseTable {
287281
(FuseStatistics::default(), vec![])
288282
};
289283

284+
let new_snapshot_id = Uuid::new_v4();
290285
let new_snapshot = TableSnapshot::new(
291-
Uuid::new_v4(),
286+
new_snapshot_id,
292287
&prev_timestamp,
293288
prev_snapshot_id,
294289
schema,
@@ -297,13 +292,21 @@ impl Table for FuseTable {
297292
cluster_key_meta,
298293
);
299294

300-
self.update_table_meta(
295+
// compute the location of the new snapshot (the snapshot itself is written by commit_to_meta_server)
296+
let snapshot_loc = self.meta_location_generator.snapshot_location_from_uuid(
297+
&new_snapshot.snapshot_id,
298+
new_snapshot.format_version(),
299+
)?;
300+
301+
FuseTable::commit_to_meta_server(
301302
ctx.as_ref(),
302303
catalog_name,
303-
&new_snapshot,
304-
&mut new_table_meta,
304+
&self.table_info,
305+
snapshot_loc,
306+
new_snapshot,
305307
)
306-
.await
308+
.await?;
309+
Ok(())
307310
}
308311

309312
async fn drop_table_cluster_keys(
@@ -331,8 +334,9 @@ impl Table for FuseTable {
331334
(FuseStatistics::default(), vec![])
332335
};
333336

337+
let new_snapshot_id = Uuid::new_v4();
334338
let new_snapshot = TableSnapshot::new(
335-
Uuid::new_v4(),
339+
new_snapshot_id,
336340
&prev_timestamp,
337341
prev_snapshot_id,
338342
schema,
@@ -341,11 +345,18 @@ impl Table for FuseTable {
341345
None,
342346
);
343347

344-
self.update_table_meta(
348+
// compute the location of the new snapshot (the snapshot itself is written by commit_to_meta_server)
349+
let snapshot_loc = self.meta_location_generator.snapshot_location_from_uuid(
350+
&new_snapshot.snapshot_id,
351+
new_snapshot.format_version(),
352+
)?;
353+
354+
FuseTable::commit_to_meta_server(
345355
ctx.as_ref(),
346356
catalog_name,
347-
&new_snapshot,
348-
&mut new_table_meta,
357+
&self.table_info,
358+
snapshot_loc,
359+
new_snapshot,
349360
)
350361
.await
351362
}

src/query/storages/fuse/src/operations/commit.rs

+57-23
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@ use std::time::Instant;
1919
use backoff::backoff::Backoff;
2020
use backoff::ExponentialBackoffBuilder;
2121
use common_base::base::ProgressValues;
22+
use common_cache::Cache;
2223
use common_catalog::table::Table;
2324
use common_catalog::table_context::TableContext;
2425
use common_datavalues::DataSchema;
2526
use common_exception::ErrorCode;
2627
use common_exception::Result;
28+
use common_fuse_meta::caches::CacheManager;
2729
use common_fuse_meta::meta::ClusterKey;
2830
use common_fuse_meta::meta::Location;
2931
use common_fuse_meta::meta::SegmentInfo;
@@ -32,14 +34,14 @@ use common_fuse_meta::meta::TableSnapshot;
3234
use common_fuse_meta::meta::Versioned;
3335
use common_meta_app::schema::TableInfo;
3436
use common_meta_app::schema::TableStatistics;
35-
use common_meta_app::schema::UpdateTableMetaReply;
3637
use common_meta_app::schema::UpdateTableMetaReq;
3738
use common_meta_types::MatchSeq;
3839
use tracing::debug;
3940
use tracing::info;
4041
use tracing::warn;
4142
use uuid::Uuid;
4243

44+
use crate::io::write_meta;
4345
use crate::operations::AppendOperationLogEntry;
4446
use crate::operations::TableOperationLog;
4547
use crate::statistics;
@@ -177,9 +179,11 @@ impl FuseTable {
177179
.into_iter()
178180
.map(|loc| (loc, SegmentInfo::VERSION))
179181
.collect();
182+
183+
let new_snapshot_id = Uuid::new_v4();
180184
let new_snapshot = if overwrite {
181185
TableSnapshot::new(
182-
Uuid::new_v4(),
186+
new_snapshot_id,
183187
&prev_timestamp,
184188
prev.as_ref().map(|v| (v.snapshot_id, prev_version)),
185189
schema,
@@ -207,11 +211,18 @@ impl FuseTable {
207211
index_data_bytes: new_snapshot.summary.index_size,
208212
};
209213

210-
self.update_table_meta(
214+
// compute the location of the new snapshot (the snapshot itself is written by commit_to_meta_server)
215+
let snapshot_loc = self.meta_location_generator.snapshot_location_from_uuid(
216+
&new_snapshot.snapshot_id,
217+
new_snapshot.format_version(),
218+
)?;
219+
220+
FuseTable::commit_to_meta_server(
211221
ctx.as_ref(),
212222
catalog_name,
213-
&new_snapshot,
214-
&mut new_table_meta,
223+
&self.table_info,
224+
snapshot_loc,
225+
new_snapshot,
215226
)
216227
.await
217228
}
@@ -256,24 +267,25 @@ impl FuseTable {
256267
ctx: &dyn TableContext,
257268
catalog_name: &str,
258269
table_info: &TableInfo,
259-
new_snapshot_location: String,
260-
stats: &Statistics,
261-
) -> Result<UpdateTableMetaReply> {
262-
let catalog = ctx.get_catalog(catalog_name)?;
263-
264-
let table_id = table_info.ident.table_id;
265-
let table_version = table_info.ident.seq;
270+
snapshot_location: String,
271+
snapshot: TableSnapshot,
272+
) -> Result<()> {
273+
// 1. write down snapshot
274+
let operator = ctx.get_storage_operator()?;
275+
write_meta(&operator, &snapshot_location, &snapshot).await?;
266276

277+
// 2. prepare table meta
267278
let mut new_table_meta = table_info.meta.clone();
268-
269-
// set new snapshot location
270-
new_table_meta
271-
.options
272-
.insert(OPT_KEY_SNAPSHOT_LOCATION.to_owned(), new_snapshot_location);
273-
279+
// 2.1 set new snapshot location
280+
new_table_meta.options.insert(
281+
OPT_KEY_SNAPSHOT_LOCATION.to_owned(),
282+
snapshot_location.clone(),
283+
);
274284
// remove legacy options
275285
self::utils::remove_legacy_options(&mut new_table_meta.options);
276286

287+
// 2.2 setup table statistics
288+
let stats = &snapshot.summary;
277289
// update statistics
278290
new_table_meta.statistics = TableStatistics {
279291
number_of_rows: stats.row_count,
@@ -282,13 +294,35 @@ impl FuseTable {
282294
index_data_bytes: stats.index_size,
283295
};
284296

297+
// 3. prepare the request
298+
299+
let catalog = ctx.get_catalog(catalog_name)?;
300+
let table_id = table_info.ident.table_id;
301+
let table_version = table_info.ident.seq;
302+
285303
let req = UpdateTableMetaReq {
286304
table_id,
287305
seq: MatchSeq::Exact(table_version),
288306
new_table_meta,
289307
};
290308

291-
catalog.update_table_meta(req).await
309+
// 4. commit the new table meta to the meta server
310+
let reply = catalog.update_table_meta(req).await;
311+
match reply {
312+
Ok(_) => {
313+
if let Some(snapshot_cache) = CacheManager::instance().get_table_snapshot_cache() {
314+
let cache = &mut snapshot_cache.write().await;
315+
cache.put(snapshot_location, Arc::new(snapshot));
316+
}
317+
Ok(())
318+
}
319+
Err(e) => {
320+
// committing the snapshot to the meta server failed; try to delete the snapshot file.
321+
// "major GC" will collect it if the deletion fails (even after the DAL has retried)
322+
let _ = operator.object(&snapshot_location).delete().await;
323+
Err(e)
324+
}
325+
}
292326
}
293327

294328
pub fn merge_append_operations(
@@ -334,10 +368,10 @@ impl FuseTable {
334368
catalog.get_table_by_info(&table_info)
335369
}
336370

337-
/// Left a hint file which indicates the location of the latest snapshot
338-
async fn write_last_snapshot_hint(&self, last_snapshot_path: &str) {
339-
let content = last_snapshot_path.to_owned();
340-
}
371+
// Leave a hint file that indicates the location of the latest snapshot
372+
// async fn write_last_snapshot_hint(_ctx: &dyn TableContext, _last_snapshot_path: String) {
373+
// todo!()
374+
//}
341375
}
342376

343377
mod utils {

src/query/storages/fuse/src/operations/delete.rs

+2-8
Original file line numberDiff line numberDiff line change
@@ -126,14 +126,8 @@ impl FuseTable {
126126
catalog_name: &str,
127127
) -> Result<()> {
128128
let (new_snapshot, loc) = del_holder.into_new_snapshot().await?;
129-
Self::commit_to_meta_server(
130-
ctx,
131-
catalog_name,
132-
self.get_table_info(),
133-
loc,
134-
&new_snapshot.summary,
135-
)
136-
.await?;
129+
Self::commit_to_meta_server(ctx, catalog_name, self.get_table_info(), loc, new_snapshot)
130+
.await?;
137131
// TODO check if error is recoverable, and try to resolve the conflict
138132
Ok(())
139133
}

src/query/storages/fuse/src/operations/mutation/base_mutator.rs

-2
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ use common_fuse_meta::meta::Statistics;
2626
use common_fuse_meta::meta::TableSnapshot;
2727
use opendal::Operator;
2828

29-
use crate::io::write_meta;
3029
use crate::io::MetaReaders;
3130
use crate::io::SegmentWriter;
3231
use crate::io::TableMetaLocationGenerator;
@@ -96,7 +95,6 @@ impl BaseMutator {
9695
&new_snapshot.snapshot_id,
9796
new_snapshot.format_version(),
9897
)?;
99-
write_meta(&self.data_accessor, &snapshot_loc, &new_snapshot).await?;
10098
Ok((new_snapshot, snapshot_loc))
10199
}
102100

0 commit comments

Comments
 (0)