|
8 | 8 | SimpleHTTPRequestHandler
|
9 | 9 | from http import server, HTTPStatus
|
10 | 10 |
|
| 11 | +import contextlib |
11 | 12 | import os
|
12 | 13 | import socket
|
13 | 14 | import sys
|
|
20 | 21 | import html
|
21 | 22 | import http, http.client
|
22 | 23 | import urllib.parse
|
| 24 | +import urllib.request |
23 | 25 | import tempfile
|
24 | 26 | import time
|
25 | 27 | import datetime
|
|
32 | 34 | from test.support import (
|
33 | 35 | is_apple, import_helper, os_helper, threading_helper
|
34 | 36 | )
|
| 37 | +from test.support.script_helper import kill_python, spawn_python |
| 38 | +from test.support.socket_helper import find_unused_port |
35 | 39 |
|
36 | 40 | try:
|
37 | 41 | import ssl
|
@@ -1281,6 +1285,256 @@ def test_server_test_ipv4(self, _):
|
1281 | 1285 | self.assertEqual(mock_server.address_family, socket.AF_INET)
|
1282 | 1286 |
|
1283 | 1287 |
|
| 1288 | +class CommandLineTestCase(unittest.TestCase): |
| 1289 | + default_port = 8000 |
| 1290 | + default_bind = None |
| 1291 | + default_protocol = 'HTTP/1.0' |
| 1292 | + default_handler = SimpleHTTPRequestHandler |
| 1293 | + default_server = unittest.mock.ANY |
| 1294 | + tls_cert = certdata_file('ssl_cert.pem') |
| 1295 | + tls_key = certdata_file('ssl_key.pem') |
| 1296 | + tls_password = 'somepass' |
| 1297 | + tls_cert_options = ['--tls-cert'] |
| 1298 | + tls_key_options = ['--tls-key'] |
| 1299 | + tls_password_options = ['--tls-password-file'] |
| 1300 | + args = { |
| 1301 | + 'HandlerClass': default_handler, |
| 1302 | + 'ServerClass': default_server, |
| 1303 | + 'protocol': default_protocol, |
| 1304 | + 'port': default_port, |
| 1305 | + 'bind': default_bind, |
| 1306 | + 'tls_cert': None, |
| 1307 | + 'tls_key': None, |
| 1308 | + 'tls_password': None, |
| 1309 | + } |
| 1310 | + |
| 1311 | + def setUp(self): |
| 1312 | + super().setUp() |
| 1313 | + self.tls_password_file = tempfile.mktemp() |
| 1314 | + with open(self.tls_password_file, 'wb') as f: |
| 1315 | + f.write(self.tls_password.encode()) |
| 1316 | + self.addCleanup(os_helper.unlink, self.tls_password_file) |
| 1317 | + |
| 1318 | + def invoke_httpd(self, *args, stdout=None, stderr=None): |
| 1319 | + stdout = StringIO() if stdout is None else stdout |
| 1320 | + stderr = StringIO() if stderr is None else stderr |
| 1321 | + with contextlib.redirect_stdout(stdout), \ |
| 1322 | + contextlib.redirect_stderr(stderr): |
| 1323 | + server._main(args) |
| 1324 | + return stdout.getvalue(), stderr.getvalue() |
| 1325 | + |
| 1326 | + @mock.patch('http.server.test') |
| 1327 | + def test_port_flag(self, mock_func): |
| 1328 | + ports = [8000, 65535] |
| 1329 | + for port in ports: |
| 1330 | + with self.subTest(port=port): |
| 1331 | + self.invoke_httpd(str(port)) |
| 1332 | + call_args = self.args | dict(port=port) |
| 1333 | + mock_func.assert_called_once_with(**call_args) |
| 1334 | + mock_func.reset_mock() |
| 1335 | + |
| 1336 | + @mock.patch('http.server.test') |
| 1337 | + def test_directory_flag(self, mock_func): |
| 1338 | + options = ['-d', '--directory'] |
| 1339 | + directories = ['.', '/foo', '\\bar', '/', |
| 1340 | + 'C:\\', 'C:\\foo', 'C:\\bar', |
| 1341 | + '/home/user', './foo/foo2', 'D:\\foo\\bar'] |
| 1342 | + for flag in options: |
| 1343 | + for directory in directories: |
| 1344 | + with self.subTest(flag=flag, directory=directory): |
| 1345 | + self.invoke_httpd(flag, directory) |
| 1346 | + mock_func.assert_called_once_with(**self.args) |
| 1347 | + mock_func.reset_mock() |
| 1348 | + |
| 1349 | + @mock.patch('http.server.test') |
| 1350 | + def test_bind_flag(self, mock_func): |
| 1351 | + options = ['-b', '--bind'] |
| 1352 | + bind_addresses = ['localhost', '127.0.0.1', '::1', |
| 1353 | + '0.0.0.0', '8.8.8.8'] |
| 1354 | + for flag in options: |
| 1355 | + for bind_address in bind_addresses: |
| 1356 | + with self.subTest(flag=flag, bind_address=bind_address): |
| 1357 | + self.invoke_httpd(flag, bind_address) |
| 1358 | + call_args = self.args | dict(bind=bind_address) |
| 1359 | + mock_func.assert_called_once_with(**call_args) |
| 1360 | + mock_func.reset_mock() |
| 1361 | + |
| 1362 | + @mock.patch('http.server.test') |
| 1363 | + def test_protocol_flag(self, mock_func): |
| 1364 | + options = ['-p', '--protocol'] |
| 1365 | + protocols = ['HTTP/1.0', 'HTTP/1.1', 'HTTP/2.0', 'HTTP/3.0'] |
| 1366 | + for flag in options: |
| 1367 | + for protocol in protocols: |
| 1368 | + with self.subTest(flag=flag, protocol=protocol): |
| 1369 | + self.invoke_httpd(flag, protocol) |
| 1370 | + call_args = self.args | dict(protocol=protocol) |
| 1371 | + mock_func.assert_called_once_with(**call_args) |
| 1372 | + mock_func.reset_mock() |
| 1373 | + |
| 1374 | + @unittest.skipIf(ssl is None, "requires ssl") |
| 1375 | + @mock.patch('http.server.test') |
| 1376 | + def test_tls_cert_and_key_flags(self, mock_func): |
| 1377 | + for tls_cert_option in self.tls_cert_options: |
| 1378 | + for tls_key_option in self.tls_key_options: |
| 1379 | + self.invoke_httpd(tls_cert_option, self.tls_cert, |
| 1380 | + tls_key_option, self.tls_key) |
| 1381 | + call_args = self.args | { |
| 1382 | + 'tls_cert': self.tls_cert, |
| 1383 | + 'tls_key': self.tls_key, |
| 1384 | + } |
| 1385 | + mock_func.assert_called_once_with(**call_args) |
| 1386 | + mock_func.reset_mock() |
| 1387 | + |
| 1388 | + @unittest.skipIf(ssl is None, "requires ssl") |
| 1389 | + @mock.patch('http.server.test') |
| 1390 | + def test_tls_cert_and_key_and_password_flags(self, mock_func): |
| 1391 | + for tls_cert_option in self.tls_cert_options: |
| 1392 | + for tls_key_option in self.tls_key_options: |
| 1393 | + for tls_password_option in self.tls_password_options: |
| 1394 | + self.invoke_httpd(tls_cert_option, |
| 1395 | + self.tls_cert, |
| 1396 | + tls_key_option, |
| 1397 | + self.tls_key, |
| 1398 | + tls_password_option, |
| 1399 | + self.tls_password_file) |
| 1400 | + call_args = self.args | { |
| 1401 | + 'tls_cert': self.tls_cert, |
| 1402 | + 'tls_key': self.tls_key, |
| 1403 | + 'tls_password': self.tls_password, |
| 1404 | + } |
| 1405 | + mock_func.assert_called_once_with(**call_args) |
| 1406 | + mock_func.reset_mock() |
| 1407 | + |
| 1408 | + @unittest.skipIf(ssl is None, "requires ssl") |
| 1409 | + @mock.patch('http.server.test') |
| 1410 | + def test_missing_tls_cert_flag(self, mock_func): |
| 1411 | + for tls_key_option in self.tls_key_options: |
| 1412 | + with self.assertRaises(SystemExit): |
| 1413 | + self.invoke_httpd(tls_key_option, self.tls_key) |
| 1414 | + mock_func.reset_mock() |
| 1415 | + |
| 1416 | + for tls_password_option in self.tls_password_options: |
| 1417 | + with self.assertRaises(SystemExit): |
| 1418 | + self.invoke_httpd(tls_password_option, self.tls_password) |
| 1419 | + mock_func.reset_mock() |
| 1420 | + |
| 1421 | + @unittest.skipIf(ssl is None, "requires ssl") |
| 1422 | + @mock.patch('http.server.test') |
| 1423 | + def test_invalid_password_file(self, mock_func): |
| 1424 | + non_existent_file = 'non_existent_file' |
| 1425 | + for tls_password_option in self.tls_password_options: |
| 1426 | + for tls_cert_option in self.tls_cert_options: |
| 1427 | + with self.assertRaises(SystemExit): |
| 1428 | + self.invoke_httpd(tls_cert_option, |
| 1429 | + self.tls_cert, |
| 1430 | + tls_password_option, |
| 1431 | + non_existent_file) |
| 1432 | + |
| 1433 | + @mock.patch('http.server.test') |
| 1434 | + def test_no_arguments(self, mock_func): |
| 1435 | + self.invoke_httpd() |
| 1436 | + mock_func.assert_called_once_with(**self.args) |
| 1437 | + mock_func.reset_mock() |
| 1438 | + |
| 1439 | + @mock.patch('http.server.test') |
| 1440 | + def test_help_flag(self, _): |
| 1441 | + options = ['-h', '--help'] |
| 1442 | + for option in options: |
| 1443 | + stdout, stderr = StringIO(), StringIO() |
| 1444 | + with self.assertRaises(SystemExit): |
| 1445 | + self.invoke_httpd(option, stdout=stdout, stderr=stderr) |
| 1446 | + self.assertIn('usage', stdout.getvalue()) |
| 1447 | + self.assertEqual(stderr.getvalue(), '') |
| 1448 | + |
| 1449 | + @mock.patch('http.server.test') |
| 1450 | + def test_unknown_flag(self, _): |
| 1451 | + stdout, stderr = StringIO(), StringIO() |
| 1452 | + with self.assertRaises(SystemExit): |
| 1453 | + self.invoke_httpd('--unknown-flag', stdout=stdout, stderr=stderr) |
| 1454 | + self.assertEqual(stdout.getvalue(), '') |
| 1455 | + self.assertIn('error', stderr.getvalue()) |
| 1456 | + |
| 1457 | + |
| 1458 | +class CommandLineRunTimeTestCase(unittest.TestCase): |
| 1459 | + served_data = os.urandom(32) |
| 1460 | + served_file_name = 'served_filename' |
| 1461 | + tls_cert = certdata_file('ssl_cert.pem') |
| 1462 | + tls_key = certdata_file('ssl_key.pem') |
| 1463 | + tls_password = 'somepass' |
| 1464 | + |
| 1465 | + def setUp(self): |
| 1466 | + super().setUp() |
| 1467 | + with open(self.served_file_name, 'wb') as f: |
| 1468 | + f.write(self.served_data) |
| 1469 | + self.addCleanup(os_helper.unlink, self.served_file_name) |
| 1470 | + self.tls_password_file = tempfile.mktemp() |
| 1471 | + with open(self.tls_password_file, 'wb') as f: |
| 1472 | + f.write(self.tls_password.encode()) |
| 1473 | + self.addCleanup(os_helper.unlink, self.tls_password_file) |
| 1474 | + |
| 1475 | + def fetch_file(self, path): |
| 1476 | + context = ssl.create_default_context() |
| 1477 | + # allow self-signed certificates |
| 1478 | + context.check_hostname = False |
| 1479 | + context.verify_mode = ssl.CERT_NONE |
| 1480 | + req = urllib.request.Request(path, method='GET') |
| 1481 | + with urllib.request.urlopen(req, context=context) as res: |
| 1482 | + return res.read() |
| 1483 | + |
| 1484 | + def parse_cli_output(self, output): |
| 1485 | + matches = re.search(r'\((https?)://([^/:]+):(\d+)/?\)', output) |
| 1486 | + if matches is None: |
| 1487 | + return None, None, None |
| 1488 | + return matches.group(1), matches.group(2), int(matches.group(3)) |
| 1489 | + |
| 1490 | + def wait_for_server(self, proc, protocol, port, bind, timeout=50): |
| 1491 | + """Check the server process output. |
| 1492 | +
|
| 1493 | + Return True if the server was successfully started |
| 1494 | + and is listening on the given port and bind address. |
| 1495 | + """ |
| 1496 | + while timeout > 0: |
| 1497 | + line = proc.stdout.readline() |
| 1498 | + if not line: |
| 1499 | + time.sleep(0.1) |
| 1500 | + timeout -= 1 |
| 1501 | + continue |
| 1502 | + protocol_, host_, port_ = self.parse_cli_output(line) |
| 1503 | + if not protocol_ or not host_ or not port_: |
| 1504 | + time.sleep(0.1) |
| 1505 | + timeout -= 1 |
| 1506 | + continue |
| 1507 | + if protocol_ == protocol and host_ == bind and port_ == port: |
| 1508 | + return True |
| 1509 | + break |
| 1510 | + return False |
| 1511 | + |
| 1512 | + def test_http_client(self): |
| 1513 | + port = find_unused_port() |
| 1514 | + bind = '127.0.0.1' |
| 1515 | + proc = spawn_python('-u', '-m', 'http.server', str(port), '-b', bind, |
| 1516 | + bufsize=1, text=True) |
| 1517 | + self.addCleanup(kill_python, proc) |
| 1518 | + self.addCleanup(proc.terminate) |
| 1519 | + self.assertTrue(self.wait_for_server(proc, 'http', port, bind)) |
| 1520 | + res = self.fetch_file(f'http://{bind}:{port}/{self.served_file_name}') |
| 1521 | + self.assertEqual(res, self.served_data) |
| 1522 | + |
| 1523 | + def test_https_client(self): |
| 1524 | + port = find_unused_port() |
| 1525 | + bind = '127.0.0.1' |
| 1526 | + proc = spawn_python('-u', '-m', 'http.server', str(port), '-b', bind, |
| 1527 | + '--tls-cert', self.tls_cert, |
| 1528 | + '--tls-key', self.tls_key, |
| 1529 | + '--tls-password-file', self.tls_password_file, |
| 1530 | + bufsize=1, text=True) |
| 1531 | + self.addCleanup(kill_python, proc) |
| 1532 | + self.addCleanup(proc.terminate) |
| 1533 | + self.assertTrue(self.wait_for_server(proc, 'https', port, bind)) |
| 1534 | + res = self.fetch_file(f'https://{bind}:{port}/{self.served_file_name}') |
| 1535 | + self.assertEqual(res, self.served_data) |
| 1536 | + |
| 1537 | + |
1284 | 1538 | def setUpModule():
|
1285 | 1539 | unittest.addModuleCleanup(os.chdir, os.getcwd())
|
1286 | 1540 |
|
|
0 commit comments