Skip to content
This repository was archived by the owner on Mar 13, 2022. It is now read-only.

Grow the resource_version monotonically when watching #131

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions watch/watch.py
Original file line number Diff line number Diff line change
@@ -95,13 +95,22 @@ def unmarshal_event(self, data, return_type):
obj = SimpleNamespace(data=json.dumps(js['raw_object']))
js['object'] = self._api_client.deserialize(obj, return_type)
if hasattr(js['object'], 'metadata'):
self.resource_version = js['object'].metadata.resource_version
resource_version = js['object'].metadata.resource_version
# For custom objects that we don't have model defined, json
# deserialization results in dictionary
elif (isinstance(js['object'], dict) and 'metadata' in js['object']
and 'resourceVersion' in js['object']['metadata']):
self.resource_version = js['object']['metadata'][
'resourceVersion']
resource_version = js['object']['metadata']['resourceVersion']
else:
resource_version = None

# Resource version must never revert to old objects, especially
# on the first listing call (the objects are sorted randomly).
if (resource_version is not None and
(self.resource_version is None or
self.resource_version < resource_version)):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think indentation is off here, no?

self.resource_version = resource_version

return js

def stream(self, func, *args, **kwargs):
39 changes: 38 additions & 1 deletion watch/watch_test.py
Original file line number Diff line number Diff line change
@@ -152,7 +152,7 @@ def get_values(*args, **kwargs):
# opaque value we cannot interpret it and order it so rely
# on k8s returning the events completely and in order
calls.append(call(_preload_content=False, watch=True,
resource_version="3"))
resource_version="5"))
Copy link
Author

@nolar nolar Apr 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the uncertainty:

The comment above this line says that we should not interpret the values and should rely on Kubernetes to return the events in order.

But Kubernetes does not return them in order.

Should we start interpreting the resource_version then?

PS: Full quote:

ideally we want 5 here but as rv must be treated as an
opaque value we cannot interpret it and order it so rely
on k8s returning the events completely and in order

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure Kubernetes do not return them in order? Have you anything to confirm that it is by design that order is not assured?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mitar There is an example in kubernetes-client/python#819


for c, e in enumerate(w.stream(fake_api.get_namespaces,
resource_version="5")):
@@ -164,6 +164,43 @@ def get_values(*args, **kwargs):
# more strict test with worse error message
self.assertEqual(fake_api.get_namespaces.mock_calls, calls)

def test_watch_resource_version_grows_monotonically(self):
# https://github.com/kubernetes-client/python/issues/700
# ensure that the resource version never decrements,
# especially on the first listing of the resources
# (as they go in arbitrary order, not sorted).
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(
return_value=[
'{"type": "ADDED", "object": {"metadata": {"name": "test1",'
'"resourceVersion": "1"}, "spec": {}, "status": {}}}\n',
'{"type": "ADDED", "object": {"metadata": {"name": "test2",'
'"resourceVersion": "3"}, "spec": {}, "status": {}}}\n'
'{"type": "ADDED", "object": {"metadata": {"name": "test3",'
'"resourceVersion": "2"}, "spec": {}, "status": {}}}\n',
'should_not_happened\n'])

fake_api = Mock()
fake_api.get_namespaces = Mock(return_value=fake_resp)
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'

w = Watch()
watch_resource_versions = []
event_resource_versions = []
count = 1
for e in w.stream(fake_api.get_namespaces):
obj = e['object']
watch_resource_versions.append(w.resource_version)
event_resource_versions.append(obj.metadata.resource_version)
count += 1
if count == 4:
w.stop()

self.assertEqual(watch_resource_versions, ['1', '3', '3'])
self.assertEqual(event_resource_versions, ['1', '3', '2'])

def test_watch_stream_twice(self):
w = Watch(float)
for step in ['first', 'second']: