Skip to content

✨ Cache-Backed Client: Support listOpts.Limit #1479

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 44 additions & 21 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
knownPod2 client.Object
knownPod3 client.Object
knownPod4 client.Object
knownPod5 client.Object
knownPod6 client.Object
)

BeforeEach(func() {
Expand All @@ -122,14 +124,20 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
knownPod2 = createPod("test-pod-2", testNamespaceTwo, kcorev1.RestartPolicyAlways)
knownPod3 = createPodWithLabels("test-pod-3", testNamespaceTwo, kcorev1.RestartPolicyOnFailure, map[string]string{"common-label": "common"})
knownPod4 = createPodWithLabels("test-pod-4", testNamespaceThree, kcorev1.RestartPolicyNever, map[string]string{"common-label": "common"})
knownPod5 = createPod("test-pod-5", testNamespaceOne, kcorev1.RestartPolicyNever)
knownPod6 = createPod("test-pod-6", testNamespaceTwo, kcorev1.RestartPolicyAlways)

podGVK := schema.GroupVersionKind{
Kind: "Pod",
Version: "v1",
}

knownPod1.GetObjectKind().SetGroupVersionKind(podGVK)
knownPod2.GetObjectKind().SetGroupVersionKind(podGVK)
knownPod3.GetObjectKind().SetGroupVersionKind(podGVK)
knownPod4.GetObjectKind().SetGroupVersionKind(podGVK)
knownPod5.GetObjectKind().SetGroupVersionKind(podGVK)
knownPod6.GetObjectKind().SetGroupVersionKind(podGVK)

By("creating the informer cache")
informerCache, err = createCacheFunc(cfg, cache.Options{})
Expand All @@ -149,6 +157,8 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
deletePod(knownPod2)
deletePod(knownPod3)
deletePod(knownPod4)
deletePod(knownPod5)
deletePod(knownPod6)

informerCacheCancel()
})
Expand Down Expand Up @@ -226,7 +236,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca

By("verifying that the returned pods have GVK populated")
Expect(out.Items).NotTo(BeEmpty())
Expect(out.Items).Should(SatisfyAny(HaveLen(3), HaveLen(4)))
Expect(out.Items).Should(SatisfyAny(HaveLen(5), HaveLen(6)))
for _, p := range out.Items {
Expect(p.GroupVersionKind()).To(Equal(kcorev1.SchemeGroupVersion.WithKind("Pod")))
}
Expand All @@ -240,9 +250,10 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca

By("verifying that the returned pods are in test-namespace-1")
Expect(listObj.Items).NotTo(BeEmpty())
Expect(listObj.Items).Should(HaveLen(1))
actual := listObj.Items[0]
Expect(actual.Namespace).To(Equal(testNamespaceOne))
Expect(listObj.Items).Should(HaveLen(2))
for _, item := range listObj.Items {
Expect(item.Namespace).To(Equal(testNamespaceOne))
}
})

It("should deep copy the object unless told otherwise", func() {
Expand Down Expand Up @@ -295,7 +306,15 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
Expect(errors.IsTimeout(err)).To(BeTrue())
})

It("should set the Limit option and limit number of objects to Limit when List is called", func() {
opts := &client.ListOptions{Limit: int64(3)}
By("verifying that only Limit (3) number of objects are retrieved from the cache")
listObj := &kcorev1.PodList{}
Expect(informerCache.List(context.Background(), listObj, opts)).To(Succeed())
Expect(listObj.Items).Should(HaveLen(3))
})
})

Context("with unstructured objects", func() {
It("should be able to list objects that haven't been watched previously", func() {
By("listing all services in the cluster")
Expand Down Expand Up @@ -396,9 +415,10 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca

By("verifying that the returned pods are in test-namespace-1")
Expect(listObj.Items).NotTo(BeEmpty())
Expect(listObj.Items).Should(HaveLen(1))
actual := listObj.Items[0]
Expect(actual.GetNamespace()).To(Equal(testNamespaceOne))
Expect(listObj.Items).Should(HaveLen(2))
for _, item := range listObj.Items {
Expect(item.GetNamespace()).To(Equal(testNamespaceOne))
}
})

It("should be able to restrict cache to a namespace", func() {
Expand All @@ -424,9 +444,10 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca

By("verifying the returned pod is from the watched namespace")
Expect(out.Items).NotTo(BeEmpty())
Expect(out.Items).Should(HaveLen(1))
Expect(out.Items[0].GetNamespace()).To(Equal(testNamespaceOne))

Expect(out.Items).Should(HaveLen(2))
for _, item := range out.Items {
Expect(item.GetNamespace()).To(Equal(testNamespaceOne))
}
By("listing all nodes - should still be able to list a cluster-scoped resource")
nodeList := &unstructured.UnstructuredList{}
nodeList.SetGroupVersionKind(schema.GroupVersionKind{
Expand Down Expand Up @@ -606,9 +627,10 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca

By("verifying that the returned pods are in test-namespace-1")
Expect(listObj.Items).NotTo(BeEmpty())
Expect(listObj.Items).Should(HaveLen(1))
actual := listObj.Items[0]
Expect(actual.GetNamespace()).To(Equal(testNamespaceOne))
Expect(listObj.Items).Should(HaveLen(2))
for _, item := range listObj.Items {
Expect(item.Namespace).To(Equal(testNamespaceOne))
}
})

It("should be able to restrict cache to a namespace", func() {
Expand All @@ -634,9 +656,10 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca

By("verifying the returned pod is from the watched namespace")
Expect(out.Items).NotTo(BeEmpty())
Expect(out.Items).Should(HaveLen(1))
Expect(out.Items[0].GetNamespace()).To(Equal(testNamespaceOne))

Expect(out.Items).Should(HaveLen(2))
for _, item := range out.Items {
Expect(item.Namespace).To(Equal(testNamespaceOne))
}
By("listing all nodes - should still be able to list a cluster-scoped resource")
nodeList := &kmetav1.PartialObjectMetadataList{}
nodeList.SetGroupVersionKind(schema.GroupVersionKind{
Expand Down Expand Up @@ -795,25 +818,25 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
Entry("when selectors are empty it has to inform about all the pods", selectorsTestCase{
fieldSelectors: map[string]string{},
labelSelectors: map[string]string{},
expectedPods: []string{"test-pod-1", "test-pod-2", "test-pod-3", "test-pod-4"},
expectedPods: []string{"test-pod-1", "test-pod-2", "test-pod-3", "test-pod-4", "test-pod-5", "test-pod-6"},
}),
Entry("when field matches one pod it has to inform about it", selectorsTestCase{
fieldSelectors: map[string]string{"metadata.name": "test-pod-2"},
expectedPods: []string{"test-pod-2"},
}),
Entry("when field matches multiple pods it has to infor about all of them", selectorsTestCase{
Entry("when field matches multiple pods it has to inform about all of them", selectorsTestCase{
fieldSelectors: map[string]string{"metadata.namespace": testNamespaceTwo},
expectedPods: []string{"test-pod-2", "test-pod-3"},
expectedPods: []string{"test-pod-2", "test-pod-3", "test-pod-6"},
}),
Entry("when label matches one pod it has to inform about it", selectorsTestCase{
labelSelectors: map[string]string{"test-label": "test-pod-4"},
expectedPods: []string{"test-pod-4"},
}),
Entry("when label matches multiple pods it has to infor about all of them", selectorsTestCase{
Entry("when label matches multiple pods it has to inform about all of them", selectorsTestCase{
labelSelectors: map[string]string{"common-label": "common"},
expectedPods: []string{"test-pod-3", "test-pod-4"},
}),
Entry("when label and field matches one pod it has to infor about about it", selectorsTestCase{
Entry("when label and field matches one pod it has to inform about about it", selectorsTestCase{
labelSelectors: map[string]string{"common-label": "common"},
fieldSelectors: map[string]string{"metadata.namespace": testNamespaceTwo},
expectedPods: []string{"test-pod-3"},
Expand Down
9 changes: 8 additions & 1 deletion pkg/cache/internal/cache_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,15 @@ func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...cli
labelSel = listOpts.LabelSelector
}

limitSet := listOpts.Limit > 0

runtimeObjs := make([]runtime.Object, 0, len(objs))
for _, item := range objs {
for i, item := range objs {
// if the Limit option is set and the number of items
// listed exceeds this limit, then stop reading.
if limitSet && int64(i) >= listOpts.Limit {
break
}
obj, isObj := item.(runtime.Object)
if !isObj {
return fmt.Errorf("cache contained %T, which is not an Object", obj)
Expand Down
16 changes: 15 additions & 1 deletion pkg/cache/multi_namespace_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,13 @@ func (c *multiNamespaceCache) List(ctx context.Context, list client.ObjectList,
if err != nil {
return err
}

limitSet := listOpts.Limit > 0

var resourceVersion string
for _, cache := range c.namespaceToCache {
listObj := list.DeepCopyObject().(client.ObjectList)
err = cache.List(ctx, listObj, opts...)
err = cache.List(ctx, listObj, &listOpts)
if err != nil {
return err
}
Expand All @@ -173,6 +176,17 @@ func (c *multiNamespaceCache) List(ctx context.Context, list client.ObjectList,
allItems = append(allItems, items...)
// The last list call should have the most correct resource version.
resourceVersion = accessor.GetResourceVersion()
if limitSet {
// decrement Limit by the number of items
// fetched from the current namespace.
listOpts.Limit -= int64(len(items))
// if a Limit was set and the number of
// items read has reached this set limit,
// then stop reading.
if listOpts.Limit == 0 {
break
}
}
}
listAccessor.SetResourceVersion(resourceVersion)

Expand Down