Skip to content

Commit 1193741

Browse files
authored
Merge pull request kubernetes-client#2100 from davidopic/master
Fix UTF-8 failures in Watch
2 parents 72c830d + 934d026 commit 1193741

File tree

2 files changed

+144
-11
lines changed

2 files changed

+144
-11
lines changed

kubernetes/base/watch/watch.py

+24-11
Original file line numberDiff line numberDiff line change
@@ -52,20 +52,33 @@ def _find_return_type(func):
5252

5353

5454
def iter_resp_lines(resp):
    """Yield the non-empty, newline-delimited lines of a streamed response.

    Segments returned by ``resp.stream(amt=None, decode_content=False)`` are
    accumulated as raw bytes and decoded only once a complete line is
    available. A multi-byte UTF-8 sequence split across two segments is
    therefore decoded correctly, and invalid UTF-8 is replaced with U+FFFD
    instead of raising.

    Args:
        resp: Object exposing ``stream(amt=None, decode_content=False)`` that
            returns an iterable of ``bytes`` or ``str`` segments.

    Yields:
        str: Each non-empty line, without its trailing newline.

    Raises:
        TypeError: If a segment is neither ``bytes`` nor ``str``.

    Note:
        Bytes received after the last newline are never yielded; a final
        line is only emitted once its terminating newline has arrived.
    """
    buffer = bytearray()
    for segment in resp.stream(amt=None, decode_content=False):
        # Whatever is left in the buffer from earlier segments is known to
        # contain no newline, so only the freshly appended bytes need to be
        # searched.
        searched = len(buffer)

        if isinstance(segment, bytes):
            buffer.extend(segment)
        elif isinstance(segment, str):
            buffer.extend(segment.encode("utf-8"))
        else:
            raise TypeError(
                f"Received invalid segment type, {type(segment)}, from stream. Accepts only 'str' or 'bytes'.")

        # Splitting on the newline byte is safe for UTF-8 because multi-byte
        # sequences cannot contain it.
        last_newline = buffer.rfind(b'\n', searched)
        if last_newline == -1:
            continue

        # Detach every complete line in one step instead of re-slicing the
        # buffer once per line; the partial tail stays behind for the next
        # segment.
        complete = buffer[:last_newline]
        del buffer[:last_newline + 1]

        for raw_line in complete.split(b'\n'):
            # Non-empty bytes always decode to a non-empty string, so blank
            # lines can be filtered before decoding.
            if raw_line:
                yield raw_line.decode("utf-8", errors="replace")
6982

7083

7184
class Watch(object):

kubernetes/base/watch/watch_test.py

+120
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,133 @@ def test_watch_with_decode(self):
6161
if count == 4:
6262
w.stop()
6363

64+
# make sure that all three records were consumed by the stream
65+
self.assertEqual(4, count)
66+
6467
fake_api.get_namespaces.assert_called_once_with(
6568
_preload_content=False, watch=True)
6669
fake_resp.stream.assert_called_once_with(
6770
amt=None, decode_content=False)
6871
fake_resp.close.assert_called_once()
6972
fake_resp.release_conn.assert_called_once()
7073

74+
def test_watch_with_interspersed_newlines(self):
    """Blank lines and empty chunks between events must not produce events.

    The fake stream mixes bare newlines, an empty chunk, and events split
    across chunk boundaries; exactly three named events are expected.
    """
    chunks = [
        '\n',
        '{"type": "ADDED", "object": {"metadata":',
        '{"name": "test1","resourceVersion": "1"}}}\n{"type": "ADDED", ',
        '"object": {"metadata": {"name": "test2", "resourceVersion": "2"}}}\n',
        '\n',
        '',
        '{"type": "ADDED", "object": {"metadata": {"name": "test3", "resourceVersion": "3"}}}\n',
        '\n\n\n',
        '\n',
    ]

    response = Mock()
    response.close = Mock()
    response.release_conn = Mock()
    response.stream = Mock(return_value=chunks)

    api = Mock()
    api.get_namespaces = Mock(return_value=response)
    api.get_namespaces.__doc__ = ':return: V1NamespaceList'

    watcher = Watch()
    count = 0

    # Consume all test events from the mock service, stopping when no more data is available.
    # Note that "timeout_seconds" below is not a timeout; rather, it disables retries and is
    # the only way to do so. Without that, the stream will re-read the test data forever.
    events = watcher.stream(api.get_namespaces, timeout_seconds=1)
    for count, item in enumerate(events, start=1):
        self.assertEqual("test%d" % count, item['object'].metadata.name)
    self.assertEqual(3, count)
105+
106+
def test_watch_with_multibyte_utf8(self):
    """Multi-byte UTF-8 characters must survive chunking of the stream.

    Three events carry the same two-byte copyright character: once as a
    ``str`` chunk, once as a ``bytes`` chunk, and once with the two bytes
    split across separate ``bytes`` chunks, followed by a mix of ``str``
    and ``bytes`` chunks completing the event.
    """
    fake_resp = Mock()
    fake_resp.close = Mock()
    fake_resp.release_conn = Mock()
    fake_resp.stream = Mock(
        return_value=[
            # two-byte utf-8 character
            '{"type":"MODIFIED","object":{"data":{"utf-8":"© 1"},"metadata":{"name":"test1","resourceVersion":"1"}}}\n',
            # same copyright character expressed as bytes
            # (the trailing comma matters: without it this literal is
            # implicitly concatenated with the next one and the two events
            # arrive as a single chunk)
            b'{"type":"MODIFIED","object":{"data":{"utf-8":"\xC2\xA9 2"},"metadata":{"name":"test2","resourceVersion":"2"}}}\n',
            # same copyright character with bytes split across two stream chunks
            b'{"type":"MODIFIED","object":{"data":{"utf-8":"\xC2',
            b'\xA9 3"},"metadata":{"n',
            # more chunks of the same event, sent as a mix of bytes and strings
            'ame":"test3","resourceVersion":"3"',
            '}}}',
            b'\n',
        ])

    fake_api = Mock()
    fake_api.get_configmaps = Mock(return_value=fake_resp)
    fake_api.get_configmaps.__doc__ = ':return: V1ConfigMapList'

    w = Watch()
    count = 0

    # Consume all test events from the mock service, stopping when no more data is available.
    # Note that "timeout_seconds" below is not a timeout; rather, it disables retries and is
    # the only way to do so. Without that, the stream will re-read the test data forever.
    for event in w.stream(fake_api.get_configmaps, timeout_seconds=1):
        count += 1
        self.assertEqual("MODIFIED", event['type'])
        self.assertEqual("test%d" % count, event['object'].metadata.name)
        self.assertEqual("© %d" % count, event['object'].data["utf-8"])
        self.assertEqual(
            "%d" % count, event['object'].metadata.resource_version)
        self.assertEqual("%d" % count, w.resource_version)
    self.assertEqual(3, count)
144+
145+
def test_watch_with_invalid_utf8(self):
    """Invalid UTF-8 bytes must be replaced, not abort the stream.

    Event N carries a valid four-byte emoji plus a run of N invalid bytes;
    the decoded "invalid" field is expected to hold N replacement
    characters.
    """
    # test 1 uses 1 invalid utf-8 byte
    # test 2 uses a sequence of 2 invalid utf-8 bytes
    # test 3 uses a sequence of 3 invalid utf-8 bytes
    chunks = [
        # utf-8 sequence for 😄 is \xF0\x9F\x98\x84
        # all other sequences below are invalid
        # ref: https://www.w3.org/2001/06/utf-8-wrong/UTF-8-test.html
        b'{"type":"MODIFIED","object":{"data":{"utf-8":"\xF0\x9F\x98\x84 1","invalid":"\x80 1"},"metadata":{"name":"test1"}}}\n',
        b'{"type":"MODIFIED","object":{"data":{"utf-8":"\xF0\x9F\x98\x84 2","invalid":"\xC0\xAF 2"},"metadata":{"name":"test2"}}}\n',
        # mix bytes/strings and split byte sequences across chunks
        b'{"type":"MODIFIED","object":{"data":{"utf-8":"\xF0\x9F\x98',
        b'\x84 ',
        b'',
        b'3","invalid":"\xE0\x80',
        b'\xAF ',
        '3"},"metadata":{"n',
        'ame":"test3"',
        '}}}',
        b'\n',
    ]

    response = Mock()
    response.close = Mock()
    response.release_conn = Mock()
    response.stream = Mock(return_value=chunks)

    api = Mock()
    api.get_configmaps = Mock(return_value=response)
    api.get_configmaps.__doc__ = ':return: V1ConfigMapList'

    watcher = Watch()
    count = 0

    # Consume all test events from the mock service, stopping when no more data is available.
    # Note that "timeout_seconds" below is not a timeout; rather, it disables retries and is
    # the only way to do so. Without that, the stream will re-read the test data forever.
    events = watcher.stream(api.get_configmaps, timeout_seconds=1)
    for count, item in enumerate(events, start=1):
        payload = item['object']
        self.assertEqual("MODIFIED", item['type'])
        self.assertEqual("test%d" % count, payload.metadata.name)
        self.assertEqual("😄 %d" % count, payload.data["utf-8"])
        # expect N replacement characters in test N
        expected_invalid = "� %d".replace('�', '�'*count) % count
        self.assertEqual(expected_invalid, payload.data["invalid"])
    self.assertEqual(3, count)
190+
71191
def test_watch_for_follow(self):
72192
fake_resp = Mock()
73193
fake_resp.close = Mock()

0 commit comments

Comments
 (0)