Skip to content

Commit 95a4189

Browse files
labrenbeNickLarsenNZstackable-bot
authored
feat: Inject vector aggregator address as env into vector config (#645)
* watch referenced configmaps and inject vector aggregator discovery cm as env * wip: use local stackable-operator version * add changelog entry * bump operator-rs * remove local patch for operator-rs * chore: Update nix files * chore: Generated commit to update templated files since the last template run up to stackabletech/operator-templating@8ad0568 (#646) Reference-to: stackabletech/operator-templating@8ad0568 (Update nixpkgs) * fix: Use `json` file extension for log files (#647) * fix: Use `json` file extension for log files * chore: Update changelog * fix formatting in changelog * fix typo in changelog * chore: Rename store variable * chore: Use borrows * chore: Formatting * chore: Formatting * noop --------- Co-authored-by: Nick Larsen <[email protected]> Co-authored-by: Stacky McStackface <[email protected]> Co-authored-by: Nick <[email protected]>
1 parent dac79c9 commit 95a4189

File tree

8 files changed

+122
-125
lines changed

8 files changed

+122
-125
lines changed

CHANGELOG.md

+8
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,20 @@
88
- BREAKING: The file log directory was set by `HBASE_OPERATOR_LOG_DIRECTORY`, and is now set by `ROLLING_LOGS`
99
(or via `--rolling-logs <DIRECTORY>`).
1010
- Replace stackable-operator `print_startup_string` with `tracing::info!` with fields.
11+
- BREAKING: Inject the vector aggregator address into the vector config using the env var `VECTOR_AGGREGATOR_ADDRESS` instead
12+
of having the operator write it to the vector config ([#645]).
13+
14+
### Fixed
15+
16+
- Use `json` file extension for log files ([#647]).
17+
- Fix a bug where changes to ConfigMaps that are referenced in the HbaseCluster spec didn't trigger a reconciliation ([#645]).
1118

1219
### Fixed
1320

1421
- Use `json` file extension for log files ([#647]).
1522

1623
[#640]: https://github.com/stackabletech/hbase-operator/pull/640
24+
[#645]: https://github.com/stackabletech/hbase-operator/pull/645
1725
[#647]: https://github.com/stackabletech/hbase-operator/pull/647
1826

1927
## [25.3.0] - 2025-03-21

Cargo.lock

+4-4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.nix

+7-7
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ repository = "https://github.com/stackabletech/hbase-operator"
1111

1212
[workspace.dependencies]
1313
product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" }
14-
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.89.1" }
14+
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.90.0" }
1515
stackable-telemetry = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-telemetry-0.4.0" }
1616
stackable-versioned = { git = "https://github.com/stackabletech/operator-rs.git", features = ["k8s"], tag = "stackable-versioned-0.7.1" }
1717

crate-hashes.json

+3-3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/operator-binary/src/hbase_controller.rs

+24-27
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ use crate::{
8282
operations::{graceful_shutdown::add_graceful_shutdown_config, pdb::add_pdbs},
8383
product_logging::{
8484
CONTAINERDEBUG_LOG_DIRECTORY, STACKABLE_LOG_DIR, extend_role_group_config_map,
85-
resolve_vector_aggregator_address,
8685
},
8786
security::{self, opa::HbaseOpaConfig},
8887
zookeeper::{self, ZookeeperConnectionInformation},
@@ -235,10 +234,8 @@ pub enum Error {
235234
#[snafu(display("failed to resolve and merge config for role and role group"))]
236235
FailedToResolveConfig { source: crate::crd::Error },
237236

238-
#[snafu(display("failed to resolve the Vector aggregator address"))]
239-
ResolveVectorAggregatorAddress {
240-
source: crate::product_logging::Error,
241-
},
237+
#[snafu(display("vector agent is enabled but vector aggregator ConfigMap is missing"))]
238+
VectorAggregatorConfigMapMissing,
242239

243240
#[snafu(display("failed to add the logging configuration to the ConfigMap [{cm_name}]"))]
244241
InvalidLoggingConfig {
@@ -350,10 +347,6 @@ pub async fn reconcile_hbase(
350347
.await
351348
.context(RetrieveZookeeperConnectionInformationSnafu)?;
352349

353-
let vector_aggregator_address = resolve_vector_aggregator_address(hbase, client)
354-
.await
355-
.context(ResolveVectorAggregatorAddressSnafu)?;
356-
357350
let roles = hbase.build_role_properties().context(RolePropertiesSnafu)?;
358351

359352
let validated_config = validate_all_roles_and_groups_config(
@@ -448,7 +441,6 @@ pub async fn reconcile_hbase(
448441
&merged_config,
449442
&resolved_product_image,
450443
hbase_opa_config.as_ref(),
451-
vector_aggregator_address.as_deref(),
452444
)?;
453445
let rg_statefulset = build_rolegroup_statefulset(
454446
hbase,
@@ -576,7 +568,6 @@ fn build_rolegroup_config_map(
576568
merged_config: &AnyServiceConfig,
577569
resolved_product_image: &ResolvedProductImage,
578570
hbase_opa_config: Option<&HbaseOpaConfig>,
579-
vector_aggregator_address: Option<&str>,
580571
) -> Result<ConfigMap, Error> {
581572
let mut hbase_site_xml = String::new();
582573
let mut hbase_env_sh = String::new();
@@ -703,7 +694,6 @@ fn build_rolegroup_config_map(
703694

704695
extend_role_group_config_map(
705696
rolegroup,
706-
vector_aggregator_address,
707697
merged_config.logging(),
708698
&mut builder,
709699
&resolved_product_image.product_version,
@@ -1003,21 +993,28 @@ fn build_rolegroup_statefulset(
1003993

1004994
// Vector sidecar shall be the last container in the list
1005995
if merged_config.logging().enable_vector_agent {
1006-
pod_builder.add_container(
1007-
product_logging::framework::vector_container(
1008-
resolved_product_image,
1009-
"hbase-config",
1010-
"log",
1011-
merged_config.logging().containers.get(&Container::Vector),
1012-
ResourceRequirementsBuilder::new()
1013-
.with_cpu_request("250m")
1014-
.with_cpu_limit("500m")
1015-
.with_memory_request("128Mi")
1016-
.with_memory_limit("128Mi")
1017-
.build(),
1018-
)
1019-
.context(ConfigureLoggingSnafu)?,
1020-
);
996+
if let Some(vector_aggregator_config_map_name) =
997+
&hbase.spec.cluster_config.vector_aggregator_config_map_name
998+
{
999+
pod_builder.add_container(
1000+
product_logging::framework::vector_container(
1001+
resolved_product_image,
1002+
"hbase-config",
1003+
"log",
1004+
merged_config.logging().containers.get(&Container::Vector),
1005+
ResourceRequirementsBuilder::new()
1006+
.with_cpu_request("250m")
1007+
.with_cpu_limit("500m")
1008+
.with_memory_request("128Mi")
1009+
.with_memory_limit("128Mi")
1010+
.build(),
1011+
vector_aggregator_config_map_name,
1012+
)
1013+
.context(ConfigureLoggingSnafu)?,
1014+
);
1015+
} else {
1016+
VectorAggregatorConfigMapMissingSnafu.fail()?;
1017+
}
10211018
}
10221019

10231020
let mut pod_template = pod_builder.build_template();

rust/operator-binary/src/main.rs

+73-37
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,17 @@ use hbase_controller::FULL_HBASE_CONTROLLER_NAME;
66
use stackable_operator::{
77
YamlSchema,
88
cli::{Command, ProductOperatorRun, RollingPeriod},
9-
k8s_openapi::api::{apps::v1::StatefulSet, core::v1::Service},
9+
k8s_openapi::api::{
10+
apps::v1::StatefulSet,
11+
core::v1::{ConfigMap, Service},
12+
},
1013
kube::{
14+
ResourceExt,
1115
core::DeserializeGuard,
1216
runtime::{
1317
Controller,
1418
events::{Recorder, Reporter},
19+
reflector::ObjectRef,
1520
watcher,
1621
},
1722
},
@@ -128,46 +133,77 @@ async fn main() -> anyhow::Result<()> {
128133
instance: None,
129134
}));
130135

131-
Controller::new(
136+
let hbase_controller = Controller::new(
132137
watch_namespace.get_api::<DeserializeGuard<v1alpha1::HbaseCluster>>(&client),
133138
watcher::Config::default(),
134-
)
135-
.owns(
136-
watch_namespace.get_api::<Service>(&client),
137-
watcher::Config::default(),
138-
)
139-
.owns(
140-
watch_namespace.get_api::<StatefulSet>(&client),
141-
watcher::Config::default(),
142-
)
143-
.shutdown_on_signal()
144-
.run(
145-
hbase_controller::reconcile_hbase,
146-
hbase_controller::error_policy,
147-
Arc::new(hbase_controller::Ctx {
148-
client: client.clone(),
149-
product_config,
150-
}),
151-
)
152-
.for_each_concurrent(
153-
16, // concurrency limit
154-
|result| {
155-
// The event_recorder needs to be shared across all invocations, so that
156-
// events are correctly aggregated
157-
let event_recorder = event_recorder.clone();
158-
async move {
159-
report_controller_reconciled(
160-
&event_recorder,
161-
FULL_HBASE_CONTROLLER_NAME,
162-
&result,
163-
)
164-
.await;
165-
}
166-
},
167-
)
168-
.await;
139+
);
140+
let config_map_store = hbase_controller.store();
141+
hbase_controller
142+
.owns(
143+
watch_namespace.get_api::<Service>(&client),
144+
watcher::Config::default(),
145+
)
146+
.owns(
147+
watch_namespace.get_api::<StatefulSet>(&client),
148+
watcher::Config::default(),
149+
)
150+
.shutdown_on_signal()
151+
.watches(
152+
watch_namespace.get_api::<DeserializeGuard<ConfigMap>>(&client),
153+
watcher::Config::default(),
154+
move |config_map| {
155+
config_map_store
156+
.state()
157+
.into_iter()
158+
.filter(move |hbase| references_config_map(hbase, &config_map))
159+
.map(|hbase| ObjectRef::from_obj(&*hbase))
160+
},
161+
)
162+
.run(
163+
hbase_controller::reconcile_hbase,
164+
hbase_controller::error_policy,
165+
Arc::new(hbase_controller::Ctx {
166+
client: client.clone(),
167+
product_config,
168+
}),
169+
)
170+
.for_each_concurrent(
171+
16, // concurrency limit
172+
|result| {
173+
// The event_recorder needs to be shared across all invocations, so that
174+
// events are correctly aggregated
175+
let event_recorder = event_recorder.clone();
176+
async move {
177+
report_controller_reconciled(
178+
&event_recorder,
179+
FULL_HBASE_CONTROLLER_NAME,
180+
&result,
181+
)
182+
.await;
183+
}
184+
},
185+
)
186+
.await;
169187
}
170188
}
171189

172190
Ok(())
173191
}
192+
193+
fn references_config_map(
194+
hbase: &DeserializeGuard<v1alpha1::HbaseCluster>,
195+
config_map: &DeserializeGuard<ConfigMap>,
196+
) -> bool {
197+
let Ok(hbase) = &hbase.0 else {
198+
return false;
199+
};
200+
201+
hbase.spec.cluster_config.zookeeper_config_map_name == config_map.name_any()
202+
|| hbase.spec.cluster_config.hdfs_config_map_name == config_map.name_any()
203+
|| match &hbase.spec.cluster_config.authorization {
204+
Some(hbase_authorization) => {
205+
hbase_authorization.opa.config_map_name == config_map.name_any()
206+
}
207+
None => false,
208+
}
209+
}

0 commit comments

Comments
 (0)