Skip to content

Commit 07a6cc8

Browse files
committed
Evict pods based on expiry annotations (#20)
## Description This is the flip side of stackabletech/secret-operator#114, and implements the actual eviction of the pods once they expire. - [X] Time-based eviction (`restarter.stackable.tech/expires-at.{TAG}` annotation) - [ ] Dependency-based eviction (like the current STS eviction)
1 parent 9770beb commit 07a6cc8

File tree

10 files changed

+179
-14
lines changed

10 files changed

+179
-14
lines changed

Cargo.lock

+3-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

docs/modules/ROOT/nav.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@
44
* Concepts
55
** xref:authenticationclass.adoc[]
66
** xref:tls.adoc[]
7-
** xref:reloader.adoc[]
7+
** xref:restarter.adoc[]

docs/modules/ROOT/pages/reloader.adoc

-3
This file was deleted.
+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
= Restarter
2+
3+
The Stackable Commons Operator can automatically restart `Pod` objects based on certain criteria. This can be applied to
4+
either the `Pod` or certain controller objects (such as `StatefulSet`).
5+
6+
== `Pod`
7+
8+
Pods are evicted when any of their restart criteria (listed below) expire, with the expectation that their owning controller
9+
is then responsible for restarting them.
10+
11+
Because they are evicted rather than deleted, this process should respect `PodDisruptionBudget` constraints, allowing users
12+
to ensure that clusters are restarted gracefully.
13+
14+
=== Expiration date
15+
16+
Annotation:: `restarter.stackable.tech/expires-at.\{tag\}`
17+
18+
Pods can be configured to expire at a certain point in time. In this case, the `Pod` should have the annotation
19+
`restarter.stackable.tech/expires-at.\{tag\}` set to a datetime formatted according to RFC 3339 (such as
20+
`"2022-04-21T13:24:15.225774724+00:00"`).
21+
`\{tag\}` should be a deterministic but unique ID identifying the reason for the expiry.
22+
23+
Multiple `expires-at` annotations can be set on the same `Pod`, in which case the *earliest* expiration datetime
24+
takes precedence.
25+
26+
== `StatefulSet`
27+
28+
StatefulSets are rolling-restarted when any of their restart criteria (listed below) expire.
29+
30+
=== Stale configuration
31+
32+
Label:: `restarter.stackable.tech/enabled`
33+
34+
StatefulSets can be configured to restart when any referenced configuration object (`ConfigMap` or `Secret`) changes.
35+
To enable this, set the `restarter.stackable.tech/enabled` label on the `StatefulSet` to `true`.

docs/modules/ROOT/pages/usage.adoc

+2-2
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ Multiple operators use this CRD as a way to express the authentication of the pr
1414
|xref:tls.adoc[]
1515
|Section of a CRD describing how to connect to a TLS enabled system
1616

17-
|xref:reloader.adoc[]
18-
|An operator that watches `StatefulSets` and restarts them if mounted `ConfigMap` or `Secret` changes
17+
|xref:restarter.adoc[]
18+
|An operator that watches `Pod` objects and their controllers and restarts them when required
1919
|===
2020

2121
The following diagram describes the relationship between the CRDs

rust/operator-binary/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ snafu = "0.7"
1818
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "0.16.0" }
1919
strum = { version = "0.24", features = ["derive"] }
2020
tokio = { version = "1.17", features = ["macros", "rt-multi-thread"] }
21+
tracing = "0.1.34"
2122

2223
[build-dependencies]
2324
built = { version = "0.5", features = ["chrono", "git2"] }

rust/operator-binary/src/main.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
mod restart_controller;
22

3+
use futures::pin_mut;
34
use stackable_operator::cli::{Command, ProductOperatorRun};
45

56
use clap::Parser;
@@ -46,7 +47,10 @@ async fn main() -> anyhow::Result<()> {
4647
))
4748
.await?;
4849

49-
restart_controller::start(&client).await?
50+
let sts_restart_controller = restart_controller::statefulset::start(&client);
51+
let pod_restart_controller = restart_controller::pod::start(&client);
52+
pin_mut!(sts_restart_controller, pod_restart_controller);
53+
futures::future::select(sts_restart_controller, pod_restart_controller).await;
5054
}
5155
}
5256

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pub mod pod;
2+
pub mod statefulset;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
use std::{sync::Arc, time::Duration};
2+
3+
use futures::StreamExt;
4+
use snafu::{OptionExt, ResultExt, Snafu};
5+
use stackable_operator::{
6+
client::Client,
7+
k8s_openapi::{
8+
api::core::v1::Pod,
9+
chrono::{self, DateTime, FixedOffset, Utc},
10+
},
11+
kube::{
12+
self,
13+
api::{EvictParams, ListParams},
14+
core::DynamicObject,
15+
runtime::{
16+
controller::{Action, Context},
17+
reflector::ObjectRef,
18+
Controller,
19+
},
20+
},
21+
logging::controller::{report_controller_reconciled, ReconcilerError},
22+
};
23+
use strum::{EnumDiscriminants, IntoStaticStr};
24+
25+
struct Ctx {
26+
client: Client,
27+
}
28+
29+
#[derive(Snafu, Debug, EnumDiscriminants)]
30+
#[strum_discriminants(derive(IntoStaticStr))]
31+
enum Error {
32+
#[snafu(display("Pod has no name"))]
33+
PodHasNoName,
34+
#[snafu(display(
35+
"failed to parse expiry timestamp annotation ({annotation:?}: {value:?}) as RFC 3999"
36+
))]
37+
UnparseableExpiryTimestamp {
38+
source: chrono::ParseError,
39+
annotation: String,
40+
value: String,
41+
},
42+
#[snafu(display("failed to evict Pod"))]
43+
EvictPod { source: kube::Error },
44+
}
45+
46+
impl ReconcilerError for Error {
47+
fn category(&self) -> &'static str {
48+
ErrorDiscriminants::from(self).into()
49+
}
50+
51+
fn secondary_object(&self) -> Option<ObjectRef<DynamicObject>> {
52+
match self {
53+
Error::PodHasNoName => None,
54+
Error::UnparseableExpiryTimestamp {
55+
source: _,
56+
annotation: _,
57+
value: _,
58+
} => None,
59+
Error::EvictPod { source: _ } => None,
60+
}
61+
}
62+
}
63+
64+
pub async fn start(client: &Client) {
65+
let controller = Controller::new(client.get_all_api::<Pod>(), ListParams::default());
66+
controller
67+
.run(
68+
reconcile,
69+
error_policy,
70+
Context::new(Ctx {
71+
client: client.clone(),
72+
}),
73+
)
74+
.for_each(|res| async move {
75+
report_controller_reconciled(client, "pod.restarter.commons.stackable.tech", &res)
76+
})
77+
.await;
78+
}
79+
80+
async fn reconcile(pod: Arc<Pod>, ctx: Context<Ctx>) -> Result<Action, Error> {
81+
if pod.metadata.deletion_timestamp.is_some() {
82+
// Object is already being deleted, no point trying again
83+
return Ok(Action::await_change());
84+
}
85+
86+
let pod_expires_at = pod
87+
.metadata
88+
.annotations
89+
.iter()
90+
.flatten()
91+
.filter(|(k, _)| k.starts_with("restarter.stackable.tech/expires-at."))
92+
.map(|(k, v)| {
93+
DateTime::parse_from_rfc3339(v).context(UnparseableExpiryTimestampSnafu {
94+
annotation: k,
95+
value: v,
96+
})
97+
})
98+
.min_by_key(|res| {
99+
// Prefer propagating errors over successful cases
100+
(res.is_ok(), res.as_ref().ok().cloned())
101+
})
102+
.transpose()?;
103+
104+
let now = DateTime::<FixedOffset>::from(Utc::now());
105+
let time_until_pod_expires = pod_expires_at.map(|expires_at| (expires_at - now).to_std());
106+
match time_until_pod_expires {
107+
Some(Err(_has_already_expired)) => {
108+
let pods = ctx
109+
.get_ref()
110+
.client
111+
.get_api::<Pod>(pod.metadata.namespace.as_deref());
112+
pods.evict(
113+
pod.metadata.name.as_deref().context(PodHasNoNameSnafu)?,
114+
&EvictParams::default(),
115+
)
116+
.await
117+
.context(EvictPodSnafu)?;
118+
Ok(Action::await_change())
119+
}
120+
Some(Ok(time_until_pod_expires)) => Ok(Action::requeue(time_until_pod_expires)),
121+
None => Ok(Action::await_change()),
122+
}
123+
}
124+
125+
fn error_policy(_error: &Error, _ctx: Context<Ctx>) -> Action {
126+
Action::requeue(Duration::from_secs(5))
127+
}

rust/operator-binary/src/restart_controller.rs renamed to rust/operator-binary/src/restart_controller/statefulset.rs

+3-5
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ impl ReconcilerError for Error {
6060
}
6161
}
6262

63-
pub async fn start(client: &Client) -> anyhow::Result<()> {
64-
let kube = kube::Client::try_default().await?;
63+
pub async fn start(client: &Client) {
64+
let kube = client.as_kube_client();
6565
let stses = kube::Api::<StatefulSet>::all(kube.clone());
6666
let cms = kube::Api::<ConfigMap>::all(kube.clone());
6767
let secrets = kube::Api::<Secret>::all(kube.clone());
@@ -114,11 +114,9 @@ pub async fn start(client: &Client) -> anyhow::Result<()> {
114114
),
115115
)
116116
.for_each(|res| async move {
117-
report_controller_reconciled(client, "commons.superset.stackable.tech", &res)
117+
report_controller_reconciled(client, "statefulset.restarter.commons.stackable.tech", &res)
118118
})
119119
.await;
120-
121-
Ok(())
122120
}
123121

124122
fn trigger_all<S, K>(

0 commit comments

Comments
 (0)