Skip to content

Commit 0301c58

Browse files
committed
fix review comment
1 parent ed88de3 commit 0301c58

File tree

5 files changed

+18
-16
lines changed

5 files changed

+18
-16
lines changed

src/query/storages/fuse/src/operations/commit.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,12 @@ use tracing::warn;
4646
use uuid::Uuid;
4747

4848
use crate::io::write_meta;
49+
use crate::io::SegmentsIO;
4950
use crate::io::TableMetaLocationGenerator;
5051
use crate::operations::AppendOperationLogEntry;
5152
use crate::operations::TableOperationLog;
5253
use crate::statistics;
5354
use crate::statistics::merge_statistics;
54-
use crate::FuseSegmentIO;
5555
use crate::FuseTable;
5656
use crate::OPT_KEY_LEGACY_SNAPSHOT_LOC;
5757
use crate::OPT_KEY_SNAPSHOT_LOCATION;
@@ -425,7 +425,7 @@ impl FuseTable {
425425
}
426426

427427
// Read all segments information in parallel.
428-
let fuse_segment_io = FuseSegmentIO::create(ctx.clone(), self.operator.clone());
428+
let fuse_segment_io = SegmentsIO::create(ctx.clone(), self.operator.clone());
429429
let results = fuse_segment_io.read_segments(&new_segments).await?;
430430
for result in results.iter() {
431431
let segment = result.clone()?;

src/query/storages/fuse/src/operations/delete.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ impl FuseTable {
153153
)
154154
.await
155155
{
156-
abort_operation.abort(self.operator.clone()).await;
156+
abort_operation.abort(ctx, self.operator.clone()).await?;
157157
return Err(e);
158158
}
159159
Ok(())

src/query/storages/fuse/src/operations/mutation/abort_operation.rs

+10-10
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,15 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::sync::Arc;
16+
17+
use common_catalog::table_context::TableContext;
18+
use common_exception::Result;
1519
use common_fuse_meta::meta::BlockMeta;
1620
use opendal::Operator;
1721

22+
use crate::io::Files;
23+
1824
#[derive(Default, Clone, Debug)]
1925
pub struct AbortOperation {
2026
pub segments: Vec<String>,
@@ -37,15 +43,9 @@ impl AbortOperation {
3743
self
3844
}
3945

40-
pub async fn abort(self, operator: Operator) {
41-
for block in self.blocks {
42-
let _ = operator.object(&block).delete().await;
43-
}
44-
for index in self.bloom_filter_indexes {
45-
let _ = operator.object(&index).delete().await;
46-
}
47-
for segment in self.segments {
48-
let _ = operator.object(&segment).delete().await;
49-
}
46+
pub async fn abort(self, ctx: Arc<dyn TableContext>, operator: Operator) -> Result<()> {
47+
let fuse_file = Files::create(ctx, operator);
48+
let locations = vec![self.blocks, self.bloom_filter_indexes, self.segments].concat();
49+
fuse_file.remove_file_in_batch(&locations).await
5050
}
5151
}

src/query/storages/fuse/src/operations/mutation/compact_mutator.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,9 @@ impl TableMutator for CompactMutator {
197197
.commit_mutation(ctx.clone(), self.base_snapshot.clone(), segments, summary)
198198
.await
199199
{
200-
abort_operation.abort(self.data_accessor.clone()).await;
200+
abort_operation
201+
.abort(ctx, self.data_accessor.clone())
202+
.await?;
201203
return Err(e);
202204
}
203205
Ok(())

src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -242,8 +242,8 @@ impl TableMutator for ReclusterMutator {
242242
.await
243243
{
244244
abort_operation
245-
.abort(self.base_mutator.data_accessor.clone())
246-
.await;
245+
.abort(ctx, self.base_mutator.data_accessor.clone())
246+
.await?;
247247
return Err(e);
248248
}
249249
Ok(())

0 commit comments

Comments
 (0)