Skip to content

Commit b95f371

Browse files
committed
add prometheusK8s.enforcedBodySizeLimit to CMO ConfigMap, limiting the
bodysize when scraping metric. Empty value or 0 means bodysize limit. "automatic" for automatically deduced bodysize limit.
1 parent d5a1720 commit b95f371

File tree

9 files changed

+323
-2
lines changed

9 files changed

+323
-2
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
- [#1630](https://github.com/openshift/cluster-monitoring-operator/pull/1630) Expose retention size settings for UWM Prometheus
1616
- [#1640](https://github.com/openshift/cluster-monitoring-operator/pull/1640) Deploy standalone admission webhook for HA.
1717
- [#1651](https://github.com/openshift/cluster-monitoring-operator/pull/1651) Allow retention to be configurable for Thanos-Ruler in UWM
18+
- [#1467](https://github.com/openshift/cluster-monitoring-operator/pull/1467) Add bodysize limit for metric scraping
1819

1920
## 4.10
2021

pkg/client/client.go

+18
Original file line numberDiff line numberDiff line change
@@ -1524,6 +1524,24 @@ func (c *Client) DeleteRole(ctx context.Context, role *rbacv1.Role) error {
15241524
return err
15251525
}
15261526

1527+
func (c *Client) PodCapacity(ctx context.Context) (int, error) {
1528+
nodes, err := c.kclient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
1529+
if err != nil {
1530+
return 0, err
1531+
}
1532+
var podCapacityTotal int64
1533+
for _, node := range nodes.Items {
1534+
podsCount, succeeded := node.Status.Capacity.Pods().AsInt64()
1535+
if !succeeded {
1536+
klog.Warningf("Cannot get pod capacity from node: %s. Error: %v", node.Name, err)
1537+
continue
1538+
}
1539+
podCapacityTotal += podsCount
1540+
}
1541+
1542+
return int(podCapacityTotal), nil
1543+
}
1544+
15271545
// mergeMetadata merges labels and annotations from `existing` map into `required` one where `required` has precedence
15281546
// over `existing` keys and values. Additionally function performs filtering of labels and annotations from `exiting` map
15291547
// where keys starting from string defined in `metadataPrefix` are deleted. This prevents issues with preserving stale

pkg/client/client_test.go

+48
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
appsv1 "k8s.io/api/apps/v1"
2626
v1 "k8s.io/api/core/v1"
2727
rbacv1 "k8s.io/api/rbac/v1"
28+
"k8s.io/apimachinery/pkg/api/resource"
2829
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2930

3031
monv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
@@ -1836,3 +1837,50 @@ func TestCreateOrUpdateValidatingWebhookConfiguration(t *testing.T) {
18361837
})
18371838
}
18381839
}
1840+
1841+
func TestPodCapacity(t *testing.T) {
1842+
ctx := context.Background()
1843+
node1 := v1.Node{
1844+
ObjectMeta: metav1.ObjectMeta{
1845+
Name: "node1",
1846+
},
1847+
Status: v1.NodeStatus{
1848+
Capacity: v1.ResourceList{
1849+
v1.ResourcePods: resource.MustParse("100"),
1850+
},
1851+
},
1852+
}
1853+
node2 := v1.Node{
1854+
ObjectMeta: metav1.ObjectMeta{
1855+
Name: "node2",
1856+
},
1857+
Status: v1.NodeStatus{
1858+
Capacity: v1.ResourceList{
1859+
v1.ResourcePods: resource.MustParse("50"),
1860+
},
1861+
},
1862+
}
1863+
nodeList := v1.NodeList{
1864+
Items: []v1.Node{
1865+
node1,
1866+
node2,
1867+
},
1868+
}
1869+
t.Run("sum 2 nodes pod capacity", func(st *testing.T) {
1870+
1871+
c := Client{
1872+
kclient: fake.NewSimpleClientset(nodeList.DeepCopy()),
1873+
}
1874+
1875+
podCapacity, err := c.PodCapacity(ctx)
1876+
1877+
if err != nil {
1878+
t.Fatal(err)
1879+
}
1880+
1881+
if podCapacity != 150 {
1882+
t.Fatalf("expected pods capacity 150, got %d", podCapacity)
1883+
}
1884+
})
1885+
1886+
}

pkg/manifests/config.go

+56-1
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,33 @@ package manifests
1616

1717
import (
1818
"bytes"
19+
"context"
1920
"encoding/json"
2021
"fmt"
2122
"io"
23+
"math"
2224

2325
configv1 "github.com/openshift/api/config/v1"
2426
monv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
27+
poperator "github.com/prometheus-operator/prometheus-operator/pkg/operator"
2528
v1 "k8s.io/api/core/v1"
2629
k8syaml "k8s.io/apimachinery/pkg/util/yaml"
2730
auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
31+
"k8s.io/klog/v2"
2832
)
2933

3034
const (
3135
DefaultRetentionValue = "15d"
36+
37+
// Limit the body size from scrape queries
38+
// Assumptions: one node has in average 110 pods, each pod exposes 400 metrics, each metric is expressed by on average 250 bytes.
39+
// 1.5x the size for a safe margin,
40+
// minimal HA requires 3 nodes. it rounds to 47.2 MB (49,500,000 Bytes).
41+
minimalSizeLimit = 3 * 1.5 * 110 * 400 * 250
42+
43+
// A value of Prometheusk8s.enforceBodySizeLimit,
44+
// meaning the limit will be automatically calculated based on cluster capacity.
45+
automaticBodySizeLimit = "automatic"
3246
)
3347

3448
type Config struct {
@@ -186,6 +200,12 @@ type PrometheusK8sConfig struct {
186200
TelemetryMatches []string `json:"-"`
187201
AlertmanagerConfigs []AdditionalAlertmanagerConfig `json:"additionalAlertmanagerConfigs"`
188202
QueryLogFile string `json:"queryLogFile"`
203+
/* EnforcedBodySizeLimit accept 3 kind of values:
204+
* 1. empty value: no limit
205+
* 2. a value in Prometheus size format, e.g. "64MB"
206+
* 3. string "automatic", which means the limit will be automatically calculated based on cluster capacity.
207+
*/
208+
EnforcedBodySizeLimit string `json:"enforcedBodySizeLimit,omitempty"`
189209
}
190210

191211
type AdditionalAlertmanagerConfig struct {
@@ -329,7 +349,6 @@ func NewConfig(content io.Reader) (*Config, error) {
329349
res := &c
330350
res.applyDefaults()
331351
c.UserWorkloadConfiguration = NewDefaultUserWorkloadMonitoringConfig()
332-
333352
return res, nil
334353
}
335354

@@ -477,6 +496,42 @@ func (c *Config) NoProxy() string {
477496
return c.ClusterMonitoringConfiguration.HTTPConfig.NoProxy
478497
}
479498

499+
// PodCapacityReader returns the maximum number of pods that can be scheduled in a cluster.
500+
type PodCapacityReader interface {
501+
PodCapacity(context.Context) (int, error)
502+
}
503+
504+
func (c *Config) LoadEnforcedBodySizeLimit(pcr PodCapacityReader, ctx context.Context) error {
505+
if c.ClusterMonitoringConfiguration.PrometheusK8sConfig.EnforcedBodySizeLimit == "" {
506+
return nil
507+
}
508+
509+
if c.ClusterMonitoringConfiguration.PrometheusK8sConfig.EnforcedBodySizeLimit == automaticBodySizeLimit {
510+
podCapacity, err := pcr.PodCapacity(ctx)
511+
if err != nil {
512+
return fmt.Errorf("error fetching pod capacity: %v", err)
513+
}
514+
c.ClusterMonitoringConfiguration.PrometheusK8sConfig.EnforcedBodySizeLimit = calculateBodySizeLimit(podCapacity)
515+
return nil
516+
}
517+
518+
return poperator.ValidateSizeField(c.ClusterMonitoringConfiguration.PrometheusK8sConfig.EnforcedBodySizeLimit)
519+
520+
}
521+
522+
func calculateBodySizeLimit(podCapacity int) string {
523+
const samplesPerPod = 400 // 400 samples per pod
524+
const sizePerSample = 200 // 200 Bytes
525+
526+
bodySize := podCapacity * samplesPerPod * sizePerSample
527+
if bodySize < minimalSizeLimit {
528+
klog.Infof("Calculated scrape body size limit %v is too small, using default value %v instead", bodySize, minimalSizeLimit)
529+
bodySize = minimalSizeLimit
530+
}
531+
532+
return fmt.Sprintf("%dMB", int(math.Ceil(float64(bodySize)/(1024*1024))))
533+
}
534+
480535
func NewConfigFromString(content string) (*Config, error) {
481536
if content == "" {
482537
return NewDefaultConfig(), nil

pkg/manifests/config_test.go

+100-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ package manifests
1616

1717
import (
1818
"bytes"
19+
"context"
20+
"errors"
1921
"io/ioutil"
2022
"os"
2123
"testing"
@@ -192,7 +194,7 @@ func TestHttpProxyConfig(t *testing.T) {
192194
conf := `http:
193195
httpProxy: http://test.com
194196
httpsProxy: https://test.com
195-
noProxy: https://example.com
197+
noProxy: https://example.com
196198
`
197199

198200
c, err := NewConfig(bytes.NewBufferString(conf))
@@ -234,3 +236,100 @@ func TestHttpProxyConfig(t *testing.T) {
234236
}
235237
}
236238
}
239+
240+
type fakePodCapacity struct {
241+
capacity int
242+
err error
243+
}
244+
245+
func (fpc *fakePodCapacity) PodCapacity(context.Context) (int, error) {
246+
return fpc.capacity, fpc.err
247+
}
248+
249+
func TestLoadEnforcedBodySizeLimit(t *testing.T) {
250+
251+
mc_10 := fakePodCapacity{capacity: 10, err: nil}
252+
mc_1000 := fakePodCapacity{capacity: 1000, err: nil}
253+
mc_err := fakePodCapacity{capacity: 1000, err: errors.New("error")}
254+
for _, tt := range []struct {
255+
name string
256+
config string
257+
expectBodySizeLimit string
258+
expectError bool
259+
pcr PodCapacityReader
260+
}{
261+
{
262+
name: "empty config",
263+
config: "",
264+
expectBodySizeLimit: "",
265+
expectError: false,
266+
pcr: &mc_10,
267+
},
268+
{
269+
name: "disable body size limit",
270+
config: `{"prometheusK8s": {"enforcedBodySizeLimit": "0"}}`,
271+
expectBodySizeLimit: "0",
272+
expectError: false,
273+
pcr: &mc_10,
274+
},
275+
{
276+
name: "normal size format",
277+
config: `{"prometheusK8s": {"enforcedBodySizeLimit": "10KB"}}`,
278+
expectBodySizeLimit: "10KB",
279+
expectError: false,
280+
pcr: &mc_10,
281+
},
282+
{
283+
name: "invalid size format",
284+
config: `{"prometheusK8s": {"enforcedBodySizeLimit": "10EUR"}}`,
285+
expectBodySizeLimit: "",
286+
expectError: true,
287+
pcr: &mc_10,
288+
},
289+
{
290+
name: "automatic deduced limit: error when getting pods capacity",
291+
config: `{"prometheusK8s": {"enforcedBodySizeLimit": "automatic"}}`,
292+
expectBodySizeLimit: "",
293+
expectError: true,
294+
pcr: &mc_err,
295+
},
296+
{
297+
name: "automatically deduced limit: minimal 48MB",
298+
config: `{"prometheusK8s": {"enforcedBodySizeLimit": "automatic"}}`,
299+
expectBodySizeLimit: "48MB",
300+
expectError: false,
301+
pcr: &mc_10,
302+
},
303+
{
304+
name: "automatically deduced limit: larger than minimal 16MB",
305+
config: `{"prometheusK8s": {"enforcedBodySizeLimit": "automatic"}}`,
306+
expectBodySizeLimit: "77MB",
307+
expectError: false,
308+
pcr: &mc_1000,
309+
},
310+
} {
311+
t.Run(tt.name, func(t *testing.T) {
312+
c, err := NewConfigFromString(tt.config)
313+
if err != nil {
314+
t.Fatalf("config parsing error")
315+
}
316+
317+
err = c.LoadEnforcedBodySizeLimit(tt.pcr, context.Background())
318+
if tt.expectError {
319+
if err == nil {
320+
t.Fatalf("expected error, got nil")
321+
}
322+
return
323+
}
324+
if err != nil {
325+
t.Fatalf("expected no error, got error %v", err)
326+
}
327+
328+
if c.ClusterMonitoringConfiguration.PrometheusK8sConfig.EnforcedBodySizeLimit != tt.expectBodySizeLimit {
329+
t.Fatalf("incorrect EnforcedBodySizeLimit is set: got %s, expected %s",
330+
c.ClusterMonitoringConfiguration.PrometheusK8sConfig.EnforcedBodySizeLimit,
331+
tt.expectBodySizeLimit)
332+
}
333+
})
334+
}
335+
}

pkg/manifests/manifests.go

+4
Original file line numberDiff line numberDiff line change
@@ -1674,6 +1674,10 @@ func (f *Factory) PrometheusK8s(grpcTLS *v1.Secret, trustedCABundleCM *v1.Config
16741674
p.Spec.Secrets = append(p.Spec.Secrets, getAdditionalAlertmanagerSecrets(f.config.ClusterMonitoringConfiguration.PrometheusK8sConfig.AlertmanagerConfigs)...)
16751675
}
16761676

1677+
if f.config.ClusterMonitoringConfiguration.PrometheusK8sConfig.EnforcedBodySizeLimit != "" {
1678+
p.Spec.EnforcedBodySizeLimit = f.config.ClusterMonitoringConfiguration.PrometheusK8sConfig.EnforcedBodySizeLimit
1679+
}
1680+
16771681
return p, nil
16781682
}
16791683

pkg/manifests/manifests_test.go

+43
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package manifests
1616

1717
import (
18+
"context"
1819
"errors"
1920
"fmt"
2021
"net/url"
@@ -1533,8 +1534,13 @@ ingress:
15331534
t.Fatal("Prometheus image is not configured correctly")
15341535
}
15351536

1537+
if p.Spec.EnforcedBodySizeLimit != "" {
1538+
t.Fatal("EnforcedBodySizeLimit is not set to empty by default")
1539+
}
1540+
15361541
kubeRbacProxyTLSCipherSuitesArg := ""
15371542
kubeRbacProxyMinTLSVersionArg := ""
1543+
15381544
for _, container := range p.Spec.Containers {
15391545
switch container.Name {
15401546
case "prometheus-proxy":
@@ -1809,6 +1815,43 @@ func TestPrometheusRetentionConfigs(t *testing.T) {
18091815
}
18101816
}
18111817

1818+
func TestPrometheusK8sConfigurationBodySizeLimit(t *testing.T) {
1819+
pcr := &fakePodCapacity{
1820+
capacity: 1000,
1821+
err: nil,
1822+
}
1823+
ctx := context.Background()
1824+
1825+
c, err := NewConfigFromString(`
1826+
prometheusK8s:
1827+
enforcedBodySizeLimit: "10MB"
1828+
`)
1829+
1830+
if err != nil {
1831+
t.Fatal(err)
1832+
}
1833+
1834+
err = c.LoadEnforcedBodySizeLimit(pcr, ctx)
1835+
1836+
if err != nil {
1837+
t.Fatal(err)
1838+
}
1839+
1840+
f := NewFactory("openshift-monitoring", "openshift-user-workload-monitoring", c, defaultInfrastructureReader(), &fakeProxyReader{}, NewAssets(assetsPath), &APIServerConfig{}, nil)
1841+
p, err := f.PrometheusK8s(
1842+
&v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
1843+
&v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
1844+
)
1845+
if err != nil {
1846+
t.Fatal(err)
1847+
}
1848+
1849+
// the body size limit value is not set at configuration parsing time.
1850+
if p.Spec.EnforcedBodySizeLimit != "10MB" {
1851+
t.Fatalf("EnforcedBodySizeLimit is not configured correctly, expected 10MB but got %v", p.Spec.EnforcedBodySizeLimit)
1852+
}
1853+
1854+
}
18121855
func TestPrometheusK8sAdditionalAlertManagerConfigsSecret(t *testing.T) {
18131856
testCases := []struct {
18141857
name string

pkg/operator/operator.go

+6
Original file line numberDiff line numberDiff line change
@@ -829,6 +829,12 @@ func (o *Operator) Config(ctx context.Context, key string) (*manifests.Config, e
829829
}
830830
o.userWorkloadEnabled = *c.ClusterMonitoringConfiguration.UserWorkloadEnabled
831831

832+
err = c.LoadEnforcedBodySizeLimit(o.client, ctx)
833+
if err != nil {
834+
c.ClusterMonitoringConfiguration.PrometheusK8sConfig.EnforcedBodySizeLimit = ""
835+
klog.Warningf("Error loading enforced body size limit, no body size limit will be enforced: %v", err)
836+
}
837+
832838
// Only fetch the token and cluster ID if they have not been specified in the config.
833839
if c.ClusterMonitoringConfiguration.TelemeterClientConfig.ClusterID == "" || c.ClusterMonitoringConfiguration.TelemeterClientConfig.Token == "" {
834840
err := c.LoadClusterID(func() (*configv1.ClusterVersion, error) {

0 commit comments

Comments
 (0)