@@ -1278,6 +1278,7 @@ def test_ror_operator(self):
1278
1278
1279
1279
class WalkTests (unittest .TestCase ):
1280
1280
"""Tests for os.walk()."""
1281
+ is_fwalk = False
1281
1282
1282
1283
# Wrapper to hide minor differences between os.walk and os.fwalk
1283
1284
# to tests both functions with the same code base
@@ -1312,14 +1313,14 @@ def setUp(self):
1312
1313
self .sub11_path = join (self .sub1_path , "SUB11" )
1313
1314
sub2_path = join (self .walk_path , "SUB2" )
1314
1315
sub21_path = join (sub2_path , "SUB21" )
1315
- tmp1_path = join (self .walk_path , "tmp1" )
1316
+ self . tmp1_path = join (self .walk_path , "tmp1" )
1316
1317
tmp2_path = join (self .sub1_path , "tmp2" )
1317
1318
tmp3_path = join (sub2_path , "tmp3" )
1318
1319
tmp5_path = join (sub21_path , "tmp3" )
1319
1320
self .link_path = join (sub2_path , "link" )
1320
1321
t2_path = join (os_helper .TESTFN , "TEST2" )
1321
1322
tmp4_path = join (os_helper .TESTFN , "TEST2" , "tmp4" )
1322
- broken_link_path = join (sub2_path , "broken_link" )
1323
+ self . broken_link_path = join (sub2_path , "broken_link" )
1323
1324
broken_link2_path = join (sub2_path , "broken_link2" )
1324
1325
broken_link3_path = join (sub2_path , "broken_link3" )
1325
1326
@@ -1329,13 +1330,13 @@ def setUp(self):
1329
1330
os .makedirs (sub21_path )
1330
1331
os .makedirs (t2_path )
1331
1332
1332
- for path in tmp1_path , tmp2_path , tmp3_path , tmp4_path , tmp5_path :
1333
+ for path in self . tmp1_path , tmp2_path , tmp3_path , tmp4_path , tmp5_path :
1333
1334
with open (path , "x" , encoding = 'utf-8' ) as f :
1334
1335
f .write ("I'm " + path + " and proud of it. Blame test_os.\n " )
1335
1336
1336
1337
if os_helper .can_symlink ():
1337
1338
os .symlink (os .path .abspath (t2_path ), self .link_path )
1338
- os .symlink ('broken' , broken_link_path , True )
1339
+ os .symlink ('broken' , self . broken_link_path , True )
1339
1340
os .symlink (join ('tmp3' , 'broken' ), broken_link2_path , True )
1340
1341
os .symlink (join ('SUB21' , 'tmp5' ), broken_link3_path , True )
1341
1342
self .sub2_tree = (sub2_path , ["SUB21" , "link" ],
@@ -1431,6 +1432,11 @@ def test_walk_symlink(self):
1431
1432
else :
1432
1433
self .fail ("Didn't follow symlink with followlinks=True" )
1433
1434
1435
+ walk_it = self .walk (self .broken_link_path , follow_symlinks = True )
1436
+ if self .is_fwalk :
1437
+ self .assertRaises (FileNotFoundError , next , walk_it )
1438
+ self .assertRaises (StopIteration , next , walk_it )
1439
+
1434
1440
def test_walk_bad_dir (self ):
1435
1441
# Walk top-down.
1436
1442
errors = []
@@ -1452,6 +1458,73 @@ def test_walk_bad_dir(self):
1452
1458
finally :
1453
1459
os .rename (path1new , path1 )
1454
1460
1461
+ def test_walk_bad_dir2 (self ):
1462
+ walk_it = self .walk ('nonexisting' )
1463
+ if self .is_fwalk :
1464
+ self .assertRaises (FileNotFoundError , next , walk_it )
1465
+ self .assertRaises (StopIteration , next , walk_it )
1466
+
1467
+ walk_it = self .walk ('nonexisting' , follow_symlinks = True )
1468
+ if self .is_fwalk :
1469
+ self .assertRaises (FileNotFoundError , next , walk_it )
1470
+ self .assertRaises (StopIteration , next , walk_it )
1471
+
1472
+ walk_it = self .walk (self .tmp1_path )
1473
+ self .assertRaises (StopIteration , next , walk_it )
1474
+
1475
+ walk_it = self .walk (self .tmp1_path , follow_symlinks = True )
1476
+ if self .is_fwalk :
1477
+ self .assertRaises (NotADirectoryError , next , walk_it )
1478
+ self .assertRaises (StopIteration , next , walk_it )
1479
+
1480
+ @unittest .skipUnless (hasattr (os , "mkfifo" ), 'requires os.mkfifo()' )
1481
+ @unittest .skipIf (sys .platform == "vxworks" ,
1482
+ "fifo requires special path on VxWorks" )
1483
+ def test_walk_named_pipe (self ):
1484
+ path = os_helper .TESTFN + '-pipe'
1485
+ os .mkfifo (path )
1486
+ self .addCleanup (os .unlink , path )
1487
+
1488
+ walk_it = self .walk (path )
1489
+ self .assertRaises (StopIteration , next , walk_it )
1490
+
1491
+ walk_it = self .walk (path , follow_symlinks = True )
1492
+ if self .is_fwalk :
1493
+ self .assertRaises (NotADirectoryError , next , walk_it )
1494
+ self .assertRaises (StopIteration , next , walk_it )
1495
+
1496
+ @unittest .skipUnless (hasattr (os , "mkfifo" ), 'requires os.mkfifo()' )
1497
+ @unittest .skipIf (sys .platform == "vxworks" ,
1498
+ "fifo requires special path on VxWorks" )
1499
+ def test_walk_named_pipe2 (self ):
1500
+ path = os_helper .TESTFN + '-dir'
1501
+ os .mkdir (path )
1502
+ self .addCleanup (shutil .rmtree , path )
1503
+ os .mkfifo (os .path .join (path , 'mypipe' ))
1504
+
1505
+ errors = []
1506
+ walk_it = self .walk (path , onerror = errors .append )
1507
+ next (walk_it )
1508
+ self .assertRaises (StopIteration , next , walk_it )
1509
+ self .assertEqual (errors , [])
1510
+
1511
+ errors = []
1512
+ walk_it = self .walk (path , onerror = errors .append )
1513
+ root , dirs , files = next (walk_it )
1514
+ self .assertEqual (root , path )
1515
+ self .assertEqual (dirs , [])
1516
+ self .assertEqual (files , ['mypipe' ])
1517
+ dirs .extend (files )
1518
+ files .clear ()
1519
+ if self .is_fwalk :
1520
+ self .assertRaises (NotADirectoryError , next , walk_it )
1521
+ self .assertRaises (StopIteration , next , walk_it )
1522
+ if self .is_fwalk :
1523
+ self .assertEqual (errors , [])
1524
+ else :
1525
+ self .assertEqual (len (errors ), 1 , errors )
1526
+ self .assertIsInstance (errors [0 ], NotADirectoryError )
1527
+
1455
1528
def test_walk_many_open_files (self ):
1456
1529
depth = 30
1457
1530
base = os .path .join (os_helper .TESTFN , 'deep' )
@@ -1477,6 +1550,7 @@ def test_walk_many_open_files(self):
1477
1550
@unittest .skipUnless (hasattr (os , 'fwalk' ), "Test needs os.fwalk()" )
1478
1551
class FwalkTests (WalkTests ):
1479
1552
"""Tests for os.fwalk()."""
1553
+ is_fwalk = True
1480
1554
1481
1555
def walk (self , top , ** kwargs ):
1482
1556
for root , dirs , files , root_fd in self .fwalk (top , ** kwargs ):
0 commit comments