1
1
# Test the internal _wmi module on Windows
2
2
# This is used by the platform module, and potentially others
3
3
4
+ import time
4
5
import unittest
5
- from test .support import import_helper , requires_resource
6
+ from test .support import import_helper , requires_resource , LOOPBACK_TIMEOUT
6
7
7
8
8
9
# Do this first so test will be skipped if module doesn't exist
9
10
_wmi = import_helper .import_module ('_wmi' , required_on = ['win' ])
10
11
11
12
13
+ def wmi_exec_query (query ):
14
+ # gh-112278: WMI maybe slow response when first call.
15
+ try :
16
+ return _wmi .exec_query (query )
17
+ except WindowsError as e :
18
+ if e .winerror != 258 :
19
+ raise
20
+ time .sleep (LOOPBACK_TIMEOUT )
21
+ return _wmi .exec_query (query )
22
+
23
+
12
24
class WmiTests (unittest .TestCase ):
13
25
def test_wmi_query_os_version (self ):
14
- r = _wmi . exec_query ("SELECT Version FROM Win32_OperatingSystem" ).split ("\0 " )
26
+ r = wmi_exec_query ("SELECT Version FROM Win32_OperatingSystem" ).split ("\0 " )
15
27
self .assertEqual (1 , len (r ))
16
28
k , eq , v = r [0 ].partition ("=" )
17
29
self .assertEqual ("=" , eq , r [0 ])
@@ -28,7 +40,7 @@ def test_wmi_query_repeated(self):
28
40
def test_wmi_query_error (self ):
29
41
# Invalid queries fail with OSError
30
42
try :
31
- _wmi . exec_query ("SELECT InvalidColumnName FROM InvalidTableName" )
43
+ wmi_exec_query ("SELECT InvalidColumnName FROM InvalidTableName" )
32
44
except OSError as ex :
33
45
if ex .winerror & 0xFFFFFFFF == 0x80041010 :
34
46
# This is the expected error code. All others should fail the test
@@ -42,19 +54,19 @@ def test_wmi_query_repeated_error(self):
42
54
def test_wmi_query_not_select (self ):
43
55
# Queries other than SELECT are blocked to avoid potential exploits
44
56
with self .assertRaises (ValueError ):
45
- _wmi . exec_query ("not select, just in case someone tries something" )
57
+ wmi_exec_query ("not select, just in case someone tries something" )
46
58
47
59
@requires_resource ('cpu' )
48
60
def test_wmi_query_overflow (self ):
49
61
# Ensure very big queries fail
50
62
# Test multiple times to ensure consistency
51
63
for _ in range (2 ):
52
64
with self .assertRaises (OSError ):
53
- _wmi . exec_query ("SELECT * FROM CIM_DataFile" )
65
+ wmi_exec_query ("SELECT * FROM CIM_DataFile" )
54
66
55
67
def test_wmi_query_multiple_rows (self ):
56
68
# Multiple instances should have an extra null separator
57
- r = _wmi . exec_query ("SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000" )
69
+ r = wmi_exec_query ("SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000" )
58
70
self .assertFalse (r .startswith ("\0 " ), r )
59
71
self .assertFalse (r .endswith ("\0 " ), r )
60
72
it = iter (r .split ("\0 " ))
@@ -69,6 +81,6 @@ def test_wmi_query_threads(self):
69
81
from concurrent .futures import ThreadPoolExecutor
70
82
query = "SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000"
71
83
with ThreadPoolExecutor (4 ) as pool :
72
- task = [pool .submit (_wmi . exec_query , query ) for _ in range (32 )]
84
+ task = [pool .submit (wmi_exec_query , query ) for _ in range (32 )]
73
85
for t in task :
74
86
self .assertRegex (t .result (), "ProcessId=" )
0 commit comments