Skip to content

gh-111495: Add tests for PyCodec_* C API #123343

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
ce7c135
add tests for C API `codecs`
picnixz Aug 26, 2024
f9e350a
add Python tests for `_codecs`
picnixz Aug 26, 2024
15b6811
fix size bug
picnixz Aug 26, 2024
8048ae1
rename test class
picnixz Aug 26, 2024
8487b46
Revert "fix size bug"
picnixz Sep 25, 2024
2dbe09a
Merge branch 'main' into test/c-api-codec-111495
picnixz Sep 25, 2024
0097f2a
Disable tests that are known to crash.
picnixz Sep 25, 2024
303b13c
address Victor's review
picnixz Sep 25, 2024
4f474dd
update tests to reflect user errors
picnixz Sep 25, 2024
d49743c
Merge remote-tracking branch 'upstream/main' into test/c-api-codec-11…
picnixz Sep 27, 2024
87ee0d2
fix C API codec tests
picnixz Sep 27, 2024
6a36eb0
small hack to make the test suite correct
picnixz Sep 27, 2024
145b285
remove un-necessary imports
picnixz Sep 28, 2024
dc9af16
Merge remote-tracking branch 'upstream/main' into test/c-api-codec-11…
picnixz Sep 29, 2024
7be1f55
use `_codecs._unregister_error` to cleanup test state
picnixz Sep 29, 2024
f72be5c
indicate some semantics for NULL case being tested
picnixz Sep 29, 2024
4d02c6c
revert a cosmetic change
picnixz Sep 29, 2024
0f26ca7
Move `PyCodec_NameReplaceErrors` test to the `_testlimitedcapi` module
picnixz Sep 29, 2024
1399779
add comment for why we do not test `_PyCodec_UnregisterError`
picnixz Sep 29, 2024
914151e
update a comment
picnixz Sep 29, 2024
8dd7e8d
revert one cosmetic change
picnixz Sep 29, 2024
1e6a5ce
Fix Windows compilation
picnixz Sep 29, 2024
2ba5f03
address Victor's review
picnixz Sep 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
288 changes: 286 additions & 2 deletions Lib/test/test_capi/test_codecs.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
import unittest
import codecs
import contextlib
import io
import re
import sys
import unittest
import unittest.mock as mock
import _testcapi
from test.support import import_helper

_testlimitedcapi = import_helper.import_module('_testlimitedcapi')

# Placeholder passed by the tests below wherever the "NULL" argument case of
# a C API function is exercised.
# NOTE(review): presumably the _testcapi wrappers translate None into a C
# NULL pointer -- confirm against the C helpers.
NULL = None
# Regex-escaped error message, used with assertRaisesRegex() to match the
# TypeError raised when NULL is passed as the encoding argument.
BAD_ARGUMENT = re.escape('bad argument type for built-in operation')


class CAPITest(unittest.TestCase):
class CAPIUnicodeTest(unittest.TestCase):
# TODO: Test the following functions:
#
# PyUnicode_BuildEncodingMap
Expand Down Expand Up @@ -516,5 +523,282 @@ def test_asrawunicodeescapestring(self):
# CRASHES asrawunicodeescapestring(NULL)


class CAPICodecs(unittest.TestCase):

def setUp(self):
self.enterContext(import_helper.isolated_modules())
self.enterContext(import_helper.CleanImport('codecs'))
self.codecs = import_helper.import_module('codecs')
# Encoding names are normalized internally by converting them
# to lowercase and their hyphens are replaced by underscores.
self.encoding_name = f'codec_reversed_{id(self)}'
# make sure that our custom codec is not already registered
self.assertRaises(LookupError, self.codecs.lookup, self.encoding_name)
# create the search function without registering yet
self._create_custom_codec()

def _create_custom_codec(self):
def codec_encoder(m, errors='strict'):
return (type(m)().join(reversed(m)), len(m))

def codec_decoder(c, errors='strict'):
return (type(c)().join(reversed(c)), len(c))

class IncrementalEncoder(codecs.IncrementalEncoder):
def encode(self, input, final=False):
return codec_encoder(input)

class IncrementalDecoder(codecs.IncrementalDecoder):
def decode(self, input, final=False):
return codec_decoder(input)

class StreamReader(codecs.StreamReader):
def encode(self, input, errors='strict'):
return codec_encoder(input, errors=errors)

def decode(self, input, errors='strict'):
return codec_decoder(input, errors=errors)

class StreamWriter(codecs.StreamWriter):
def encode(self, input, errors='strict'):
return codec_encoder(input, errors=errors)

def decode(self, input, errors='strict'):
return codec_decoder(input, errors=errors)

info = codecs.CodecInfo(
encode=codec_encoder,
decode=codec_decoder,
streamreader=StreamReader,
streamwriter=StreamWriter,
incrementalencoder=IncrementalEncoder,
incrementaldecoder=IncrementalDecoder,
name=self.encoding_name
)

def search_function(encoding):
if encoding == self.encoding_name:
return info
return None

self.codec_info = info
self.search_function = search_function

@contextlib.contextmanager
def use_custom_encoder(self):
self.assertRaises(LookupError, self.codecs.lookup, self.encoding_name)
self.codecs.register(self.search_function)
yield
self.codecs.unregister(self.search_function)
self.assertRaises(LookupError, self.codecs.lookup, self.encoding_name)

def test_codec_register(self):
search_function, encoding = self.search_function, self.encoding_name
# register the search function using the C API
self.assertIsNone(_testcapi.codec_register(search_function))
self.assertIs(self.codecs.lookup(encoding), search_function(encoding))
self.assertEqual(self.codecs.encode('123', encoding=encoding), '321')
# unregister the search function using the regular API
self.codecs.unregister(search_function)
self.assertRaises(LookupError, self.codecs.lookup, encoding)

def test_codec_unregister(self):
search_function, encoding = self.search_function, self.encoding_name
self.assertRaises(LookupError, self.codecs.lookup, encoding)
# register the search function using the regular API
self.codecs.register(search_function)
self.assertIsNotNone(self.codecs.lookup(encoding))
# unregister the search function using the C API
self.assertIsNone(_testcapi.codec_unregister(search_function))
self.assertRaises(LookupError, self.codecs.lookup, encoding)

def test_codec_known_encoding(self):
self.assertRaises(LookupError, self.codecs.lookup, 'unknown-codec')
self.assertFalse(_testcapi.codec_known_encoding('unknown-codec'))
self.assertFalse(_testcapi.codec_known_encoding('unknown_codec'))
self.assertFalse(_testcapi.codec_known_encoding('UNKNOWN-codec'))

encoding_name = self.encoding_name
self.assertRaises(LookupError, self.codecs.lookup, encoding_name)
self.codecs.register(self.search_function)

for name in [
encoding_name,
encoding_name.upper(),
encoding_name.replace('_', '-'),
]:
with self.subTest(name):
self.assertTrue(_testcapi.codec_known_encoding(name))

def test_codec_encode(self):
encode = _testcapi.codec_encode
self.assertEqual(encode('a', 'utf-8', NULL), b'a')
self.assertEqual(encode('a', 'utf-8', 'strict'), b'a')
self.assertEqual(encode('[é]', 'ascii', 'ignore'), b'[]')

self.assertRaises(TypeError, encode, NULL, 'ascii', 'strict')
with self.assertRaisesRegex(TypeError, BAD_ARGUMENT):
encode('a', NULL, 'strict')

def test_codec_decode(self):
decode = _testcapi.codec_decode

s = 'a\xa1\u4f60\U0001f600'
b = s.encode()

self.assertEqual(decode(b, 'utf-8', 'strict'), s)
self.assertEqual(decode(b, 'utf-8', NULL), s)
self.assertEqual(decode(b, 'latin1', 'strict'), b.decode('latin1'))
self.assertRaises(UnicodeDecodeError, decode, b, 'ascii', 'strict')
self.assertRaises(UnicodeDecodeError, decode, b, 'ascii', NULL)
self.assertEqual(decode(b, 'ascii', 'replace'), 'a' + '\ufffd'*9)

# _codecs.decode() only reports unknown errors policy when they are
# used; this is different from PyUnicode_Decode() which checks that
# both the encoding and the errors policy are recognized before even
# attempting to call the decoder.
self.assertEqual(decode(b'', 'utf-8', 'unknown-errors-policy'), '')
self.assertEqual(decode(b'a', 'utf-8', 'unknown-errors-policy'), 'a')

self.assertRaises(TypeError, decode, NULL, 'ascii', 'strict')
with self.assertRaisesRegex(TypeError, BAD_ARGUMENT):
decode(b, NULL, 'strict')

def test_codec_encoder(self):
codec_encoder = _testcapi.codec_encoder

with self.use_custom_encoder():
encoder = codec_encoder(self.encoding_name)
self.assertIs(encoder, self.codec_info.encode)

with self.assertRaisesRegex(TypeError, BAD_ARGUMENT):
codec_encoder(NULL)

def test_codec_decoder(self):
codec_decoder = _testcapi.codec_decoder

with self.use_custom_encoder():
decoder = codec_decoder(self.encoding_name)
self.assertIs(decoder, self.codec_info.decode)

with self.assertRaisesRegex(TypeError, BAD_ARGUMENT):
codec_decoder(NULL)

def test_codec_incremental_encoder(self):
codec_incremental_encoder = _testcapi.codec_incremental_encoder

with self.use_custom_encoder():
encoding = self.encoding_name

for policy in ['strict', NULL]:
with self.subTest(policy=policy):
encoder = codec_incremental_encoder(encoding, policy)
self.assertIsInstance(encoder, self.codec_info.incrementalencoder)

with self.assertRaisesRegex(TypeError, BAD_ARGUMENT):
codec_incremental_encoder(NULL, 'strict')

def test_codec_incremental_decoder(self):
codec_incremental_decoder = _testcapi.codec_incremental_decoder

with self.use_custom_encoder():
encoding = self.encoding_name

for policy in ['strict', NULL]:
with self.subTest(policy=policy):
decoder = codec_incremental_decoder(encoding, policy)
self.assertIsInstance(decoder, self.codec_info.incrementaldecoder)

with self.assertRaisesRegex(TypeError, BAD_ARGUMENT):
codec_incremental_decoder(NULL, 'strict')

def test_codec_stream_reader(self):
codec_stream_reader = _testcapi.codec_stream_reader

with self.use_custom_encoder():
encoding, stream = self.encoding_name, io.StringIO()
for policy in ['strict', NULL]:
with self.subTest(policy=policy):
writer = codec_stream_reader(encoding, stream, policy)
self.assertIsInstance(writer, self.codec_info.streamreader)

with self.assertRaisesRegex(TypeError, BAD_ARGUMENT):
codec_stream_reader(NULL, stream, 'strict')

def test_codec_stream_writer(self):
codec_stream_writer = _testcapi.codec_stream_writer

with self.use_custom_encoder():
encoding, stream = self.encoding_name, io.StringIO()
for policy in ['strict', NULL]:
with self.subTest(policy=policy):
writer = codec_stream_writer(encoding, stream, policy)
self.assertIsInstance(writer, self.codec_info.streamwriter)

with self.assertRaisesRegex(TypeError, BAD_ARGUMENT):
codec_stream_writer(NULL, stream, 'strict')


class CAPICodecErrors(unittest.TestCase):
    """Tests for the codec error-handler helpers exposed by ``_testcapi``."""

    def setUp(self):
        # Every test gets its own freshly imported ``codecs`` module.
        enter = self.enterContext
        enter(import_helper.isolated_modules())
        enter(import_helper.CleanImport('codecs'))
        self.codecs = import_helper.import_module('codecs')

    def test_codec_register_error(self):
        """A handler registered via ``_testcapi`` is called by ``codecs``."""
        with self.assertRaises(LookupError):
            _testcapi.codec_lookup_error('custom')

        def reraise(exc):
            raise exc

        # Wrap the handler so that the number of calls can be checked.
        handler = mock.Mock(wraps=reraise)
        # NOTE(review): nothing in this test unregisters 'custom' afterwards;
        # confirm whether the handler outlives the test.
        _testcapi.codec_register_error('custom', handler)

        with self.assertRaises(UnicodeEncodeError):
            self.codecs.encode('\xff', 'ascii', errors='custom')
        handler.assert_called_once()

        handler.reset_mock()
        with self.assertRaises(UnicodeDecodeError):
            self.codecs.decode(b'\xff', 'ascii', errors='custom')
        handler.assert_called_once()

    def test_codec_lookup_error(self):
        """Built-in policies map to the matching ``codecs.*_errors`` object."""
        lookup = _testcapi.codec_lookup_error
        module = self.codecs
        expected_handlers = [
            (NULL, module.strict_errors),
            ('strict', module.strict_errors),
            ('ignore', module.ignore_errors),
            ('replace', module.replace_errors),
            ('xmlcharrefreplace', module.xmlcharrefreplace_errors),
            ('namereplace', module.namereplace_errors),
        ]
        for policy, expected in expected_handlers:
            self.assertIs(lookup(policy), expected)
        with self.assertRaises(LookupError):
            lookup('custom')

    def test_codec_error_handlers(self):
        """Call each built-in error handler wrapper with sample exceptions."""
        samples = [
            # A UnicodeError with an empty message currently crashes:
            # See: https://github.com/python/cpython/issues/123378
            # UnicodeEncodeError('bad', '', 0, 1, 'reason'),
            UnicodeEncodeError('bad', 'x', 0, 1, 'reason'),
            UnicodeEncodeError('bad', 'xyz123', 0, 1, 'reason'),
            UnicodeEncodeError('bad', 'xyz123', 1, 4, 'reason'),
        ]

        # The strict handler is expected to raise for every sample.
        strict = _testcapi.codec_strict_errors
        for error in samples:
            with self.subTest(handler=strict, exc=error):
                with self.assertRaises(UnicodeEncodeError):
                    strict(error)

        # The other handlers are only required not to raise here.
        lenient_handlers = (
            _testcapi.codec_ignore_errors,
            _testcapi.codec_replace_errors,
            _testcapi.codec_xmlcharrefreplace_errors,
            _testcapi.codec_namereplace_errors,
        )
        for lenient in lenient_handlers:
            for error in samples:
                with self.subTest(handler=lenient, exc=error):
                    lenient(error)


# Run the tests of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
Loading
Loading