Skip to content

Commit 0ca9a3c

Browse files
authored
feat: output of Copy. (#12594)
* feat: output of Copy. * update tests.
1 parent cad282e commit 0ca9a3c

File tree

71 files changed

+699
-110
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+699
-110
lines changed

Cargo.lock

+28-17
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/storage/Cargo.toml

+1
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@ async-backtrace = { workspace = true }
2323
async-trait = "0.1"
2424
bytes = "1"
2525
chrono = { workspace = true }
26+
dashmap = { version = "5.5.1", features = ["serde"] }
2627
flagset = "0.4"
2728
futures = "0.3"
2829
log = { workspace = true }

src/common/storage/src/copy.rs

+108
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,108 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use common_exception::ErrorCode;
16+
use dashmap::mapref::entry::Entry;
17+
use dashmap::DashMap;
18+
use serde::Deserialize;
19+
use serde::Serialize;
20+
21+
#[derive(Default, Clone, Serialize, Deserialize)]
22+
pub struct CopyStatus {
23+
/// Key is file path.
24+
pub files: DashMap<String, FileStatus>,
25+
}
26+
27+
impl CopyStatus {
28+
pub fn add_chunk(&self, file_path: &str, file_status: FileStatus) {
29+
match self.files.entry(file_path.to_string()) {
30+
Entry::Occupied(mut e) => {
31+
e.get_mut().merge(file_status);
32+
}
33+
Entry::Vacant(e) => {
34+
e.insert(file_status);
35+
}
36+
};
37+
}
38+
39+
pub fn merge(&self, other: CopyStatus) {
40+
for (k, v) in other.files.into_iter() {
41+
self.add_chunk(&k, v);
42+
}
43+
}
44+
}
45+
46+
#[derive(Default, Clone, Serialize, Deserialize)]
47+
pub struct FileStatus {
48+
pub num_rows_loaded: usize,
49+
pub error: Option<FileErrorsInfo>,
50+
}
51+
52+
impl FileStatus {
53+
pub fn add_error(&mut self, e: ErrorCode, line: usize) {
54+
match &mut self.error {
55+
None => {
56+
self.error = Some(FileErrorsInfo {
57+
num_errors: 1,
58+
first_error: FileErrorInfo {
59+
code: e.code(),
60+
message: e.message(),
61+
line,
62+
},
63+
});
64+
}
65+
Some(info) => {
66+
info.num_errors += 1;
67+
if info.first_error.line > line {
68+
info.first_error = FileErrorInfo {
69+
code: e.code(),
70+
message: e.message(),
71+
line,
72+
};
73+
}
74+
}
75+
};
76+
}
77+
78+
fn merge(&mut self, other: FileStatus) {
79+
self.num_rows_loaded += other.num_rows_loaded;
80+
match (&mut self.error, other.error) {
81+
(None, Some(e)) => self.error = Some(e),
82+
(Some(e1), Some(e2)) => e1.merge(e2),
83+
_ => {}
84+
}
85+
}
86+
}
87+
88+
#[derive(Default, Clone, Serialize, Deserialize)]
89+
pub struct FileErrorsInfo {
90+
pub num_errors: usize,
91+
pub first_error: FileErrorInfo,
92+
}
93+
94+
impl FileErrorsInfo {
95+
fn merge(&mut self, other: FileErrorsInfo) {
96+
self.num_errors += other.num_errors;
97+
if self.first_error.line > other.first_error.line {
98+
self.first_error = other.first_error;
99+
}
100+
}
101+
}
102+
103+
#[derive(Default, Clone, Serialize, Deserialize)]
104+
pub struct FileErrorInfo {
105+
pub code: u16,
106+
pub message: String,
107+
pub line: usize,
108+
}

src/common/storage/src/lib.rs

+4
Original file line number | Diff line number | Diff line change
@@ -63,6 +63,10 @@ pub use stage::StageFileInfo;
6363
pub use stage::StageFileStatus;
6464
pub use stage::StageFilesInfo;
6565

66+
mod copy;
6667
mod statistics;
68+
69+
pub use copy::CopyStatus;
70+
pub use copy::FileStatus;
6771
pub use statistics::Datum;
6872
pub use statistics::F64;

src/query/catalog/src/table_context.rs

+6
Original file line number | Diff line number | Diff line change
@@ -35,7 +35,9 @@ use common_meta_app::principal::UserInfo;
3535
use common_pipeline_core::InputError;
3636
use common_settings::ChangeValue;
3737
use common_settings::Settings;
38+
use common_storage::CopyStatus;
3839
use common_storage::DataOperator;
40+
use common_storage::FileStatus;
3941
use common_storage::StageFileInfo;
4042
use common_storage::StorageMetrics;
4143
use dashmap::DashMap;
@@ -197,4 +199,8 @@ pub trait TableContext: Send + Sync {
197199
/// Hands `segment_loc` to this context.
///
/// NOTE(review): how the location is stored (ordering, de-duplication) and
/// when this returns `Err` are implementor-defined and not visible here —
/// confirm against the implementations.
fn add_segment_location(&self, segment_loc: Location) -> Result<()>;
198200

199201
/// Returns segment locations as an owned `Vec`.
///
/// NOTE(review): presumably the locations previously passed to
/// `add_segment_location` — confirm against the implementations.
fn get_segment_locations(&self) -> Result<Vec<Location>>;
202+
203+
/// Reports `file_status` for the file at `file_path` to this context.
///
/// NOTE(review): presumably implementors fold the status into the
/// `CopyStatus` exposed by `get_copy_status` (e.g. via
/// `CopyStatus::add_chunk`); the error conditions are implementor-defined —
/// confirm against the implementations.
fn add_file_status(&self, file_path: &str, file_status: FileStatus) -> Result<()>;
204+
205+
/// Returns a shared (`Arc`) handle to a `CopyStatus`.
///
/// NOTE(review): presumably the status accumulated through
/// `add_file_status` — confirm against the implementations.
fn get_copy_status(&self) -> Arc<CopyStatus>;
200206
}

0 commit comments

Comments
 (0)