diff --git a/docs/developer/cli-arguments.md b/docs/developer/cli-arguments.md index ab092216c3..acab29223e 100644 --- a/docs/developer/cli-arguments.md +++ b/docs/developer/cli-arguments.md @@ -64,6 +64,7 @@ Flags: --namespaces string Comma-separated list of namespaces to be enabled. Defaults to "" --namespaces-denylist string Comma-separated list of namespaces not to be enabled. If namespaces and namespaces-denylist are both set, only namespaces that are excluded in namespaces-denylist will be used. --node string Name of the node that contains the kube-state-metrics pod. Most likely it should be passed via the downward API. This is used for daemonset sharding. Only available for resources (pod metrics) that support spec.nodeName fieldSelector. This is experimental. + --object-limit int The total number of objects to list per resource from the API Server. (experimental) --one_output If true, only write logs to their native severity level (vs also writing to each lower severity level; no effect when -logtostderr=true) --pod string Name of the pod that contains the kube-state-metrics container. When set, it is expected that --pod and --pod-namespace are both set. Most likely this should be passed via the downward API. This is used for auto-detecting sharding. If set, this has preference over statically configured sharding. This is experimental, it may be removed without notice. --pod-namespace string Name of the namespace of the pod specified by --pod. When set, it is expected that --pod and --pod-namespace are both set. Most likely this should be passed via the downward API. This is used for auto-detecting sharding. If set, this has preference over statically configured sharding. This is experimental, it may be removed without notice. diff --git a/internal/store/builder.go b/internal/store/builder.go index b66e455dc2..3abb11af7d 100644 --- a/internal/store/builder.go +++ b/internal/store/builder.go @@ -83,6 +83,7 @@ type Builder struct { totalShards int shard int32 useAPIServerCache bool + objectLimit int64 } // NewBuilder returns a new builder. @@ -165,6 +166,12 @@ func (b *Builder) WithUsingAPIServerCache(u bool) { b.useAPIServerCache = u } +// WithObjectLimit sets a limit on how many objects you can list from the APIServer. +// This is to protect kube-state-metrics from running out of memory if the APIServer has a lot of objects. +func (b *Builder) WithObjectLimit(l int64) { + b.objectLimit = l +} + // WithFamilyGeneratorFilter configures the family generator filter which decides which // metrics are to be exposed by the store build by the Builder. func (b *Builder) WithFamilyGeneratorFilter(l generator.FamilyGeneratorFilter) { @@ -215,6 +222,7 @@ func (b *Builder) WithCustomResourceStoreFactories(fs ...customresource.Registry f.ExpectedType(), f.ListWatch, b.useAPIServerCache, + b.objectLimit, ) } } @@ -363,150 +371,150 @@ func availableResources() []string { } func (b *Builder) buildConfigMapStores() []cache.Store { - return b.buildStoresFunc(configMapMetricFamilies(b.allowAnnotationsList["configmaps"], b.allowLabelsList["configmaps"]), &v1.ConfigMap{}, createConfigMapListWatch, b.useAPIServerCache) + return b.buildStoresFunc(configMapMetricFamilies(b.allowAnnotationsList["configmaps"], b.allowLabelsList["configmaps"]), &v1.ConfigMap{}, createConfigMapListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildCronJobStores() []cache.Store { - return b.buildStoresFunc(cronJobMetricFamilies(b.allowAnnotationsList["cronjobs"], b.allowLabelsList["cronjobs"]), &batchv1.CronJob{}, createCronJobListWatch, b.useAPIServerCache) + return b.buildStoresFunc(cronJobMetricFamilies(b.allowAnnotationsList["cronjobs"], b.allowLabelsList["cronjobs"]), &batchv1.CronJob{}, createCronJobListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildDaemonSetStores() []cache.Store { - return b.buildStoresFunc(daemonSetMetricFamilies(b.allowAnnotationsList["daemonsets"], b.allowLabelsList["daemonsets"]), &appsv1.DaemonSet{}, createDaemonSetListWatch, b.useAPIServerCache) + return b.buildStoresFunc(daemonSetMetricFamilies(b.allowAnnotationsList["daemonsets"], b.allowLabelsList["daemonsets"]), &appsv1.DaemonSet{}, createDaemonSetListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildDeploymentStores() []cache.Store { - return b.buildStoresFunc(deploymentMetricFamilies(b.allowAnnotationsList["deployments"], b.allowLabelsList["deployments"]), &appsv1.Deployment{}, createDeploymentListWatch, b.useAPIServerCache) + return b.buildStoresFunc(deploymentMetricFamilies(b.allowAnnotationsList["deployments"], b.allowLabelsList["deployments"]), &appsv1.Deployment{}, createDeploymentListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildEndpointsStores() []cache.Store { - return b.buildStoresFunc(endpointMetricFamilies(b.allowAnnotationsList["endpoints"], b.allowLabelsList["endpoints"]), &v1.Endpoints{}, createEndpointsListWatch, b.useAPIServerCache) + return b.buildStoresFunc(endpointMetricFamilies(b.allowAnnotationsList["endpoints"], b.allowLabelsList["endpoints"]), &v1.Endpoints{}, createEndpointsListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildEndpointSlicesStores() []cache.Store { - return b.buildStoresFunc(endpointSliceMetricFamilies(b.allowAnnotationsList["endpointslices"], b.allowLabelsList["endpointslices"]), &discoveryv1.EndpointSlice{}, createEndpointSliceListWatch, b.useAPIServerCache) + return b.buildStoresFunc(endpointSliceMetricFamilies(b.allowAnnotationsList["endpointslices"], b.allowLabelsList["endpointslices"]), &discoveryv1.EndpointSlice{}, createEndpointSliceListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildHPAStores() []cache.Store { - return b.buildStoresFunc(hpaMetricFamilies(b.allowAnnotationsList["horizontalpodautoscalers"], b.allowLabelsList["horizontalpodautoscalers"]), &autoscaling.HorizontalPodAutoscaler{}, createHPAListWatch, b.useAPIServerCache) + return b.buildStoresFunc(hpaMetricFamilies(b.allowAnnotationsList["horizontalpodautoscalers"], b.allowLabelsList["horizontalpodautoscalers"]), &autoscaling.HorizontalPodAutoscaler{}, createHPAListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildIngressStores() []cache.Store { - return b.buildStoresFunc(ingressMetricFamilies(b.allowAnnotationsList["ingresses"], b.allowLabelsList["ingresses"]), &networkingv1.Ingress{}, createIngressListWatch, b.useAPIServerCache) + return b.buildStoresFunc(ingressMetricFamilies(b.allowAnnotationsList["ingresses"], b.allowLabelsList["ingresses"]), &networkingv1.Ingress{}, createIngressListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildJobStores() []cache.Store { - return b.buildStoresFunc(jobMetricFamilies(b.allowAnnotationsList["jobs"], b.allowLabelsList["jobs"]), &batchv1.Job{}, createJobListWatch, b.useAPIServerCache) + return b.buildStoresFunc(jobMetricFamilies(b.allowAnnotationsList["jobs"], b.allowLabelsList["jobs"]), &batchv1.Job{}, createJobListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildLimitRangeStores() []cache.Store { - return b.buildStoresFunc(limitRangeMetricFamilies, &v1.LimitRange{}, createLimitRangeListWatch, b.useAPIServerCache) + return b.buildStoresFunc(limitRangeMetricFamilies, &v1.LimitRange{}, createLimitRangeListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildMutatingWebhookConfigurationStores() []cache.Store { - return b.buildStoresFunc(mutatingWebhookConfigurationMetricFamilies, &admissionregistrationv1.MutatingWebhookConfiguration{}, createMutatingWebhookConfigurationListWatch, b.useAPIServerCache) + return b.buildStoresFunc(mutatingWebhookConfigurationMetricFamilies, &admissionregistrationv1.MutatingWebhookConfiguration{}, createMutatingWebhookConfigurationListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildNamespaceStores() []cache.Store { - return b.buildStoresFunc(namespaceMetricFamilies(b.allowAnnotationsList["namespaces"], b.allowLabelsList["namespaces"]), &v1.Namespace{}, createNamespaceListWatch, b.useAPIServerCache) + return b.buildStoresFunc(namespaceMetricFamilies(b.allowAnnotationsList["namespaces"], b.allowLabelsList["namespaces"]), &v1.Namespace{}, createNamespaceListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildNetworkPolicyStores() []cache.Store { - return b.buildStoresFunc(networkPolicyMetricFamilies(b.allowAnnotationsList["networkpolicies"], b.allowLabelsList["networkpolicies"]), &networkingv1.NetworkPolicy{}, createNetworkPolicyListWatch, b.useAPIServerCache) + return b.buildStoresFunc(networkPolicyMetricFamilies(b.allowAnnotationsList["networkpolicies"], b.allowLabelsList["networkpolicies"]), &networkingv1.NetworkPolicy{}, createNetworkPolicyListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildNodeStores() []cache.Store { - return b.buildStoresFunc(nodeMetricFamilies(b.allowAnnotationsList["nodes"], b.allowLabelsList["nodes"]), &v1.Node{}, createNodeListWatch, b.useAPIServerCache) + return b.buildStoresFunc(nodeMetricFamilies(b.allowAnnotationsList["nodes"], b.allowLabelsList["nodes"]), &v1.Node{}, createNodeListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildPersistentVolumeClaimStores() []cache.Store { - return b.buildStoresFunc(persistentVolumeClaimMetricFamilies(b.allowAnnotationsList["persistentvolumeclaims"], b.allowLabelsList["persistentvolumeclaims"]), &v1.PersistentVolumeClaim{}, createPersistentVolumeClaimListWatch, b.useAPIServerCache) + return b.buildStoresFunc(persistentVolumeClaimMetricFamilies(b.allowAnnotationsList["persistentvolumeclaims"], b.allowLabelsList["persistentvolumeclaims"]), &v1.PersistentVolumeClaim{}, createPersistentVolumeClaimListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildPersistentVolumeStores() []cache.Store { - return b.buildStoresFunc(persistentVolumeMetricFamilies(b.allowAnnotationsList["persistentvolumes"], b.allowLabelsList["persistentvolumes"]), &v1.PersistentVolume{}, createPersistentVolumeListWatch, b.useAPIServerCache) + return b.buildStoresFunc(persistentVolumeMetricFamilies(b.allowAnnotationsList["persistentvolumes"], b.allowLabelsList["persistentvolumes"]), &v1.PersistentVolume{}, createPersistentVolumeListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildPodDisruptionBudgetStores() []cache.Store { - return b.buildStoresFunc(podDisruptionBudgetMetricFamilies(b.allowAnnotationsList["poddisruptionbudgets"], b.allowLabelsList["poddisruptionbudgets"]), &policyv1.PodDisruptionBudget{}, createPodDisruptionBudgetListWatch, b.useAPIServerCache) + return b.buildStoresFunc(podDisruptionBudgetMetricFamilies(b.allowAnnotationsList["poddisruptionbudgets"], b.allowLabelsList["poddisruptionbudgets"]), &policyv1.PodDisruptionBudget{}, createPodDisruptionBudgetListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildReplicaSetStores() []cache.Store { - return b.buildStoresFunc(replicaSetMetricFamilies(b.allowAnnotationsList["replicasets"], b.allowLabelsList["replicasets"]), &appsv1.ReplicaSet{}, createReplicaSetListWatch, b.useAPIServerCache) + return b.buildStoresFunc(replicaSetMetricFamilies(b.allowAnnotationsList["replicasets"], b.allowLabelsList["replicasets"]), &appsv1.ReplicaSet{}, createReplicaSetListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildReplicationControllerStores() []cache.Store { - return b.buildStoresFunc(replicationControllerMetricFamilies, &v1.ReplicationController{}, createReplicationControllerListWatch, b.useAPIServerCache) + return b.buildStoresFunc(replicationControllerMetricFamilies, &v1.ReplicationController{}, createReplicationControllerListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildResourceQuotaStores() []cache.Store { - return b.buildStoresFunc(resourceQuotaMetricFamilies(b.allowAnnotationsList["resourcequotas"], b.allowLabelsList["resourcequotas"]), &v1.ResourceQuota{}, createResourceQuotaListWatch, b.useAPIServerCache) + return b.buildStoresFunc(resourceQuotaMetricFamilies(b.allowAnnotationsList["resourcequotas"], b.allowLabelsList["resourcequotas"]), &v1.ResourceQuota{}, createResourceQuotaListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildSecretStores() []cache.Store { - return b.buildStoresFunc(secretMetricFamilies(b.allowAnnotationsList["secrets"], b.allowLabelsList["secrets"]), &v1.Secret{}, createSecretListWatch, b.useAPIServerCache) + return b.buildStoresFunc(secretMetricFamilies(b.allowAnnotationsList["secrets"], b.allowLabelsList["secrets"]), &v1.Secret{}, createSecretListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildServiceAccountStores() []cache.Store { - return b.buildStoresFunc(serviceAccountMetricFamilies(b.allowAnnotationsList["serviceaccounts"], b.allowLabelsList["serviceaccounts"]), &v1.ServiceAccount{}, createServiceAccountListWatch, b.useAPIServerCache) + return b.buildStoresFunc(serviceAccountMetricFamilies(b.allowAnnotationsList["serviceaccounts"], b.allowLabelsList["serviceaccounts"]), &v1.ServiceAccount{}, createServiceAccountListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildServiceStores() []cache.Store { - return b.buildStoresFunc(serviceMetricFamilies(b.allowAnnotationsList["services"], b.allowLabelsList["services"]), &v1.Service{}, createServiceListWatch, b.useAPIServerCache) + return b.buildStoresFunc(serviceMetricFamilies(b.allowAnnotationsList["services"], b.allowLabelsList["services"]), &v1.Service{}, createServiceListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildStatefulSetStores() []cache.Store { - return b.buildStoresFunc(statefulSetMetricFamilies(b.allowAnnotationsList["statefulsets"], b.allowLabelsList["statefulsets"]), &appsv1.StatefulSet{}, createStatefulSetListWatch, b.useAPIServerCache) + return b.buildStoresFunc(statefulSetMetricFamilies(b.allowAnnotationsList["statefulsets"], b.allowLabelsList["statefulsets"]), &appsv1.StatefulSet{}, createStatefulSetListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildStorageClassStores() []cache.Store { - return b.buildStoresFunc(storageClassMetricFamilies(b.allowAnnotationsList["storageclasses"], b.allowLabelsList["storageclasses"]), &storagev1.StorageClass{}, createStorageClassListWatch, b.useAPIServerCache) + return b.buildStoresFunc(storageClassMetricFamilies(b.allowAnnotationsList["storageclasses"], b.allowLabelsList["storageclasses"]), &storagev1.StorageClass{}, createStorageClassListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildPodStores() []cache.Store { - return b.buildStoresFunc(podMetricFamilies(b.allowAnnotationsList["pods"], b.allowLabelsList["pods"]), &v1.Pod{}, createPodListWatch, b.useAPIServerCache) + return b.buildStoresFunc(podMetricFamilies(b.allowAnnotationsList["pods"], b.allowLabelsList["pods"]), &v1.Pod{}, createPodListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildCsrStores() []cache.Store { - return b.buildStoresFunc(csrMetricFamilies(b.allowAnnotationsList["certificatesigningrequests"], b.allowLabelsList["certificatesigningrequests"]), &certv1.CertificateSigningRequest{}, createCSRListWatch, b.useAPIServerCache) + return b.buildStoresFunc(csrMetricFamilies(b.allowAnnotationsList["certificatesigningrequests"], b.allowLabelsList["certificatesigningrequests"]), &certv1.CertificateSigningRequest{}, createCSRListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildValidatingWebhookConfigurationStores() []cache.Store { - return b.buildStoresFunc(validatingWebhookConfigurationMetricFamilies, &admissionregistrationv1.ValidatingWebhookConfiguration{}, createValidatingWebhookConfigurationListWatch, b.useAPIServerCache) + return b.buildStoresFunc(validatingWebhookConfigurationMetricFamilies, &admissionregistrationv1.ValidatingWebhookConfiguration{}, createValidatingWebhookConfigurationListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildVolumeAttachmentStores() []cache.Store { - return b.buildStoresFunc(volumeAttachmentMetricFamilies, &storagev1.VolumeAttachment{}, createVolumeAttachmentListWatch, b.useAPIServerCache) + return b.buildStoresFunc(volumeAttachmentMetricFamilies, &storagev1.VolumeAttachment{}, createVolumeAttachmentListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildLeasesStores() []cache.Store { - return b.buildStoresFunc(leaseMetricFamilies, &coordinationv1.Lease{}, createLeaseListWatch, b.useAPIServerCache) + return b.buildStoresFunc(leaseMetricFamilies, &coordinationv1.Lease{}, createLeaseListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildClusterRoleStores() []cache.Store { - return b.buildStoresFunc(clusterRoleMetricFamilies(b.allowAnnotationsList["clusterroles"], b.allowLabelsList["clusterroles"]), &rbacv1.ClusterRole{}, createClusterRoleListWatch, b.useAPIServerCache) + return b.buildStoresFunc(clusterRoleMetricFamilies(b.allowAnnotationsList["clusterroles"], b.allowLabelsList["clusterroles"]), &rbacv1.ClusterRole{}, createClusterRoleListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildRoleStores() []cache.Store { - return b.buildStoresFunc(roleMetricFamilies(b.allowAnnotationsList["roles"], b.allowLabelsList["roles"]), &rbacv1.Role{}, createRoleListWatch, b.useAPIServerCache) + return b.buildStoresFunc(roleMetricFamilies(b.allowAnnotationsList["roles"], b.allowLabelsList["roles"]), &rbacv1.Role{}, createRoleListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildClusterRoleBindingStores() []cache.Store { - return b.buildStoresFunc(clusterRoleBindingMetricFamilies(b.allowAnnotationsList["clusterrolebindings"], b.allowLabelsList["clusterrolebindings"]), &rbacv1.ClusterRoleBinding{}, createClusterRoleBindingListWatch, b.useAPIServerCache) + return b.buildStoresFunc(clusterRoleBindingMetricFamilies(b.allowAnnotationsList["clusterrolebindings"], b.allowLabelsList["clusterrolebindings"]), &rbacv1.ClusterRoleBinding{}, createClusterRoleBindingListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildRoleBindingStores() []cache.Store { - return b.buildStoresFunc(roleBindingMetricFamilies(b.allowAnnotationsList["rolebindings"], b.allowLabelsList["rolebindings"]), &rbacv1.RoleBinding{}, createRoleBindingListWatch, b.useAPIServerCache) + return b.buildStoresFunc(roleBindingMetricFamilies(b.allowAnnotationsList["rolebindings"], b.allowLabelsList["rolebindings"]), &rbacv1.RoleBinding{}, createRoleBindingListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildIngressClassStores() []cache.Store { - return b.buildStoresFunc(ingressClassMetricFamilies(b.allowAnnotationsList["ingressclasses"], b.allowLabelsList["ingressclasses"]), &networkingv1.IngressClass{}, createIngressClassListWatch, b.useAPIServerCache) + return b.buildStoresFunc(ingressClassMetricFamilies(b.allowAnnotationsList["ingressclasses"], b.allowLabelsList["ingressclasses"]), &networkingv1.IngressClass{}, createIngressClassListWatch, b.useAPIServerCache, b.objectLimit) } func (b *Builder) buildStores( metricFamilies []generator.FamilyGenerator, expectedType interface{}, listWatchFunc func(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher, - useAPIServerCache bool, + useAPIServerCache bool, objectLimit int64, ) []cache.Store { metricFamilies = generator.FilterFamilyGenerators(b.familyGeneratorFilter, metricFamilies) composedMetricGenFuncs := generator.ComposeMetricGenFuncs(metricFamilies) @@ -521,7 +529,7 @@ func (b *Builder) buildStores( klog.InfoS("FieldSelector is used", "fieldSelector", b.fieldSelectorFilter) } listWatcher := listWatchFunc(b.kubeClient, v1.NamespaceAll, b.fieldSelectorFilter) - b.startReflector(expectedType, store, listWatcher, useAPIServerCache) + b.startReflector(expectedType, store, listWatcher, useAPIServerCache, objectLimit) return []cache.Store{store} } @@ -535,7 +543,7 @@ func (b *Builder) buildStores( klog.InfoS("FieldSelector is used", "fieldSelector", b.fieldSelectorFilter) } listWatcher := listWatchFunc(b.kubeClient, ns, b.fieldSelectorFilter) - b.startReflector(expectedType, store, listWatcher, useAPIServerCache) + b.startReflector(expectedType, store, listWatcher, useAPIServerCache, objectLimit) stores = append(stores, store) } @@ -547,7 +555,7 @@ func (b *Builder) buildCustomResourceStores(resourceName string, metricFamilies []generator.FamilyGenerator, expectedType interface{}, listWatchFunc func(customResourceClient interface{}, ns string, fieldSelector string) cache.ListerWatcher, - useAPIServerCache bool, + useAPIServerCache bool, objectLimit int64, ) []cache.Store { metricFamilies = generator.FilterFamilyGenerators(b.familyGeneratorFilter, metricFamilies) composedMetricGenFuncs := generator.ComposeMetricGenFuncs(metricFamilies) @@ -579,7 +587,7 @@ func (b *Builder) buildCustomResourceStores(resourceName string, klog.InfoS("FieldSelector is used", "fieldSelector", b.fieldSelectorFilter) } listWatcher := listWatchFunc(customResourceClient, v1.NamespaceAll, b.fieldSelectorFilter) - b.startReflector(expectedType, store, listWatcher, useAPIServerCache) + b.startReflector(expectedType, store, listWatcher, useAPIServerCache, objectLimit) return []cache.Store{store} } @@ -591,7 +599,7 @@ func (b *Builder) buildCustomResourceStores(resourceName string, ) klog.InfoS("FieldSelector is used", "fieldSelector", b.fieldSelectorFilter) listWatcher := listWatchFunc(customResourceClient, ns, b.fieldSelectorFilter) - b.startReflector(expectedType, store, listWatcher, useAPIServerCache) + b.startReflector(expectedType, store, listWatcher, useAPIServerCache, objectLimit) stores = append(stores, store) } @@ -605,8 +613,9 @@ func (b *Builder) startReflector( store cache.Store, listWatcher cache.ListerWatcher, useAPIServerCache bool, + objectLimit int64, ) { - instrumentedListWatch := watch.NewInstrumentedListerWatcher(listWatcher, b.listWatchMetrics, reflect.TypeOf(expectedType).String(), useAPIServerCache) + instrumentedListWatch := watch.NewInstrumentedListerWatcher(listWatcher, b.listWatchMetrics, reflect.TypeOf(expectedType).String(), useAPIServerCache, objectLimit) reflector := cache.NewReflectorWithOptions(sharding.NewShardedListWatch(b.shard, b.totalShards, instrumentedListWatch), expectedType, store, cache.ReflectorOptions{ResyncPeriod: 0}) go reflector.Run(b.ctx.Done()) } diff --git a/pkg/app/server.go b/pkg/app/server.go index 34450729db..36b25b0319 100644 --- a/pkg/app/server.go +++ b/pkg/app/server.go @@ -247,6 +247,7 @@ func RunKubeStateMetrics(ctx context.Context, opts *options.Options) error { )) storeBuilder.WithUsingAPIServerCache(opts.UseAPIServerCache) + storeBuilder.WithObjectLimit(opts.ObjectLimit) storeBuilder.WithGenerateStoresFunc(storeBuilder.DefaultGenerateStoresFunc()) proc.StartReaper() diff --git a/pkg/builder/builder_test.go b/pkg/builder/builder_test.go index 0ceac21abf..4ecd59a29e 100644 --- a/pkg/builder/builder_test.go +++ b/pkg/builder/builder_test.go @@ -67,6 +67,7 @@ func customStore(_ []generator.FamilyGenerator, _ interface{}, _ func(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher, _ bool, + _ int64, ) []cache.Store { stores := make([]cache.Store, 0, 2) stores = append(stores, newFakeStore(fakeMetricLists[0])) diff --git a/pkg/builder/types/interfaces.go b/pkg/builder/types/interfaces.go index b7ba595da1..bc13ca3c3e 100644 --- a/pkg/builder/types/interfaces.go +++ b/pkg/builder/types/interfaces.go @@ -57,7 +57,7 @@ type BuilderInterface interface { type BuildStoresFunc func(metricFamilies []generator.FamilyGenerator, expectedType interface{}, listWatchFunc func(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher, - useAPIServerCache bool, + useAPIServerCache bool, limit int64, ) []cache.Store // BuildCustomResourceStoresFunc function signature that is used to return a list of custom resource cache.Store @@ -65,7 +65,7 @@ type BuildCustomResourceStoresFunc func(resourceName string, metricFamilies []generator.FamilyGenerator, expectedType interface{}, listWatchFunc func(customResourceClient interface{}, ns string, fieldSelector string) cache.ListerWatcher, - useAPIServerCache bool, + useAPIServerCache bool, limit int64, ) []cache.Store // AllowDenyLister interface for AllowDeny lister that can allow or exclude metrics by there names diff --git a/pkg/options/options.go b/pkg/options/options.go index 95af22b55f..300ed00982 100644 --- a/pkg/options/options.go +++ b/pkg/options/options.go @@ -79,6 +79,7 @@ type Options struct { Help bool `yaml:"help"` TrackUnscheduledPods bool `yaml:"track_unscheduled_pods"` UseAPIServerCache bool `yaml:"use_api_server_cache"` + ObjectLimit int64 `yaml:"object_limit"` } // GetConfigFile is the getter for --config value. @@ -143,6 +144,7 @@ func (o *Options) AddFlags(cmd *cobra.Command) { o.cmd.Flags().BoolVar(&o.TrackUnscheduledPods, "track-unscheduled-pods", false, "This configuration is used in conjunction with node configuration. When this configuration is true, node configuration is empty and the metric of unscheduled pods is fetched from the Kubernetes API Server. This is experimental.") o.cmd.Flags().BoolVarP(&o.Help, "help", "h", false, "Print Help text") o.cmd.Flags().BoolVarP(&o.UseAPIServerCache, "use-apiserver-cache", "", false, "Sets resourceVersion=0 for ListWatch requests, using cached resources from the apiserver instead of an etcd quorum read.") + o.cmd.Flags().Int64Var(&o.ObjectLimit, "object-limit", 0, "The total number of objects to list per resource from the API Server. (experimental)") o.cmd.Flags().Int32Var(&o.Shard, "shard", int32(0), "The instances shard nominal (zero indexed) within the total number of shards. (default 0)") o.cmd.Flags().IntVar(&o.Port, "port", 8080, `Port to expose metrics on.`) o.cmd.Flags().IntVar(&o.TelemetryPort, "telemetry-port", 8081, `Port to expose kube-state-metrics self metrics on.`) @@ -202,5 +204,9 @@ func (o *Options) Validate() error { return fmt.Errorf("value for --auto-gomemlimit-ratio=%f must be greater than 0 and less than or equal to 1", o.AutoGoMemlimitRatio) } + if o.ObjectLimit < 0 { + return fmt.Errorf("value for --object-limit=%d must be equal or greater than 0", o.ObjectLimit) + } + return nil } diff --git a/pkg/watch/watch.go b/pkg/watch/watch.go index 48fe2bcc11..d65b8a3d74 100644 --- a/pkg/watch/watch.go +++ b/pkg/watch/watch.go @@ -19,6 +19,7 @@ package watch import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" @@ -27,8 +28,10 @@ import ( // ListWatchMetrics stores the pointers of kube_state_metrics_[list|watch]_total metrics. type ListWatchMetrics struct { - WatchTotal *prometheus.CounterVec - ListTotal *prometheus.CounterVec + WatchRequestsTotal *prometheus.CounterVec + ListRequestsTotal *prometheus.CounterVec + ListObjectsLimit *prometheus.GaugeVec + ListObjectsCurrent *prometheus.GaugeVec } // NewListWatchMetrics takes in a prometheus registry and initializes @@ -36,20 +39,34 @@ type ListWatchMetrics struct { // kube_state_metrics_watch_total metrics. It returns those registered metrics. func NewListWatchMetrics(r prometheus.Registerer) *ListWatchMetrics { return &ListWatchMetrics{ - WatchTotal: promauto.With(r).NewCounterVec( + WatchRequestsTotal: promauto.With(r).NewCounterVec( prometheus.CounterOpts{ Name: "kube_state_metrics_watch_total", - Help: "Number of total resource watches in kube-state-metrics", + Help: "Number of total resource watch calls in kube-state-metrics", }, []string{"result", "resource"}, ), - ListTotal: promauto.With(r).NewCounterVec( + ListRequestsTotal: promauto.With(r).NewCounterVec( prometheus.CounterOpts{ Name: "kube_state_metrics_list_total", - Help: "Number of total resource list in kube-state-metrics", + Help: "Number of total resource list calls in kube-state-metrics", }, []string{"result", "resource"}, ), + ListObjectsCurrent: promauto.With(r).NewGaugeVec( + prometheus.GaugeOpts{ + Name: "kube_state_metrics_list_objects", + Help: "Number of resources listed in kube-state-metrics", + }, + []string{"resource"}, + ), + ListObjectsLimit: promauto.With(r).NewGaugeVec( + prometheus.GaugeOpts{ + Name: "kube_state_metrics_list_objects_limit", + Help: "Number of resource list limit in kube-state-metrics", + }, + []string{"resource"}, + ), } } @@ -60,34 +77,59 @@ type InstrumentedListerWatcher struct { metrics *ListWatchMetrics resource string useAPIServerCache bool + limit int64 } // NewInstrumentedListerWatcher returns a new InstrumentedListerWatcher. -func NewInstrumentedListerWatcher(lw cache.ListerWatcher, metrics *ListWatchMetrics, resource string, useAPIServerCache bool) cache.ListerWatcher { +func NewInstrumentedListerWatcher(lw cache.ListerWatcher, metrics *ListWatchMetrics, resource string, useAPIServerCache bool, limit int64) cache.ListerWatcher { return &InstrumentedListerWatcher{ lw: lw, metrics: metrics, resource: resource, useAPIServerCache: useAPIServerCache, + limit: limit, } } // List is a wrapper func around the cache.ListerWatcher.List func. It increases the success/error // / counters based on the outcome of the List operation it instruments. +// It supports setting object limits, this means if it is set it will only list and process +// n objects of the same resource type. func (i *InstrumentedListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) { if i.useAPIServerCache { options.ResourceVersion = "0" } + if i.limit > 0 { + options.Limit = i.limit + i.metrics.ListObjectsLimit.WithLabelValues(i.resource).Set(float64(i.limit)) + } + res, err := i.lw.List(options) + if err != nil { - i.metrics.ListTotal.WithLabelValues("error", i.resource).Inc() + i.metrics.ListRequestsTotal.WithLabelValues("error", i.resource).Inc() return nil, err } - i.metrics.ListTotal.WithLabelValues("success", i.resource).Inc() + list, err := meta.ExtractList(res) + if err != nil { + return nil, err + } + i.metrics.ListRequestsTotal.WithLabelValues("success", i.resource).Inc() + + if i.limit > 0 { + if int64(len(list)) > i.limit { + meta.SetList(res, list[0:i.limit]) + i.metrics.ListObjectsCurrent.WithLabelValues(i.resource).Set(float64(i.limit)) + } else { + i.metrics.ListObjectsCurrent.WithLabelValues(i.resource).Set(float64(len(list))) + } + } + return res, nil + } // Watch is a wrapper func around the cache.ListerWatcher.Watch func. It increases the success/error @@ -95,10 +137,10 @@ func (i *InstrumentedListerWatcher) List(options metav1.ListOptions) (runtime.Ob func (i *InstrumentedListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) { res, err := i.lw.Watch(options) if err != nil { - i.metrics.WatchTotal.WithLabelValues("error", i.resource).Inc() + i.metrics.WatchRequestsTotal.WithLabelValues("error", i.resource).Inc() return nil, err } - i.metrics.WatchTotal.WithLabelValues("success", i.resource).Inc() + i.metrics.WatchRequestsTotal.WithLabelValues("success", i.resource).Inc() return res, nil }