Skip to content

Commit 9ea2499

Browse files
committed
feat: Meta-service cluster is gated by Enterprise Edition
By default meta-service disallows clustering. Meta-service cluster is only enabled when raft-config `databend_enterprise_license` is set and is valid. No feature in the jwt claim is examined. The EE gate check when a meta node initiate raft-protocol network instance. Thus without a valid EE token, all raft-protocol are disabled, including `RequestVote`, `AppendEntries`, `InstallSnapshot` and internal request forward. If EE token is not set, an error will be outputed to log file. - New config `databend_enterprise_license`: ``` [raft_config] databend_enterprise_license = "<token>" ``` This token is same as the one used by databend-query. - When testing, a temp key pair and jwt claim is created to pass integration tests, this is enabled by `fake_ee_license` config entry. - Other changes: Add `DisplaySlice` and `DisplayUnixTimeStampExt` to display slice of `Display` instance and unix timestamp.
1 parent 8467049 commit 9ea2499

File tree

14 files changed

+309
-33
lines changed

14 files changed

+309
-33
lines changed

Cargo.lock

+6
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::fmt;
16+
17+
/// Implement `Display` for `&[T]` if T is `Display`.
18+
///
19+
/// It outputs at most `MAX` elements, excluding those from the 5th to the second-to-last one:
20+
/// - `DisplaySlice(&[1,2,3,4,5,6])` outputs: `"[1,2,3,4,...,6]"`.
21+
pub struct DisplaySlice<'a, T: fmt::Display, const MAX: usize = 5>(pub &'a [T]);
22+
23+
impl<'a, T: fmt::Display, const MAX: usize> fmt::Display for DisplaySlice<'a, T, MAX> {
24+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
25+
let slice = self.0;
26+
let len = slice.len();
27+
28+
write!(f, "[")?;
29+
30+
if len > MAX {
31+
for (i, t) in slice[..(MAX - 1)].iter().enumerate() {
32+
if i > 0 {
33+
write!(f, ",")?;
34+
}
35+
36+
write!(f, "{}", t)?;
37+
}
38+
39+
write!(f, ",..,")?;
40+
write!(f, "{}", slice.last().unwrap())?;
41+
} else {
42+
for (i, t) in slice.iter().enumerate() {
43+
if i > 0 {
44+
write!(f, ",")?;
45+
}
46+
47+
write!(f, "{}", t)?;
48+
}
49+
}
50+
51+
write!(f, "]")
52+
}
53+
}
54+
55+
pub trait DisplaySliceExt<'a, T: fmt::Display> {
56+
fn display(&'a self) -> DisplaySlice<'a, T>;
57+
58+
/// Display at most `MAX` elements.
59+
fn display_n<const MAX: usize>(&'a self) -> DisplaySlice<'a, T, MAX>;
60+
}
61+
62+
impl<T> DisplaySliceExt<'_, T> for [T]
63+
where T: fmt::Display
64+
{
65+
fn display(&self) -> DisplaySlice<T> {
66+
DisplaySlice(self)
67+
}
68+
69+
fn display_n<const MAX: usize>(&'_ self) -> DisplaySlice<'_, T, MAX> {
70+
DisplaySlice(self)
71+
}
72+
}
73+
74+
#[cfg(test)]
75+
mod tests {
76+
use super::DisplaySlice;
77+
78+
#[test]
79+
fn test_display_slice() {
80+
let a = vec![1, 2, 3, 4];
81+
assert_eq!("[1,2,3,4]", DisplaySlice::<_>(&a).to_string());
82+
83+
let a = vec![1, 2, 3, 4, 5];
84+
assert_eq!("[1,2,3,4,5]", DisplaySlice::<_>(&a).to_string());
85+
86+
let a = vec![1, 2, 3, 4, 5, 6];
87+
assert_eq!("[1,2,3,4,..,6]", DisplaySlice::<_>(&a).to_string());
88+
89+
let a = vec![1, 2, 3, 4, 5, 6, 7];
90+
assert_eq!("[1,2,3,4,..,7]", DisplaySlice::<_>(&a).to_string());
91+
92+
let a = vec![1, 2, 3, 4, 5, 6, 7];
93+
assert_eq!("[1,..,7]", DisplaySlice::<_, 2>(&a).to_string());
94+
}
95+
}

src/common/base/src/display/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@
1313
// limitations under the License.
1414

1515
pub mod display_option;
16+
pub mod display_slice;
1617
pub mod display_unix_epoch;

src/meta/ee/Cargo.toml

+5
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@ doctest = false
1212
test = true
1313

1414
[dependencies]
15+
anyhow = { workspace = true }
16+
databend-common-base = { workspace = true }
17+
databend-common-license = { workspace = true }
18+
jwt-simple = { workspace = true }
19+
log = { workspace = true }
1520

1621
[build-dependencies]
1722
databend-common-building = { workspace = true }

src/meta/ee/src/lib.rs

+108
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,111 @@
1111
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
use std::sync::Mutex;
17+
use std::time::Duration;
18+
19+
use databend_common_base::display::display_unix_epoch::DisplayUnixTimeStampExt;
20+
use databend_common_license::license::LicenseInfo;
21+
use jwt_simple::algorithms::ECDSAP256KeyPairLike;
22+
use jwt_simple::algorithms::ECDSAP256PublicKeyLike;
23+
use jwt_simple::algorithms::ES256KeyPair;
24+
use jwt_simple::algorithms::ES256PublicKey;
25+
use jwt_simple::claims::Claims;
26+
use jwt_simple::claims::JWTClaims;
27+
use jwt_simple::prelude::Clock;
28+
29+
#[derive(Clone, Default)]
30+
pub struct MetaServiceEnterpriseGate {
31+
/// License ES256 signed jwt claim token.
32+
license_token: Arc<Mutex<Option<String>>>,
33+
34+
/// The public key to verify the license token.
35+
public_key: String,
36+
}
37+
38+
impl MetaServiceEnterpriseGate {
39+
const LICENSE_PUBLIC_KEY: &'static str = r#"-----BEGIN PUBLIC KEY-----
40+
MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEGsKCbhXU7j56VKZ7piDlLXGhud0a
41+
pWjW3wxSdeARerxs/BeoWK7FspDtfLaAT8iJe4YEmR0JpkRQ8foWs0ve3w==
42+
-----END PUBLIC KEY-----"#;
43+
44+
pub fn new(token: Option<String>) -> Self {
45+
Self {
46+
license_token: Arc::new(Mutex::new(token)),
47+
public_key: Self::LICENSE_PUBLIC_KEY.to_string(),
48+
}
49+
}
50+
51+
/// Create with a temp license for testing.
52+
pub fn new_testing() -> Self {
53+
let lic = LicenseInfo {
54+
r#type: Some("trial".to_string()),
55+
org: Some("databend".to_string()),
56+
tenants: Some(vec!["test".to_string()]),
57+
features: None,
58+
};
59+
60+
let key_pair = ES256KeyPair::generate();
61+
let claims = Claims::with_custom_claims(lic, jwt_simple::prelude::Duration::from_hours(2));
62+
let token = key_pair.sign(claims).unwrap();
63+
64+
let public_key = key_pair.public_key().to_pem().unwrap();
65+
66+
Self {
67+
license_token: Arc::new(Mutex::new(Some(token))),
68+
public_key,
69+
}
70+
}
71+
72+
/// Parse the JWT token and restore the claims.
73+
fn parse_jwt_token(&self, raw: &str) -> Result<JWTClaims<LicenseInfo>, anyhow::Error> {
74+
let public_key = ES256PublicKey::from_pem(&self.public_key)?;
75+
76+
let claim = public_key.verify_token::<LicenseInfo>(raw, None)?;
77+
78+
Ok(claim)
79+
}
80+
81+
fn check_license(&self, raw: &str) -> Result<(), anyhow::Error> {
82+
let claim = self.parse_jwt_token(raw)?;
83+
84+
let now = Clock::now_since_epoch();
85+
86+
if Some(now) > claim.expires_at {
87+
let expires_at = claim.expires_at.unwrap_or_default();
88+
let unix_timestamp = Duration::from_millis(expires_at.as_millis());
89+
90+
return Err(anyhow::anyhow!(format!(
91+
"License is expired at: {}",
92+
unix_timestamp.display_unix_timestamp()
93+
)));
94+
}
95+
96+
Ok(())
97+
}
98+
99+
pub fn assert_cluster_enabled(&self) -> Result<(), anyhow::Error> {
100+
let token = {
101+
let x = self.license_token.lock().unwrap();
102+
x.clone()
103+
};
104+
105+
let Some(token) = token.as_ref() else {
106+
log::error!(
107+
"No license set in config `databend_enterprise_license`, clustering is disabled",
108+
);
109+
return Err(anyhow::anyhow!(
110+
"No license set in config `databend_enterprise_license`, clustering is disabled"
111+
));
112+
};
113+
114+
if let Err(e) = self.check_license(token) {
115+
log::error!("Check license failed: {}", e);
116+
return Err(e);
117+
}
118+
119+
Ok(())
120+
}
121+
}

src/meta/raft-store/src/config.rs

+8
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,12 @@ pub struct RaftConfig {
131131

132132
/// Max timeout(in milli seconds) when waiting a cluster leader.
133133
pub wait_leader_timeout: u64,
134+
135+
/// License token in string to enable enterprise features(including: `cluster`)
136+
pub databend_enterprise_license: Option<String>,
137+
138+
/// For test only: whether to fake an enterprise license.
139+
pub fake_ee_license: bool,
134140
}
135141

136142
pub fn get_default_raft_advertise_host() -> String {
@@ -172,6 +178,8 @@ impl Default for RaftConfig {
172178
sled_max_cache_size_mb: 10 * 1024,
173179
cluster_name: "foo_cluster".to_string(),
174180
wait_leader_timeout: 70000,
181+
databend_enterprise_license: None,
182+
fake_ee_license: false,
175183
}
176184
}
177185
}

src/meta/service/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ databend-common-meta-stoerr = { workspace = true }
4040
databend-common-meta-types = { workspace = true }
4141
databend-common-metrics = { workspace = true }
4242
databend-common-tracing = { workspace = true }
43+
databend-enterprise-meta = { workspace = true }
4344
deepsize = { workspace = true }
4445
derive_more = { workspace = true }
4546
fastrace = { workspace = true }

src/meta/service/src/configs/outer_v0.rs

+10
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@ pub struct ConfigViaEnv {
303303
pub sled_tree_prefix: String,
304304
pub sled_max_cache_size_mb: u64,
305305
pub cluster_name: String,
306+
pub databend_enterprise_license: Option<String>,
306307
}
307308

308309
impl Default for ConfigViaEnv {
@@ -356,6 +357,7 @@ impl From<Config> for ConfigViaEnv {
356357
sled_tree_prefix: cfg.raft_config.sled_tree_prefix,
357358
sled_max_cache_size_mb: cfg.raft_config.sled_max_cache_size_mb,
358359
cluster_name: cfg.raft_config.cluster_name,
360+
databend_enterprise_license: cfg.raft_config.databend_enterprise_license,
359361
}
360362
}
361363
}
@@ -393,6 +395,7 @@ impl Into<Config> for ConfigViaEnv {
393395
sled_tree_prefix: self.sled_tree_prefix,
394396
sled_max_cache_size_mb: self.sled_max_cache_size_mb,
395397
cluster_name: self.cluster_name,
398+
databend_enterprise_license: self.databend_enterprise_license,
396399
};
397400
let log_config = LogConfig {
398401
file: FileLogConfig {
@@ -564,6 +567,10 @@ pub struct RaftConfig {
564567
/// Max timeout(in milli seconds) when waiting a cluster leader.
565568
#[clap(long, default_value = "180000")]
566569
pub wait_leader_timeout: u64,
570+
571+
/// License token in string to enable enterprise features(including: `cluster`)
572+
#[clap(long, value_name = "VALUE")]
573+
pub databend_enterprise_license: Option<String>,
567574
}
568575

569576
// TODO(rotbl): should not be used.
@@ -602,6 +609,8 @@ impl From<RaftConfig> for InnerRaftConfig {
602609
sled_max_cache_size_mb: x.sled_max_cache_size_mb,
603610
cluster_name: x.cluster_name,
604611
wait_leader_timeout: x.wait_leader_timeout,
612+
databend_enterprise_license: x.databend_enterprise_license,
613+
fake_ee_license: false,
605614
}
606615
}
607616
}
@@ -635,6 +644,7 @@ impl From<InnerRaftConfig> for RaftConfig {
635644
sled_max_cache_size_mb: inner.sled_max_cache_size_mb,
636645
cluster_name: inner.cluster_name,
637646
wait_leader_timeout: inner.wait_leader_timeout,
647+
databend_enterprise_license: inner.databend_enterprise_license,
638648
}
639649
}
640650
}

src/meta/service/src/meta_service/meta_node.rs

+13-1
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ use databend_common_meta_types::Node;
5858
use databend_common_meta_types::NodeId;
5959
use databend_common_meta_types::RaftMetrics;
6060
use databend_common_meta_types::TypeConfig;
61+
use databend_enterprise_meta::MetaServiceEnterpriseGate;
6162
use fastrace::func_name;
6263
use fastrace::prelude::*;
6364
use futures::channel::oneshot;
@@ -122,6 +123,7 @@ impl Opened for MetaNode {
122123
pub struct MetaNodeBuilder {
123124
node_id: Option<NodeId>,
124125
raft_config: Option<Config>,
126+
ee_gate: MetaServiceEnterpriseGate,
125127
sto: Option<RaftStore>,
126128
raft_service_endpoint: Option<Endpoint>,
127129
}
@@ -142,7 +144,9 @@ impl MetaNodeBuilder {
142144
.take()
143145
.ok_or_else(|| MetaStartupError::InvalidConfig(String::from("sto is not set")))?;
144146

145-
let net = NetworkFactory::new(sto.clone());
147+
let ee_gate = self.ee_gate.clone();
148+
149+
let net = NetworkFactory::new(sto.clone(), ee_gate);
146150

147151
let log_store = sto.clone();
148152
let sm_store = sto.clone();
@@ -208,11 +212,19 @@ impl MetaNodeBuilder {
208212

209213
impl MetaNode {
210214
pub fn builder(config: &RaftConfig) -> MetaNodeBuilder {
215+
assert!(config.fake_ee_license);
216+
let ee_gate = if config.fake_ee_license {
217+
MetaServiceEnterpriseGate::new_testing()
218+
} else {
219+
MetaServiceEnterpriseGate::new(config.databend_enterprise_license.clone())
220+
};
221+
211222
let raft_config = MetaNode::new_raft_config(config);
212223

213224
MetaNodeBuilder {
214225
node_id: None,
215226
raft_config: Some(raft_config),
227+
ee_gate,
216228
sto: None,
217229
raft_service_endpoint: None,
218230
}

0 commit comments

Comments
 (0)