Skip to content

Clean up the sqlite3 tests #93056

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 18 additions & 28 deletions Lib/test/test_sqlite3/test_dbapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,6 @@
from test.support.os_helper import TESTFN, TESTFN_UNDECODABLE, unlink, temp_dir, FakePath


# Helper for tests using TESTFN
@contextlib.contextmanager
def managed_connect(*args, in_mem=False, **kwargs):
cx = sqlite.connect(*args, **kwargs)
try:
yield cx
finally:
cx.close()
if not in_mem:
unlink(TESTFN)


# Helper for temporary memory databases
def memory_database(*args, **kwargs):
cx = sqlite.connect(":memory:", *args, **kwargs)
Expand Down Expand Up @@ -331,7 +319,7 @@ def test_error_code_on_exception(self):
@unittest.skipIf(sqlite.sqlite_version_info <= (3, 7, 16),
"Requires SQLite 3.7.16 or newer")
def test_extended_error_code_on_exception(self):
with managed_connect(":memory:", in_mem=True) as con:
with memory_database() as con:
with con:
con.execute("create table t(t integer check(t > 0))")
errmsg = "constraint failed"
Expand Down Expand Up @@ -389,7 +377,7 @@ def test_cursor(self):
def test_failed_open(self):
YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db"
with self.assertRaises(sqlite.OperationalError):
con = sqlite.connect(YOU_CANNOT_OPEN_THIS)
sqlite.connect(YOU_CANNOT_OPEN_THIS)

def test_close(self):
self.cx.close()
Expand Down Expand Up @@ -653,7 +641,9 @@ def test_open_with_path_like_object(self):
""" Checks that we can successfully connect to a database using an object that
is PathLike, i.e. has __fspath__(). """
path = FakePath(TESTFN)
with managed_connect(path) as cx:
self.addCleanup(unlink, path)
self.assertFalse(os.path.exists(path))
with contextlib.closing(sqlite.connect(path)) as cx:
self.assertTrue(os.path.exists(path))
cx.execute(self._sql)

Expand All @@ -663,23 +653,26 @@ def test_open_with_path_like_object(self):
def test_open_with_undecodable_path(self):
path = TESTFN_UNDECODABLE
self.addCleanup(unlink, path)
with managed_connect(path, in_mem=True) as cx:
self.assertFalse(os.path.exists(path))
with contextlib.closing(sqlite.connect(path)) as cx:
self.assertTrue(os.path.exists(path))
cx.execute(self._sql)

def test_open_uri(self):
path = TESTFN
self.addCleanup(unlink, path)
uri = "file:" + urllib.parse.quote(os.fsencode(path))
self.assertFalse(os.path.exists(path))
with managed_connect(uri, uri=True) as cx:
with contextlib.closing(sqlite.connect(uri, uri=True)) as cx:
self.assertTrue(os.path.exists(path))
cx.execute(self._sql)

def test_open_unquoted_uri(self):
path = TESTFN
self.addCleanup(unlink, path)
uri = "file:" + path
self.assertFalse(os.path.exists(path))
with managed_connect(uri, uri=True) as cx:
with contextlib.closing(sqlite.connect(uri, uri=True)) as cx:
self.assertTrue(os.path.exists(path))
cx.execute(self._sql)

Expand All @@ -695,7 +688,7 @@ def test_open_uri_readonly(self):
sqlite.connect(path).close()
self.assertTrue(os.path.exists(path))
# Cannot modify new DB
with managed_connect(uri, uri=True) as cx:
with contextlib.closing(sqlite.connect(uri, uri=True)) as cx:
with self.assertRaises(sqlite.OperationalError):
cx.execute(self._sql)

Expand All @@ -704,14 +697,12 @@ def test_open_uri_readonly(self):
@unittest.skipUnless(TESTFN_UNDECODABLE, "only works if there are undecodable paths")
def test_open_undecodable_uri(self):
path = TESTFN_UNDECODABLE
self.addCleanup(unlink, path)
uri = "file:" + urllib.parse.quote(path)
self.assertFalse(os.path.exists(path))
try:
with managed_connect(uri, uri=True, in_mem=True) as cx:
self.assertTrue(os.path.exists(path))
cx.execute(self._sql)
finally:
unlink(path)
with contextlib.closing(sqlite.connect(uri, uri=True)) as cx:
self.assertTrue(os.path.exists(path))
cx.execute(self._sql)

def test_factory_database_arg(self):
def factory(database, *args, **kwargs):
Expand All @@ -722,12 +713,11 @@ def factory(database, *args, **kwargs):
for database in (TESTFN, os.fsencode(TESTFN),
FakePath(TESTFN), FakePath(os.fsencode(TESTFN))):
database_arg = None
with sqlite.connect(database, factory=factory):
pass
sqlite.connect(database, factory=factory).close()
self.assertEqual(database_arg, database)

def test_database_keyword(self):
with sqlite.connect(database=":memory:") as cx:
with contextlib.closing(sqlite.connect(database=":memory:")) as cx:
self.assertEqual(type(cx), sqlite.Connection)


Expand Down
10 changes: 5 additions & 5 deletions Lib/test/test_sqlite3/test_regression.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

from test import support
from unittest.mock import patch
from test.test_sqlite3.test_dbapi import memory_database, managed_connect, cx_limit
from test.test_sqlite3.test_dbapi import memory_database, cx_limit


class RegressionTests(unittest.TestCase):
Expand Down Expand Up @@ -422,7 +422,7 @@ def test_return_empty_bytestring(self):
self.assertEqual(val, b'')

def test_table_lock_cursor_replace_stmt(self):
with managed_connect(":memory:", in_mem=True) as con:
with memory_database() as con:
cur = con.cursor()
cur.execute("create table t(t)")
cur.executemany("insert into t values(?)",
Expand All @@ -433,7 +433,7 @@ def test_table_lock_cursor_replace_stmt(self):
con.commit()

def test_table_lock_cursor_dealloc(self):
with managed_connect(":memory:", in_mem=True) as con:
with memory_database() as con:
con.execute("create table t(t)")
con.executemany("insert into t values(?)",
((v,) for v in range(5)))
Expand All @@ -444,7 +444,7 @@ def test_table_lock_cursor_dealloc(self):
con.commit()

def test_table_lock_cursor_non_readonly_select(self):
with managed_connect(":memory:", in_mem=True) as con:
with memory_database() as con:
con.execute("create table t(t)")
con.executemany("insert into t values(?)",
((v,) for v in range(5)))
Expand All @@ -459,7 +459,7 @@ def dup(v):
con.commit()

def test_executescript_step_through_select(self):
with managed_connect(":memory:", in_mem=True) as con:
with memory_database() as con:
values = [(v,) for v in range(5)]
with con:
con.execute("create table t(t)")
Expand Down