forked from pymssql/pymssql
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_green.py
134 lines (105 loc) · 4.16 KB
/
test_green.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import datetime
import pytest
import unittest
import pymssql
from .helpers import mssqlconn, pymssqlconn, mark_slow
gevent = pytest.importorskip("gevent")
import gevent.socket
class GreenletTests(unittest.TestCase):
    """Check that pymssql queries overlap when driven from gevent greenlets.

    Every test installs a pymssql wait callback that blocks in
    ``gevent.socket.wait_read`` and then starts ``NUM_GREENLETS`` greenlets,
    each running a query that contains ``WAITFOR DELAY '00:00:05'``.  Five
    such delays executed one after another would need at least 25 seconds,
    so finishing the whole batch in under ``MAX_ELAPSED`` shows the waits
    overlapped.
    """

    # Stored procedure created by the callproc test and invoked by
    # ``greenlet_run_pymssql_callproc``.
    PROC_NAME = 'my_proc'
    # Number of greenlets started for each test.
    NUM_GREENLETS = 5
    # Seconds slept (cooperatively) before each greenlet is spawned.
    SPAWN_INTERVAL = 1
    # The batch must complete in strictly less than this.
    MAX_ELAPSED = datetime.timedelta(seconds=20)
    # Query executed directly by the two ``*_execute`` greenlet tasks.
    DELAY_QUERY = """
    WAITFOR DELAY '00:00:05'  -- sleep for 5 seconds
    SELECT CURRENT_TIMESTAMP
    """

    def _install_wait_callback(self):
        """Route pymssql's wait-for-server step through gevent for one test.

        ``set_wait_callback`` takes no connection argument, so the callback
        is presumably process-wide; a cleanup is registered so it does not
        stay installed for tests that run afterwards.
        """
        def wait_callback(read_fileno):
            gevent.socket.wait_read(read_fileno)

        pymssql.set_wait_callback(wait_callback)
        # NOTE(review): assumes None is pymssql's "no callback" value --
        # confirm against the pymssql version under test.
        self.addCleanup(pymssql.set_wait_callback, None)

    def _assert_ran_concurrently(self, elapsed_time):
        """Fail unless ``elapsed_time`` is below ``MAX_ELAPSED``."""
        self.assertLess(elapsed_time, self.MAX_ELAPSED)

    def greenlet_run_pymssql_execute(self, num):
        """Run the delayed query through a pymssql cursor and print its row.

        :param num: index of the calling greenlet; only used in the output.
        """
        with pymssqlconn() as conn:
            cur = conn.cursor()
            cur.execute(self.DELAY_QUERY)
            row = cur.fetchone()
            print("greenlet_run_pymssql_execute: num = %r; row = %r" % (num, row))

    def greenlet_run_pymssql_callproc(self, num):
        """Call the ``PROC_NAME`` stored procedure and print the row it yields.

        The procedure itself is created by the callproc test before any
        greenlet is spawned.

        :param num: index of the calling greenlet; only used in the output.
        """
        with pymssqlconn() as conn:
            cur = conn.cursor()
            try:
                cur.callproc(self.PROC_NAME, ())
                cur.nextset()
                row = cur.fetchone()
                print("greenlet_run_pymssql_callproc: num = %r; row = %r" % (num, row))
            finally:
                # Close the cursor even when the call or the fetch fails.
                cur.close()

    def greenlet_run_mssql_execute(self, num):
        """Run the delayed query on an ``mssqlconn()`` connection and print rows.

        :param num: index of the calling greenlet; only used in the output.
        """
        conn = mssqlconn()
        try:
            conn.execute_query(self.DELAY_QUERY)
            for row in conn:
                print("greenlet_run_mssql_execute: num = %r; row = %r" % (num, row))
        finally:
            # Release the connection even when the query raises.
            conn.close()

    def _run_all_greenlets(self, greenlet_task):
        """Spawn ``greenlet_task`` in several greenlets and time the batch.

        :param greenlet_task: callable invoked as ``greenlet_task(i)`` in its
            own greenlet for each ``i`` in ``range(NUM_GREENLETS)``.
        :returns: ``datetime.timedelta`` from just before the first spawn
            delay until every greenlet has finished.
        :raises Exception: whatever a greenlet raised; without this a failed
            query would end the batch early and look like good concurrency.
        """
        greenlets = []
        started = datetime.datetime.now()

        for i in range(self.NUM_GREENLETS):
            gevent.sleep(self.SPAWN_INTERVAL)
            greenlets.append(gevent.spawn(greenlet_task, i))

        gevent.joinall(greenlets, raise_error=True)

        return datetime.datetime.now() - started

    @mark_slow
    def test_gevent_socket_pymssql_execute_wait_read_concurrency(self):
        """pymssql ``cursor.execute`` calls overlap across greenlets."""
        self._install_wait_callback()

        elapsed_time = self._run_all_greenlets(
            self.greenlet_run_pymssql_execute)

        self._assert_ran_concurrently(elapsed_time)

    @mark_slow
    def test_gevent_socket_pymssql_callproc_wait_read_concurrency(self):
        """pymssql ``cursor.callproc`` calls overlap across greenlets."""
        self._install_wait_callback()

        # (Re)create the stored procedure once, up front, so the greenlets
        # only have to call it.
        with pymssqlconn() as conn:
            cur = conn.cursor()
            proc_name = self.PROC_NAME
            cur.execute("IF OBJECT_ID('%s', 'P') IS NOT NULL DROP PROCEDURE %s" % (proc_name, proc_name))
            cur.execute("""
            CREATE PROCEDURE %s AS
            BEGIN
                SET NOCOUNT ON
                WAITFOR DELAY '00:00:05'  -- sleep for 5 seconds
                SELECT CURRENT_TIMESTAMP
            END
            """ % (proc_name,))
            conn.commit()

        elapsed_time = self._run_all_greenlets(
            self.greenlet_run_pymssql_callproc)

        self._assert_ran_concurrently(elapsed_time)

    @mark_slow
    def test_gevent_socket_mssql_execute_wait_read_concurrency(self):
        """``execute_query`` calls on ``mssqlconn()`` connections overlap."""
        self._install_wait_callback()

        elapsed_time = self._run_all_greenlets(
            self.greenlet_run_mssql_execute)

        self._assert_ran_concurrently(elapsed_time)
# Module-level suite containing every GreenletTests test method.
# loadTestsFromTestCase replaces unittest.makeSuite, which was deprecated in
# Python 3.11 and removed in 3.13.
suite = unittest.TestSuite()
suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(GreenletTests))

if __name__ == '__main__':
    unittest.main()