test_os.py 35.2 KB
Newer Older
1 2
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
3
# portable than they had been thought to be.
4 5

import os
Benjamin Peterson's avatar
Benjamin Peterson committed
6
import errno
7
import unittest
8
import warnings
9
import sys
10 11 12
import signal
import subprocess
import time
13
import shutil
14
from test import support
15

16 17 18 19
# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
20 21
if (hasattr(os, "confstr_names") and
    "CS_GNU_LIBPTHREAD_VERSION" in os.confstr_names):
22 23 24 25
    libpthread = os.confstr("CS_GNU_LIBPTHREAD_VERSION")
    USING_LINUXTHREADS= libpthread.startswith("linuxthreads")
else:
    USING_LINUXTHREADS= False
26

27 28 29
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    def setUp(self):
30 31
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
32 33 34
    tearDown = setUp

    def test_access(self):
35
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
36
        os.close(f)
37
        self.assertTrue(os.access(support.TESTFN, os.W_OK))
38

39
    def test_closerange(self):
40 41 42 43 44 45 46 47 48 49 50 51
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
Benjamin Peterson's avatar
Benjamin Peterson committed
52
                    self.skipTest("couldn't allocate two consecutive fds")
53 54 55
                first, second = second, os.dup(second)
        finally:
            os.close(second)
56
        # close a fd that is open, and one that isn't
57
        os.closerange(first, first + 2)
58
        self.assertRaises(OSError, os.write, first, b"a")
59

60 61 62 63 64 65 66
    def test_rename(self):
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        self.assertRaises(TypeError, os.write, fd, "beans")
        os.write(fd, b"bacon\n")
        os.write(fd, bytearray(b"eggs\n"))
        os.write(fd, memoryview(b"spam\n"))
        os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
86 87
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])
88 89


Christian Heimes's avatar
Christian Heimes committed
90 91 92
class TemporaryFileTests(unittest.TestCase):
    def setUp(self):
        self.files = []
93
        os.mkdir(support.TESTFN)
Christian Heimes's avatar
Christian Heimes committed
94 95 96 97

    def tearDown(self):
        for name in self.files:
            os.unlink(name)
98
        os.rmdir(support.TESTFN)
Christian Heimes's avatar
Christian Heimes committed
99 100 101

    def check_tempfile(self, name):
        # make sure it doesn't already exist:
102
        self.assertFalse(os.path.exists(name),
Christian Heimes's avatar
Christian Heimes committed
103 104 105 106 107 108 109 110 111 112 113 114
                    "file already exists for temporary file")
        # make sure we can create the file
        open(name, "w")
        self.files.append(name)

    def test_tempnam(self):
        if not hasattr(os, "tempnam"):
            return
        warnings.filterwarnings("ignore", "tempnam", RuntimeWarning,
                                r"test_os$")
        self.check_tempfile(os.tempnam())

115
        name = os.tempnam(support.TESTFN)
Christian Heimes's avatar
Christian Heimes committed
116 117
        self.check_tempfile(name)

118
        name = os.tempnam(support.TESTFN, "pfx")
119
        self.assertTrue(os.path.basename(name)[:3] == "pfx")
Christian Heimes's avatar
Christian Heimes committed
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
        self.check_tempfile(name)

    def test_tmpfile(self):
        if not hasattr(os, "tmpfile"):
            return
        # As with test_tmpnam() below, the Windows implementation of tmpfile()
        # attempts to create a file in the root directory of the current drive.
        # On Vista and Server 2008, this test will always fail for normal users
        # as writing to the root directory requires elevated privileges.  With
        # XP and below, the semantics of tmpfile() are the same, but the user
        # running the test is more likely to have administrative privileges on
        # their account already.  If that's the case, then os.tmpfile() should
        # work.  In order to make this test as useful as possible, rather than
        # trying to detect Windows versions or whether or not the user has the
        # right permissions, just try and create a file in the root directory
        # and see if it raises a 'Permission denied' OSError.  If it does, then
        # test that a subsequent call to os.tmpfile() raises the same error. If
        # it doesn't, assume we're on XP or below and the user running the test
        # has administrative privileges, and proceed with the test as normal.
        if sys.platform == 'win32':
            name = '\\python_test_os_test_tmpfile.txt'
            if os.path.exists(name):
                os.remove(name)
            try:
                fp = open(name, 'w')
            except IOError as first:
                # open() failed, assert tmpfile() fails in the same way.
                # Although open() raises an IOError and os.tmpfile() raises an
                # OSError(), 'args' will be (13, 'Permission denied') in both
                # cases.
                try:
                    fp = os.tmpfile()
                except OSError as second:
                    self.assertEqual(first.args, second.args)
                else:
                    self.fail("expected os.tmpfile() to raise OSError")
                return
            else:
                # open() worked, therefore, tmpfile() should work.  Close our
                # dummy file and proceed with the test as normal.
                fp.close()
                os.remove(name)

        fp = os.tmpfile()
        fp.write("foobar")
        fp.seek(0,0)
        s = fp.read()
        fp.close()
168
        self.assertTrue(s == "foobar")
Christian Heimes's avatar
Christian Heimes committed
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191

    def test_tmpnam(self):
        if not hasattr(os, "tmpnam"):
            return
        warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning,
                                r"test_os$")
        name = os.tmpnam()
        if sys.platform in ("win32",):
            # The Windows tmpnam() seems useless.  From the MS docs:
            #
            #     The character string that tmpnam creates consists of
            #     the path prefix, defined by the entry P_tmpdir in the
            #     file STDIO.H, followed by a sequence consisting of the
            #     digit characters '0' through '9'; the numerical value
            #     of this string is in the range 1 - 65,535.  Changing the
            #     definitions of L_tmpnam or P_tmpdir in STDIO.H does not
            #     change the operation of tmpnam.
            #
            # The really bizarre part is that, at least under MSVC6,
            # P_tmpdir is "\\".  That is, the path returned refers to
            # the root of the current drive.  That's a terrible place to
            # put temp files, and, depending on privileges, the user
            # may not even be able to open a file in the root directory.
192
            self.assertFalse(os.path.exists(name),
Christian Heimes's avatar
Christian Heimes committed
193 194 195 196
                        "file already exists for temporary file")
        else:
            self.check_tempfile(name)

197 198 199 200 201 202 203 204 205 206
    def fdopen_helper(self, *args):
        fd = os.open(support.TESTFN, os.O_RDONLY)
        fp2 = os.fdopen(fd, *args)
        fp2.close()

    def test_fdopen(self):
        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

207 208 209
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    def setUp(self):
210 211
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
212
        f = open(self.fname, 'wb')
Guido van Rossum's avatar
Guido van Rossum committed
213
        f.write(b"ABC")
214
        f.close()
Tim Peters's avatar
Tim Peters committed
215

216 217
    def tearDown(self):
        os.unlink(self.fname)
218
        os.rmdir(support.TESTFN)
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235

    def test_stat_attributes(self):
        if not hasattr(os, "stat"):
            return

        import stat
        result = os.stat(self.fname)

        # Make sure direct access works
        self.assertEquals(result[stat.ST_SIZE], 3)
        self.assertEquals(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
236 237 238 239 240
                if name.endswith("TIME"):
                    def trunc(x): return int(x)
                else:
                    def trunc(x): return x
                self.assertEquals(trunc(getattr(result, attr)),
241
                                  result[getattr(stat, name)])
242
                self.assertIn(attr, members)
243 244 245 246 247 248 249 250 251 252 253

        try:
            result[200]
            self.fail("No exception thrown")
        except IndexError:
            pass

        # Make sure that assignment fails
        try:
            result.st_mode = 1
            self.fail("No exception thrown")
254
        except AttributeError:
255 256 257 258 259
            pass

        try:
            result.st_rdev = 1
            self.fail("No exception thrown")
260
        except (AttributeError, TypeError):
261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281
            pass

        try:
            result.parrot = 1
            self.fail("No exception thrown")
        except AttributeError:
            pass

        # Use the stat_result constructor with a too-short tuple.
        try:
            result2 = os.stat_result((10,))
            self.fail("No exception thrown")
        except TypeError:
            pass

        # Use the constructr with a too-long tuple.
        try:
            result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

Tim Peters's avatar
Tim Peters committed
282

283 284 285 286
    def test_statvfs_attributes(self):
        if not hasattr(os, "statvfs"):
            return

287 288
        try:
            result = os.statvfs(self.fname)
289
        except OSError as e:
290 291 292
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                return
293 294

        # Make sure direct access works
295
        self.assertEquals(result.f_bfree, result[3])
296

297 298 299 300 301
        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEquals(getattr(result, 'f_' + member), result[value])
302 303 304 305 306

        # Make sure that assignment really fails
        try:
            result.f_bfree = 1
            self.fail("No exception thrown")
307
        except AttributeError:
308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327
            pass

        try:
            result.parrot = 1
            self.fail("No exception thrown")
        except AttributeError:
            pass

        # Use the constructor with a too-short tuple.
        try:
            result2 = os.statvfs_result((10,))
            self.fail("No exception thrown")
        except TypeError:
            pass

        # Use the constructr with a too-long tuple.
        try:
            result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass
328

329 330
    def test_utime_dir(self):
        delta = 1000000
331
        st = os.stat(support.TESTFN)
332 333
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
334 335
        os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(support.TESTFN)
336 337 338 339 340
        self.assertEquals(st2.st_mtime, int(st.st_mtime-delta))

    # Restrict test to Win32, since there is no guarantee other
    # systems support centiseconds
    if sys.platform == 'win32':
341
        def get_file_system(path):
342
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
343
            import ctypes
344
            kernel32 = ctypes.windll.kernel32
345
            buf = ctypes.create_unicode_buffer("", 100)
346
            if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
347 348
                return buf.value

349
        if get_file_system(support.TESTFN) == "NTFS":
350 351 352 353
            def test_1565150(self):
                t1 = 1159195039.25
                os.utime(self.fname, (t1, t1))
                self.assertEquals(os.stat(self.fname).st_mtime, t1)
354

355 356 357 358 359
        def test_1686475(self):
            # Verify that an open file can be stat'ed
            try:
                os.stat(r"c:\pagefile.sys")
            except WindowsError as e:
360
                if e.errno == 2: # file does not exist; cannot run test
361 362 363
                    return
                self.fail("Could not stat pagefile.sys")

364
from test import mapping_tests
365

366
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
367
    """check that os.environ object conform to mapping protocol"""
368
    type2test = None
369

370 371
    def setUp(self):
        self.__save = dict(os.environ)
372 373
        if os.name not in ('os2', 'nt'):
            self.__saveb = dict(os.environb)
374 375 376
        for key, value in self._reference().items():
            os.environ[key] = value

377 378 379
    def tearDown(self):
        os.environ.clear()
        os.environ.update(self.__save)
380 381 382
        if os.name not in ('os2', 'nt'):
            os.environb.clear()
            os.environb.update(self.__saveb)
383

384 385 386 387 388 389 390
    def _reference(self):
        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}

    def _empty_mapping(self):
        os.environ.clear()
        return os.environ

391
    # Bug 1110478
392
    def test_update2(self):
393
        os.environ.clear()
394 395 396 397 398
        if os.path.exists("/bin/sh"):
            os.environ.update(HELLO="World")
            value = os.popen("/bin/sh -c 'echo $HELLO'").read().strip()
            self.assertEquals(value, "World")

399 400 401 402 403 404 405 406 407
    def test_os_popen_iter(self):
        if os.path.exists("/bin/sh"):
            popen = os.popen("/bin/sh -c 'echo \"line1\nline2\nline3\"'")
            it = iter(popen)
            self.assertEquals(next(it), "line1\n")
            self.assertEquals(next(it), "line2\n")
            self.assertEquals(next(it), "line3\n")
            self.assertRaises(StopIteration, next, it)

408 409 410 411 412 413 414
    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for key, val in os.environ.items():
            self.assertEquals(type(key), str)
            self.assertEquals(type(val), str)

415 416 417 418
    def test_items(self):
        for key, value in self._reference().items():
            self.assertEqual(os.environ.get(key), value)

419 420 421 422 423 424 425
    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        self.assertTrue(isinstance(env.data, dict))
        self.assertEqual(repr(env), 'environ({!r})'.format(env.data))

426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446
    def test_get_exec_path(self):
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        saved_environ = os.environ
        try:
            os.environ = dict(test_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = saved_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

447 448 449 450 451
    @unittest.skipIf(sys.platform == "win32", "POSIX specific test")
    def test_environb(self):
        # os.environ -> os.environb
        value = 'euro\u20ac'
        try:
Benjamin Peterson's avatar
Benjamin Peterson committed
452 453
            value_bytes = value.encode(sys.getfilesystemencoding(),
                                       'surrogateescape')
454
        except UnicodeEncodeError:
Benjamin Peterson's avatar
Benjamin Peterson committed
455 456
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
457
            self.skipTest(msg)
458 459 460 461 462 463 464 465 466 467
        os.environ['unicode'] = value
        self.assertEquals(os.environ['unicode'], value)
        self.assertEquals(os.environb[b'unicode'], value_bytes)

        # os.environb -> os.environ
        value = b'\xff'
        os.environb[b'bytes'] = value
        self.assertEquals(os.environb[b'bytes'], value)
        value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
        self.assertEquals(os.environ['bytes'], value_str)
468

469 470 471 472 473 474 475 476
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    def test_traversal(self):
        import os
        from os.path import join

        # Build:
477 478
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
479 480
        #         tmp1
        #         SUB1/             a file kid and a directory kid
481 482 483 484 485 486 487
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TESTFN.2
        #       TEST2/
        #         tmp4              a lone file
488
        walk_path = join(support.TESTFN, "TEST1")
489
        sub1_path = join(walk_path, "SUB1")
490
        sub11_path = join(sub1_path, "SUB11")
491 492
        sub2_path = join(walk_path, "SUB2")
        tmp1_path = join(walk_path, "tmp1")
493 494
        tmp2_path = join(sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
495
        link_path = join(sub2_path, "link")
496 497
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
498 499 500 501

        # Create stuff.
        os.makedirs(sub11_path)
        os.makedirs(sub2_path)
502 503
        os.makedirs(t2_path)
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
504
            f = open(path, "w")
505 506
            f.write("I'm " + path + " and proud of it.  Blame test_os.\n")
            f.close()
507 508 509 510 511
        if hasattr(os, "symlink"):
            os.symlink(os.path.abspath(t2_path), link_path)
            sub2_tree = (sub2_path, ["link"], ["tmp3"])
        else:
            sub2_tree = (sub2_path, [], ["tmp3"])
512 513

        # Walk top-down.
514
        all = list(os.walk(walk_path))
515 516 517 518 519 520
        self.assertEqual(len(all), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = all[0][1][0] != "SUB1"
        all[0][1].sort()
521
        self.assertEqual(all[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
522 523
        self.assertEqual(all[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(all[2 + flipped], (sub11_path, [], []))
524
        self.assertEqual(all[3 - 2 * flipped], sub2_tree)
525 526 527

        # Prune the search.
        all = []
528
        for root, dirs, files in os.walk(walk_path):
529 530 531 532 533 534
            all.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to all!
                dirs.remove('SUB1')
        self.assertEqual(len(all), 2)
535 536
        self.assertEqual(all[0], (walk_path, ["SUB2"], ["tmp1"]))
        self.assertEqual(all[1], sub2_tree)
537 538

        # Walk bottom-up.
539
        all = list(os.walk(walk_path, topdown=False))
540 541 542 543 544 545
        self.assertEqual(len(all), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = all[3][1][0] != "SUB1"
        all[3][1].sort()
546
        self.assertEqual(all[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
547 548
        self.assertEqual(all[flipped], (sub11_path, [], []))
        self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
549 550 551 552 553 554 555 556 557 558 559
        self.assertEqual(all[2 - 2 * flipped], sub2_tree)

        if hasattr(os, "symlink"):
            # Walk, following symlinks.
            for root, dirs, files in os.walk(walk_path, followlinks=True):
                if root == link_path:
                    self.assertEqual(dirs, [])
                    self.assertEqual(files, ["tmp4"])
                    break
            else:
                self.fail("Didn't follow symlink with followlinks=True")
560

561
    def tearDown(self):
562 563 564 565
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
566
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
567
            for name in files:
568
                os.remove(os.path.join(root, name))
569
            for name in dirs:
570 571 572 573 574
                dirname = os.path.join(root, name)
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
575
        os.rmdir(support.TESTFN)
576

577
class MakedirTests(unittest.TestCase):
578
    def setUp(self):
579
        os.mkdir(support.TESTFN)
580 581

    def test_makedir(self):
582
        base = support.TESTFN
583 584 585 586 587 588
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
589
        self.assertRaises(OSError, os.makedirs, os.curdir)
590 591 592 593 594 595 596
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def tearDown(self):
597
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
598 599 600 601
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
602
        while not os.path.exists(path) and path != support.TESTFN:
603 604 605 606
            path = os.path.dirname(path)

        os.removedirs(path)

607
class DevNullTests(unittest.TestCase):
608
    def test_devnull(self):
609
        f = open(os.devnull, 'w')
610 611
        f.write('hello')
        f.close()
612
        f = open(os.devnull, 'r')
613
        self.assertEqual(f.read(), '')
614
        f.close()
615

616
class URandomTests(unittest.TestCase):
617 618 619 620 621 622 623 624 625
    def test_urandom(self):
        try:
            self.assertEqual(len(os.urandom(1)), 1)
            self.assertEqual(len(os.urandom(10)), 10)
            self.assertEqual(len(os.urandom(100)), 100)
            self.assertEqual(len(os.urandom(1000)), 1000)
        except NotImplementedError:
            pass

626
class ExecTests(unittest.TestCase):
627 628
    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
629
    def test_execvpe_with_bad_program(self):
630 631
        self.assertRaises(OSError, os.execvpe, 'no such app-',
                          ['no such app-'], None)
632

633 634 635
    def test_execvpe_with_bad_arglist(self):
        self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)

636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655
    class _stub_out_for_execvpe_test(object):
        """
        Stubs out execv, execve and get_exec_path functions when
        used as context manager.  Records exec calls.  The mock execv
        and execve functions always raise an exception as they would
        normally never return.
        """
        def __init__(self):
            # A list of tuples containing (function name, first arg, args)
            # of calls to execv or execve that have been made.
            self.calls = []
        def _mock_execv(self, name, *args):
            self.calls.append(('execv', name, args))
            raise RuntimeError("execv called")

        def _mock_execve(self, name, *args):
            self.calls.append(('execve', name, args))
            raise OSError(errno.ENOTDIR, "execve called")

        def _mock_get_exec_path(self, env=None):
656
            return [os.sep+'p', os.sep+'pp']
657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675

        def __enter__(self):
            self.orig_execv = os.execv
            self.orig_execve = os.execve
            self.orig_get_exec_path = os.get_exec_path
            os.execv = self._mock_execv
            os.execve = self._mock_execve
            os.get_exec_path = self._mock_get_exec_path

        def __exit__(self, type, value, tb):
            os.execv = self.orig_execv
            os.execve = self.orig_execve
            os.get_exec_path = self.orig_get_exec_path

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def test_internal_execvpe(self):
        exec_stubbed = self._stub_out_for_execvpe_test()
        with exec_stubbed:
676 677 678
            self.assertRaises(RuntimeError, os._execvpe, os.sep+'f', ['-a'])
            self.assertEqual([('execv', os.sep+'f', (['-a'],))],
                             exec_stubbed.calls)
679 680 681
            exec_stubbed.calls = []
            self.assertRaises(OSError, os._execvpe, 'f', ['-a'],
                              env={'spam': 'beans'})
682 683 684 685
            self.assertEqual([('execve', os.sep+'p'+os.sep+'f',
                               (['-a'], {'spam': 'beans'})),
                              ('execve', os.sep+'pp'+os.sep+'f',
                               (['-a'], {'spam': 'beans'}))],
686 687
                             exec_stubbed.calls)

688 689
class Win32ErrorTests(unittest.TestCase):
    def test_rename(self):
690
        self.assertRaises(WindowsError, os.rename, support.TESTFN, support.TESTFN+".bak")
691 692

    def test_remove(self):
693
        self.assertRaises(WindowsError, os.remove, support.TESTFN)
694 695

    def test_chdir(self):
696
        self.assertRaises(WindowsError, os.chdir, support.TESTFN)
697 698

    def test_mkdir(self):
699
        f = open(support.TESTFN, "w")
Benjamin Peterson's avatar
Benjamin Peterson committed
700 701 702 703
        try:
            self.assertRaises(WindowsError, os.mkdir, support.TESTFN)
        finally:
            f.close()
704
            os.unlink(support.TESTFN)
705 706

    def test_utime(self):
707
        self.assertRaises(WindowsError, os.utime, support.TESTFN, None)
708 709

    def test_chmod(self):
Benjamin Peterson's avatar
Benjamin Peterson committed
710
        self.assertRaises(WindowsError, os.chmod, support.TESTFN, 0)
711

712
class TestInvalidFD(unittest.TestCase):
713
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
714 715 716 717 718
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    #We omit close because it doesn'r raise an exception on some platforms
    def get_single(f):
        def helper(self):
719 720
            if  hasattr(os, f):
                self.check(getattr(os, f))
721 722 723 724
        return helper
    for f in singles:
        locals()["test_"+f] = get_single(f)

725
    def check(self, f, *args):
Benjamin Peterson's avatar
Benjamin Peterson committed
726 727 728 729 730 731 732
        try:
            f(support.make_bad_fd(), *args)
        except OSError as e:
            self.assertEqual(e.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise a OSError with a bad file descriptor"
                      % f)
733

734 735
    def test_isatty(self):
        if hasattr(os, "isatty"):
736
            self.assertEqual(os.isatty(support.make_bad_fd()), False)
737 738 739

    def test_closerange(self):
        if hasattr(os, "closerange"):
740
            fd = support.make_bad_fd()
741 742 743 744 745 746 747 748 749 750 751 752
            # Make sure none of the descriptors we are about to close are
            # currently valid (issue 6542).
            for i in range(10):
                try: os.fstat(fd+i)
                except OSError:
                    pass
                else:
                    break
            if i < 2:
                raise unittest.SkipTest(
                    "Unable to acquire a range of invalid file descriptors")
            self.assertEqual(os.closerange(fd, fd + i-1), None)
753 754 755

    def test_dup2(self):
        if hasattr(os, "dup2"):
756
            self.check(os.dup2, 20)
757 758 759

    def test_fchmod(self):
        if hasattr(os, "fchmod"):
760
            self.check(os.fchmod, 0)
761 762 763

    def test_fchown(self):
        if hasattr(os, "fchown"):
764
            self.check(os.fchown, -1, -1)
765 766 767

    def test_fpathconf(self):
        if hasattr(os, "fpathconf"):
768
            self.check(os.fpathconf, "PC_NAME_MAX")
769 770 771

    def test_ftruncate(self):
        if hasattr(os, "ftruncate"):
772
            self.check(os.ftruncate, 0)
773 774 775

    def test_lseek(self):
        if hasattr(os, "lseek"):
776
            self.check(os.lseek, 0, 0)
777 778 779

    def test_read(self):
        if hasattr(os, "read"):
780
            self.check(os.read, 1)
781 782 783

    def test_tcsetpgrpt(self):
        if hasattr(os, "tcsetpgrp"):
784
            self.check(os.tcsetpgrp, 0)
785 786 787

    def test_write(self):
        if hasattr(os, "write"):
788
            self.check(os.write, b" ")
789

790 791 792 793
if sys.platform != 'win32':
    class Win32ErrorTests(unittest.TestCase):
        pass

Benjamin Peterson's avatar
Benjamin Peterson committed
794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824
    class PosixUidGidTests(unittest.TestCase):
        if hasattr(os, 'setuid'):
            def test_setuid(self):
                if os.getuid() != 0:
                    self.assertRaises(os.error, os.setuid, 0)
                self.assertRaises(OverflowError, os.setuid, 1<<32)

        if hasattr(os, 'setgid'):
            def test_setgid(self):
                if os.getuid() != 0:
                    self.assertRaises(os.error, os.setgid, 0)
                self.assertRaises(OverflowError, os.setgid, 1<<32)

        if hasattr(os, 'seteuid'):
            def test_seteuid(self):
                if os.getuid() != 0:
                    self.assertRaises(os.error, os.seteuid, 0)
                self.assertRaises(OverflowError, os.seteuid, 1<<32)

        if hasattr(os, 'setegid'):
            def test_setegid(self):
                if os.getuid() != 0:
                    self.assertRaises(os.error, os.setegid, 0)
                self.assertRaises(OverflowError, os.setegid, 1<<32)

        if hasattr(os, 'setreuid'):
            def test_setreuid(self):
                if os.getuid() != 0:
                    self.assertRaises(os.error, os.setreuid, 0, 0)
                self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
                self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
825 826 827 828 829 830 831

            def test_setreuid_neg1(self):
                # Needs to accept -1.  We run this in a subprocess to avoid
                # altering the test runner's process state (issue8045).
                subprocess.check_call([
                        sys.executable, '-c',
                        'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Peterson's avatar
Benjamin Peterson committed
832 833 834 835 836 837 838

        if hasattr(os, 'setregid'):
            def test_setregid(self):
                if os.getuid() != 0:
                    self.assertRaises(os.error, os.setregid, 0, 0)
                self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
                self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
839 840 841 842 843 844 845

            def test_setregid_neg1(self):
                # Needs to accept -1.  We run this in a subprocess to avoid
                # altering the test runner's process state (issue8045).
                subprocess.check_call([
                        sys.executable, '-c',
                        'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
846

847
    @unittest.skipIf(sys.platform == 'darwin', "tests don't apply to OS X")
848 849 850 851 852 853 854
    class Pep383Tests(unittest.TestCase):
        filenames = [b'foo\xf6bar', 'foo\xf6bar'.encode("utf-8")]

        def setUp(self):
            self.fsencoding = sys.getfilesystemencoding()
            sys.setfilesystemencoding("utf-8")
            self.dir = support.TESTFN
855
            self.bdir = self.dir.encode("utf-8", "surrogateescape")
856 857 858 859 860
            os.mkdir(self.dir)
            self.unicodefn = []
            for fn in self.filenames:
                f = open(os.path.join(self.bdir, fn), "w")
                f.close()
861
                self.unicodefn.append(fn.decode("utf-8", "surrogateescape"))
862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879

        def tearDown(self):
            shutil.rmtree(self.dir)
            sys.setfilesystemencoding(self.fsencoding)

        def test_listdir(self):
            expected = set(self.unicodefn)
            found = set(os.listdir(support.TESTFN))
            self.assertEquals(found, expected)

        def test_open(self):
            for fn in self.unicodefn:
                f = open(os.path.join(self.dir, fn))
                f.close()

        def test_stat(self):
            for fn in self.unicodefn:
                os.stat(os.path.join(self.dir, fn))
Benjamin Peterson's avatar
Benjamin Peterson committed
880 881 882
else:
    class PosixUidGidTests(unittest.TestCase):
        pass
883 884
    class Pep383Tests(unittest.TestCase):
        pass
Benjamin Peterson's avatar
Benjamin Peterson committed
885

886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    def _kill(self, sig, *args):
        # Send a subprocess a signal (or in some cases, just an int to be
        # the return value)
        proc = subprocess.Popen(*args)
        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM, [sys.executable])

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100, [sys.executable])

    def _kill_with_event(self, event, name):
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py")],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        time.sleep(0.5)
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        if not proc.poll():
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle CTRL+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")


943
class MiscTests(unittest.TestCase):
944 945

    @unittest.skipIf(os.name == "nt", "POSIX specific test")
946 947 948 949 950
    def test_fsencode(self):
        self.assertEquals(os.fsencode(b'ab\xff'), b'ab\xff')
        self.assertEquals(os.fsencode('ab\uDCFF'), b'ab\xff')


951
def test_main():
952
    support.run_unittest(
953
        FileTests,
954 955
        StatAttributeTests,
        EnvironTests,
956 957
        WalkTests,
        MakedirTests,
958
        DevNullTests,
959
        URandomTests,
960
        ExecTests,
961
        Win32ErrorTests,
Benjamin Peterson's avatar
Benjamin Peterson committed
962
        TestInvalidFD,
963
        PosixUidGidTests,
964
        Pep383Tests,
965 966
        Win32KillTests,
        MiscTests,
967
    )
968 969 970

if __name__ == "__main__":
    test_main()