Skip to content

feat: Meta-service cluster is gated by Enterprise Edition(Do NOT merge) #16231

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .github/workflows/reuse.linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ jobs:
needs: [ build, check ]
steps:
- uses: actions/checkout@v4
- uses: ./.github/actions/setup_license
with:
runner_provider: ${{ inputs.runner_provider }}
- uses: ./.github/actions/test_metactl
timeout-minutes: 10

Expand All @@ -128,6 +131,9 @@ jobs:
needs: [ build, check ]
steps:
- uses: actions/checkout@v4
- uses: ./.github/actions/setup_license
with:
runner_provider: ${{ inputs.runner_provider }}
- uses: ./.github/actions/test_compat_meta_query
timeout-minutes: 10

Expand All @@ -136,6 +142,9 @@ jobs:
needs: [ build, check ]
steps:
- uses: actions/checkout@v4
- uses: ./.github/actions/setup_license
with:
runner_provider: ${{ inputs.runner_provider }}
- uses: ./.github/actions/test_compat_fuse
timeout-minutes: 20

Expand All @@ -144,6 +153,9 @@ jobs:
needs: [ build, check ]
steps:
- uses: actions/checkout@v4
- uses: ./.github/actions/setup_license
with:
runner_provider: ${{ inputs.runner_provider }}
- uses: ./.github/actions/test_compat_meta_meta
timeout-minutes: 20

Expand All @@ -160,6 +172,9 @@ jobs:
needs: [ build, check ]
steps:
- uses: actions/checkout@v4
- uses: ./.github/actions/setup_license
with:
runner_provider: ${{ inputs.runner_provider }}
- uses: ./.github/actions/test_meta_cluster
timeout-minutes: 10

Expand Down
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions src/meta/ee/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ doctest = false
test = true

[dependencies]
anyhow = { workspace = true }
databend-common-base = { workspace = true }
databend-common-license = { workspace = true }
jwt-simple = { workspace = true }
log = { workspace = true }

[build-dependencies]
databend-common-building = { workspace = true }
Expand Down
117 changes: 117 additions & 0 deletions src/meta/ee/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,120 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;

use databend_common_base::display::display_unix_epoch::DisplayUnixTimeStampExt;
use databend_common_license::license::LicenseInfo;
use jwt_simple::algorithms::ECDSAP256KeyPairLike;
use jwt_simple::algorithms::ECDSAP256PublicKeyLike;
use jwt_simple::algorithms::ES256KeyPair;
use jwt_simple::algorithms::ES256PublicKey;
use jwt_simple::claims::Claims;
use jwt_simple::claims::JWTClaims;
use jwt_simple::prelude::Clock;

#[derive(Clone, Default)]
pub struct MetaServiceEnterpriseGate {
/// License ES256 signed jwt claim token.
license_token: Arc<Mutex<Option<String>>>,

/// The public key to verify the license token.
public_key: String,
}

impl MetaServiceEnterpriseGate {
const LICENSE_PUBLIC_KEY: &'static str = r#"-----BEGIN PUBLIC KEY-----
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why use a hard code public key?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This way only the the license token published by us with our private key will be correctly decoded.
If to let user config public key in config, user will be able to use a self signed token with its private key and let databend-meta decoded it with the public key it provided.

MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEGsKCbhXU7j56VKZ7piDlLXGhud0a
pWjW3wxSdeARerxs/BeoWK7FspDtfLaAT8iJe4YEmR0JpkRQ8foWs0ve3w==
-----END PUBLIC KEY-----"#;

pub fn new(token: Option<String>) -> Self {
Self {
license_token: Arc::new(Mutex::new(token)),
public_key: Self::LICENSE_PUBLIC_KEY.to_string(),
}
}

/// Create with a temp license for testing.
pub fn new_testing() -> Self {
let lic = LicenseInfo {
r#type: Some("trial".to_string()),
org: Some("databend".to_string()),
tenants: Some(vec!["test".to_string()]),
features: None,
};

let key_pair = ES256KeyPair::generate();
let claims = Claims::with_custom_claims(lic, jwt_simple::prelude::Duration::from_hours(2));
let token = key_pair.sign(claims).unwrap();

let public_key = key_pair.public_key().to_pem().unwrap();

Self {
license_token: Arc::new(Mutex::new(Some(token))),
public_key,
}
}

/// Parse the JWT token and restore the claims.
pub fn parse_jwt_token(&self, raw: &str) -> Result<JWTClaims<LicenseInfo>, anyhow::Error> {
let public_key = ES256PublicKey::from_pem(&self.public_key)?;

let claim = public_key.verify_token::<LicenseInfo>(raw, None)?;

Ok(claim)
}

fn check_license(&self, raw: &str) -> Result<(), anyhow::Error> {
let claim = self.parse_jwt_token(raw)?;

let now = Clock::now_since_epoch();

if Some(now) > claim.expires_at {
let expires_at = claim.expires_at.unwrap_or_default();
let unix_timestamp = Duration::from_millis(expires_at.as_millis());

return Err(anyhow::anyhow!(format!(
"License is expired at: {}",
unix_timestamp.display_unix_timestamp()
)));
}

Ok(())
}

pub fn assert_cluster_enabled(&self) -> Result<(), anyhow::Error> {
let token = {
let x = self.license_token.lock().unwrap();
x.clone()
};

let Some(token) = token.as_ref() else {
log::error!(
"No license set in config `databend_enterprise_license`, clustering is disabled",
);
return Err(anyhow::anyhow!(
"No license set in config `databend_enterprise_license`, clustering is disabled"
));
};

if let Err(e) = self.check_license(token) {
log::error!("Check license failed: {}", e);
return Err(e);
}

Ok(())
}

pub fn update_license(&self, license: String) -> Result<(), anyhow::Error> {
self.check_license(&license)?;

let mut x = self.license_token.lock().unwrap();
*x = Some(license);

Ok(())
}
}
8 changes: 8 additions & 0 deletions src/meta/raft-store/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ pub struct RaftConfig {

/// Max timeout(in milli seconds) when waiting a cluster leader.
pub wait_leader_timeout: u64,

/// License token in string to enable enterprise features(including: `cluster`)
pub databend_enterprise_license: Option<String>,

/// For test only: whether to fake an enterprise license.
pub fake_ee_license: bool,
}

pub fn get_default_raft_advertise_host() -> String {
Expand Down Expand Up @@ -172,6 +178,8 @@ impl Default for RaftConfig {
sled_max_cache_size_mb: 10 * 1024,
cluster_name: "foo_cluster".to_string(),
wait_leader_timeout: 70000,
databend_enterprise_license: None,
fake_ee_license: false,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/meta/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ databend-common-arrow = { workspace = true }
databend-common-base = { workspace = true }
databend-common-grpc = { workspace = true }
databend-common-http = { workspace = true }
databend-common-license = { workspace = true }
databend-common-meta-api = { workspace = true }
databend-common-meta-client = { workspace = true }
databend-common-meta-kvapi = { workspace = true }
Expand All @@ -40,6 +41,7 @@ databend-common-meta-stoerr = { workspace = true }
databend-common-meta-types = { workspace = true }
databend-common-metrics = { workspace = true }
databend-common-tracing = { workspace = true }
databend-enterprise-meta = { workspace = true }
deepsize = { workspace = true }
derive_more = { workspace = true }
fastrace = { workspace = true }
Expand Down
51 changes: 51 additions & 0 deletions src/meta/service/src/api/http/v1/ctrl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::sync::Arc;

use databend_common_license::display_jwt_claims::DisplayJWTClaimsExt;
use databend_common_meta_sled_store::openraft::async_runtime::watch::WatchReceiver;
use databend_common_meta_types::NodeId;
use http::StatusCode;
Expand Down Expand Up @@ -127,3 +129,52 @@ pub async fn trigger_transfer_leader(
voter_ids,
}))
}

#[derive(serde::Serialize, serde::Deserialize, Debug)]
pub struct UpdateLicenseQuery {
pub(crate) license: String,
}

#[derive(serde::Serialize, serde::Deserialize, Debug)]
pub struct UpdateLicenseResponse {
pub from: NodeId,
pub to: NodeId,
pub voter_ids: Vec<NodeId>,
}

/// Update the `databend_enterprise_license` of this meta node.
#[poem::handler]
pub async fn update_license(
meta_node: Data<&Arc<MetaNode>>,
query: Option<Query<UpdateLicenseQuery>>,
) -> poem::Result<impl IntoResponse> {
let Some(query) = query else {
return Err(poem::Error::from_string(
"Invalid license",
StatusCode::BAD_REQUEST,
));
};

let metrics = meta_node.raft.metrics().borrow_watched().clone();
let id = metrics.id;

let saved = meta_node
.ee_gate
.parse_jwt_token(query.license.as_str())
.map_err(|e| {
poem::Error::from_string(format!("Invalid license: {}", e), StatusCode::BAD_REQUEST)
})?;

meta_node
.ee_gate
.update_license(query.license.clone())
.map_err(|e| poem::Error::from_string(e.to_string(), StatusCode::BAD_REQUEST))?;

let claim_str = saved.display_jwt_claims().to_string();
info!("id={} Updated license: {}", id, claim_str);

let mut resp = BTreeMap::new();
resp.insert("Success", claim_str);

Ok(Json(resp))
}
4 changes: 4 additions & 0 deletions src/meta/service/src/api/http_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ impl HttpService {
"/v1/ctrl/trigger_transfer_leader",
get(super::http::v1::ctrl::trigger_transfer_leader),
)
.at(
"/v1/ctrl/update_license",
get(super::http::v1::ctrl::update_license),
)
.at(
"/v1/cluster/nodes",
get(super::http::v1::cluster_state::nodes_handler),
Expand Down
10 changes: 10 additions & 0 deletions src/meta/service/src/configs/outer_v0.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ pub struct ConfigViaEnv {
pub sled_tree_prefix: String,
pub sled_max_cache_size_mb: u64,
pub cluster_name: String,
pub databend_enterprise_license: Option<String>,
}

impl Default for ConfigViaEnv {
Expand Down Expand Up @@ -356,6 +357,7 @@ impl From<Config> for ConfigViaEnv {
sled_tree_prefix: cfg.raft_config.sled_tree_prefix,
sled_max_cache_size_mb: cfg.raft_config.sled_max_cache_size_mb,
cluster_name: cfg.raft_config.cluster_name,
databend_enterprise_license: cfg.raft_config.databend_enterprise_license,
}
}
}
Expand Down Expand Up @@ -393,6 +395,7 @@ impl Into<Config> for ConfigViaEnv {
sled_tree_prefix: self.sled_tree_prefix,
sled_max_cache_size_mb: self.sled_max_cache_size_mb,
cluster_name: self.cluster_name,
databend_enterprise_license: self.databend_enterprise_license,
};
let log_config = LogConfig {
file: FileLogConfig {
Expand Down Expand Up @@ -564,6 +567,10 @@ pub struct RaftConfig {
/// Max timeout(in milli seconds) when waiting a cluster leader.
#[clap(long, default_value = "180000")]
pub wait_leader_timeout: u64,

/// License token in string to enable enterprise features(including: `cluster`)
#[clap(long, value_name = "VALUE")]
pub databend_enterprise_license: Option<String>,
}

// TODO(rotbl): should not be used.
Expand Down Expand Up @@ -602,6 +609,8 @@ impl From<RaftConfig> for InnerRaftConfig {
sled_max_cache_size_mb: x.sled_max_cache_size_mb,
cluster_name: x.cluster_name,
wait_leader_timeout: x.wait_leader_timeout,
databend_enterprise_license: x.databend_enterprise_license,
fake_ee_license: false,
}
}
}
Expand Down Expand Up @@ -635,6 +644,7 @@ impl From<InnerRaftConfig> for RaftConfig {
sled_max_cache_size_mb: inner.sled_max_cache_size_mb,
cluster_name: inner.cluster_name,
wait_leader_timeout: inner.wait_leader_timeout,
databend_enterprise_license: inner.databend_enterprise_license,
}
}
}
Expand Down
Loading
Loading