You need to sign in or sign up before continuing.
test_os.py 45.8 KB
Newer Older
1 2
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
3
# portable than they had been thought to be.
4 5

import os
Benjamin Peterson's avatar
Benjamin Peterson committed
6
import errno
7
import unittest
8
import warnings
9
import sys
10 11 12
import signal
import subprocess
import time
13
import shutil
14
from test import support
15
import contextlib
16 17
import mmap
import uuid
18

19 20 21 22
# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
23 24
if (hasattr(os, "confstr_names") and
        "CS_GNU_LIBPTHREAD_VERSION" in os.confstr_names):
    # os.confstr() may return None when the value is not defined, and may
    # raise OSError when the host does not support a name that is listed in
    # confstr_names.  Neither case should break importing this module, so
    # both are treated as "not linuxthreads".
    try:
        libpthread = os.confstr("CS_GNU_LIBPTHREAD_VERSION")
    except OSError:
        libpthread = None
    USING_LINUXTHREADS = bool(libpthread) and \
        libpthread.startswith("linuxthreads")
else:
    USING_LINUXTHREADS = False
29

30 31 32
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for low-level file functions; each test creates TESTFN."""

    def setUp(self):
        # Make sure TESTFN does not exist before (and after) every test.
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
    # The very same cleanup is run once the test has finished.
    tearDown = setUp

    def test_access(self):
        """A file created with O_CREAT|O_RDWR is reported as writable."""
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        """os.closerange() closes an open fd and tolerates a closed one."""
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # Give up after a bounded number of attempts.
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        """A failing os.rename() must not change the refcount of its path."""
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        """os.read() returns the bytes previously written to the file."""
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    def test_write(self):
        """os.write() accepts bytes-like objects and rejects str."""
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Always release the descriptor, even when a check above fails,
            # so that tearDown can remove the file.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])
92 93


Christian Heimes's avatar
Christian Heimes committed
94 95 96
class TemporaryFileTests(unittest.TestCase):
    """Tests for tempnam(), tmpfile(), tmpnam() and fdopen() in os."""

    def setUp(self):
        # Names of files created by a test; removed again in tearDown.
        self.files = []
        os.mkdir(support.TESTFN)

    def tearDown(self):
        for name in self.files:
            os.unlink(name)
        os.rmdir(support.TESTFN)

    def check_tempfile(self, name):
        """Assert *name* does not exist yet, create it and register it."""
        # make sure it doesn't already exist:
        self.assertFalse(os.path.exists(name),
                    "file already exists for temporary file")
        # make sure we can create the file; close it right away so that
        # the handle is not leaked and tearDown can unlink the file.
        with open(name, "w"):
            pass
        self.files.append(name)

    def test_tempnam(self):
        if not hasattr(os, "tempnam"):
            self.skipTest("os.tempnam() is not available")
        # Restore the warning filters afterwards instead of changing them
        # for every test that runs later in the same process.
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "tempnam", RuntimeWarning,
                                    r"test_os$")
            self.check_tempfile(os.tempnam())

            name = os.tempnam(support.TESTFN)
            self.check_tempfile(name)

            name = os.tempnam(support.TESTFN, "pfx")
            self.assertEqual(os.path.basename(name)[:3], "pfx")
            self.check_tempfile(name)

    def test_tmpfile(self):
        if not hasattr(os, "tmpfile"):
            self.skipTest("os.tmpfile() is not available")
        # As with test_tmpnam() below, the Windows implementation of tmpfile()
        # attempts to create a file in the root directory of the current drive.
        # On Vista and Server 2008, this test will always fail for normal users
        # as writing to the root directory requires elevated privileges.  With
        # XP and below, the semantics of tmpfile() are the same, but the user
        # running the test is more likely to have administrative privileges on
        # their account already.  If that's the case, then os.tmpfile() should
        # work.  In order to make this test as useful as possible, rather than
        # trying to detect Windows versions or whether or not the user has the
        # right permissions, just try and create a file in the root directory
        # and see if it raises a 'Permission denied' OSError.  If it does, then
        # test that a subsequent call to os.tmpfile() raises the same error. If
        # it doesn't, assume we're on XP or below and the user running the test
        # has administrative privileges, and proceed with the test as normal.
        if sys.platform == 'win32':
            name = '\\python_test_os_test_tmpfile.txt'
            if os.path.exists(name):
                os.remove(name)
            try:
                fp = open(name, 'w')
            except IOError as first:
                # open() failed, assert tmpfile() fails in the same way.
                # Although open() raises an IOError and os.tmpfile() raises an
                # OSError(), 'args' will be (13, 'Permission denied') in both
                # cases.
                try:
                    fp = os.tmpfile()
                except OSError as second:
                    self.assertEqual(first.args, second.args)
                else:
                    self.fail("expected os.tmpfile() to raise OSError")
                return
            else:
                # open() worked, therefore, tmpfile() should work.  Close our
                # dummy file and proceed with the test as normal.
                fp.close()
                os.remove(name)

        fp = os.tmpfile()
        try:
            fp.write("foobar")
            fp.seek(0,0)
            s = fp.read()
        finally:
            fp.close()
        self.assertEqual(s, "foobar")

    def test_tmpnam(self):
        if not hasattr(os, "tmpnam"):
            self.skipTest("os.tmpnam() is not available")
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning,
                                    r"test_os$")
            name = os.tmpnam()
        if sys.platform in ("win32",):
            # The Windows tmpnam() seems useless.  From the MS docs:
            #
            #     The character string that tmpnam creates consists of
            #     the path prefix, defined by the entry P_tmpdir in the
            #     file STDIO.H, followed by a sequence consisting of the
            #     digit characters '0' through '9'; the numerical value
            #     of this string is in the range 1 - 65,535.  Changing the
            #     definitions of L_tmpnam or P_tmpdir in STDIO.H does not
            #     change the operation of tmpnam.
            #
            # The really bizarre part is that, at least under MSVC6,
            # P_tmpdir is "\\".  That is, the path returned refers to
            # the root of the current drive.  That's a terrible place to
            # put temp files, and, depending on privileges, the user
            # may not even be able to open a file in the root directory.
            self.assertFalse(os.path.exists(name),
                        "file already exists for temporary file")
        else:
            self.check_tempfile(name)

    def fdopen_helper(self, *args):
        """Open a regular file with os.open() and wrap it via os.fdopen().

        *args* are passed to os.fdopen() after the file descriptor.
        """
        # TESTFN itself is a directory (see setUp), which cannot portably
        # be wrapped in a file object, so use a regular file inside it.
        path = os.path.join(support.TESTFN, "fdopen")
        if path not in self.files:
            self.check_tempfile(path)
        fd = os.open(path, os.O_RDONLY)
        fp2 = os.fdopen(fd, *args)
        fp2.close()

    def test_fdopen(self):
        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

211 212 213
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Test attributes on return values from the os.*stat* family."""

    def setUp(self):
        # Every test works on the three-byte file TESTFN/f1.
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    def check_stat_attributes(self, fname):
        """Check item access, attribute access and immutability of
        the result of os.stat(fname)."""
        if not hasattr(os, "stat"):
            self.skipTest("os.stat() is not available")

        import stat
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                value = getattr(result, attr)
                if name.endswith("TIME"):
                    # The *TIME attributes are truncated with int() before
                    # being compared with the indexed value.
                    value = int(value)
                self.assertEqual(value, result[getattr(stat, name)])
                self.assertIn(attr, members)

        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.
        # NOTE(review): no failure is reported when the constructor accepts
        # this tuple -- presumably the accepted length varies; confirm.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        self.check_stat_attributes(fname)

    def test_statvfs_attributes(self):
        if not hasattr(os, "statvfs"):
            self.skipTest("os.statvfs() is not available")

        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest("os.statvfs() is not implemented (ENOSYS)")
            # Any other error is a genuine failure; do not continue with
            # an unbound 'result'.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.
        # NOTE(review): as in the original, acceptance of this tuple is not
        # treated as a failure; confirm whether that is intended.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    # Restrict test to Win32, since there is no guarantee other
    # systems support centiseconds
    if sys.platform == 'win32':
        def get_file_system(path):
            # Called while the class body is executed (not as a method).
            # Returns the file system name reported for the drive of *path*,
            # or None when GetVolumeInformationW() reports failure.
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

        if get_file_system(support.TESTFN) == "NTFS":
            def test_1565150(self):
                t1 = 1159195039.25
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

        def test_1686475(self):
            # Verify that an open file can be stat'ed
            try:
                os.stat(r"c:\pagefile.sys")
            except WindowsError as e:
                if e.errno == 2: # file does not exist; cannot run test
                    return
                self.fail("Could not stat pagefile.sys")

377
from test import mapping_tests
378

379
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """check that os.environ object conform to mapping protocol"""
    # Hook attribute read by the inherited mapping tests; left as None here.
    type2test = None

    def setUp(self):
        # Snapshot the environment so that tearDown can restore it, then
        # install the reference key/value pairs used by the tests.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        for key, value in self._reference().items():
            os.environ[key] = value

    def tearDown(self):
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}

    def _empty_mapping(self):
        os.environ.clear()
        return os.environ

    # Bug 1110478
    def test_update2(self):
        os.environ.clear()
        if os.path.exists("/bin/sh"):
            os.environ.update(HELLO="World")
            # Close the pipe deterministically instead of leaking it.
            with os.popen("/bin/sh -c 'echo $HELLO'") as pipe:
                value = pipe.read().strip()
            self.assertEqual(value, "World")

    def test_os_popen_iter(self):
        if os.path.exists("/bin/sh"):
            with os.popen("/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
                it = iter(popen)
                self.assertEqual(next(it), "line1\n")
                self.assertEqual(next(it), "line2\n")
                self.assertEqual(next(it), "line3\n")
                self.assertRaises(StopIteration, next, it)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for key, val in os.environ.items():
            self.assertEqual(type(key), str)
            self.assertEqual(type(val), str)

    def test_items(self):
        for key, value in self._reference().items():
            self.assertEqual(os.environ.get(key), value)

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
            '{!r}: {!r}'.format(key, value)
            for key, value in env.items())))

    def test_get_exec_path(self):
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        # Temporarily replace os.environ itself with a plain dict; the
        # original object is put back even if an assertion fails.
        saved_environ = os.environ
        try:
            os.environ = dict(test_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = saved_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        if os.supports_bytes_environ:
            # env cannot contain 'PATH' and b'PATH' keys
            try:
                mixed_env = {'PATH': '1', b'PATH': b'2'}
            except BytesWarning:
                # Building the dict itself raised; nothing left to check.
                pass
            else:
                self.assertRaises(ValueError, os.get_exec_path, mixed_env)

            # bytes key and/or value
            self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
                ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        # os.environ -> os.environb
        value = 'euro\u20ac'
        try:
            value_bytes = value.encode(sys.getfilesystemencoding(),
                                       'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = value
        self.assertEqual(os.environ['unicode'], value)
        self.assertEqual(os.environb[b'unicode'], value_bytes)

        # os.environb -> os.environ
        value = b'\xff'
        os.environb[b'bytes'] = value
        self.assertEqual(os.environb[b'bytes'], value)
        value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
        self.assertEqual(os.environ['bytes'], value_str)
500

501 502 503 504 505 506 507 508
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    def test_traversal(self):
        """Walk a small tree top-down, pruned, bottom-up and via symlinks."""
        from os.path import join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TESTFN/TEST2
        #       TEST2/
        #         tmp4              a lone file
        walk_path = join(support.TESTFN, "TEST1")
        sub1_path = join(walk_path, "SUB1")
        sub11_path = join(sub1_path, "SUB11")
        sub2_path = join(walk_path, "SUB2")
        tmp1_path = join(walk_path, "tmp1")
        tmp2_path = join(sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")

        # Create stuff.
        os.makedirs(sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")
        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), link_path)
            sub2_tree = (sub2_path, ["link"], ["tmp3"])
        else:
            sub2_tree = (sub2_path, [], ["tmp3"])

        # Walk top-down.
        walked = list(os.walk(walk_path))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        # 'flipped' is a bool that is used as 0/1 in the index arithmetic.
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        self.assertEqual(walked[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], sub2_tree)

        # Prune the search.
        walked = []
        for root, dirs, files in os.walk(walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')
        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (walk_path, ["SUB2"], ["tmp1"]))
        self.assertEqual(walked[1], sub2_tree)

        # Walk bottom-up.
        walked = list(os.walk(walk_path, topdown=False))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        self.assertEqual(walked[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped], (sub11_path, [], []))
        self.assertEqual(walked[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped], sub2_tree)

        if support.can_symlink():
            # Walk, following symlinks.
            for root, dirs, files in os.walk(walk_path, followlinks=True):
                if root == link_path:
                    self.assertEqual(dirs, [])
                    self.assertEqual(files, ["tmp4"])
                    break
            else:
                self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # A symlink to a directory is removed as a file, not rmdir'ed.
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(support.TESTFN)
608

609
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs() below a scratch directory TESTFN."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        root = support.TESTFN
        levels = ('dir1', 'dir2', 'dir3', 'dir4', 'dir5', 'dir6')

        # Three levels at once -- should work.
        os.makedirs(os.path.join(root, *levels[:3]))
        # One more level below the directories that now exist.
        os.makedirs(os.path.join(root, *levels[:4]))

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        trailing_dot = os.path.join(root, *(levels[:5] + (os.curdir,)))
        os.makedirs(trailing_dot)
        embedded_dot = os.path.join(root, levels[0], os.curdir, *levels[1:])
        os.makedirs(embedded_dot)

    def tearDown(self):
        deepest = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                               'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while deepest != support.TESTFN and not os.path.exists(deepest):
            deepest = os.path.dirname(deepest)

        os.removedirs(deepest)

639
class DevNullTests(unittest.TestCase):
    """Tests for the os.devnull device."""

    def test_devnull(self):
        """Writing to os.devnull succeeds and reading it yields nothing."""
        # 'with' closes the handles even when the write or the assertion
        # fails, so no file object is leaked.
        with open(os.devnull, 'w') as f:
            f.write('hello')
        with open(os.devnull, 'r') as f:
            self.assertEqual(f.read(), '')
647

648
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom(self):
        """os.urandom(n) returns exactly n bytes for several sizes."""
        try:
            for size in (1, 10, 100, 1000):
                self.assertEqual(len(os.urandom(size)), size)
        except NotImplementedError:
            # Nothing is checked when os.urandom() is not implemented.
            pass

658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Stubs out execv and execve functions when used as context manager.
    Records exec calls. The mock execv and execve functions always raise an
    exception as they would normally never return.

    If *defpath* is not None, os.defpath is replaced by it for the duration
    of the block.  The value yielded is the list of recorded calls; os.execv,
    os.execve and os.defpath are restored when the block is left.
    """
    # A list of tuples containing (function name, first arg, args)
    # of calls to execv or execve that have been made.
    calls = []

    def mock_execv(name, *args):
        calls.append(('execv', name, args))
        raise RuntimeError("execv called")

    def mock_execve(name, *args):
        calls.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    # Save the originals *before* entering the try block: if one of these
    # lookups failed inside it, the finally clause would hit unbound names
    # and mask the real error.
    orig_execv = os.execv
    orig_execve = os.execve
    orig_defpath = os.defpath
    try:
        os.execv = mock_execv
        os.execve = mock_execve
        if defpath is not None:
            os.defpath = defpath
        yield calls
    finally:
        os.execv = orig_execv
        os.execve = orig_execve
        os.defpath = orig_defpath

691
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        with self.assertRaises(ValueError):
            os.execvpe('notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        # Shared driver: test_type is either str or bytes and selects the
        # type of the program name handed to os._execvpe().
        use_bytes = test_type is bytes
        program_path = os.sep + 'absolutepath'
        if use_bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            # Everywhere but on "nt" the recorded path is the encoded one.
            if os.name == "nt":
                native_fullpath = fullpath
            else:
                native_fullpath = os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(calls[0],
                ('execve', native_fullpath, (arguments, env)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if use_bytes else 'PATH'
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(calls[0],
                ('execve', native_fullpath, (arguments, env_path)))

    def test_internal_execvpe_str(self):
        self._test_internal_execvpe(str)
        # The bytes variant is not exercised on "nt".
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
754

755

756 757
class Win32ErrorTests(unittest.TestCase):
    def test_rename(self):
758
        self.assertRaises(WindowsError, os.rename, support.TESTFN, support.TESTFN+".bak")
759 760

    def test_remove(self):
761
        self.assertRaises(WindowsError, os.remove, support.TESTFN)
762 763

    def test_chdir(self):
764
        self.assertRaises(WindowsError, os.chdir, support.TESTFN)
765 766

    def test_mkdir(self):
767
        f = open(support.TESTFN, "w")
Benjamin Peterson's avatar
Benjamin Peterson committed
768 769 770 771
        try:
            self.assertRaises(WindowsError, os.mkdir, support.TESTFN)
        finally:
            f.close()
772
            os.unlink(support.TESTFN)
773 774

    def test_utime(self):
775
        self.assertRaises(WindowsError, os.utime, support.TESTFN, None)
776 777

    def test_chmod(self):
Benjamin Peterson's avatar
Benjamin Peterson committed
778
        self.assertRaises(WindowsError, os.chmod, support.TESTFN, 0)
779

780
class TestInvalidFD(unittest.TestCase):
781
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
782 783 784 785 786
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    #We omit close because it doesn'r raise an exception on some platforms
    def get_single(f):
        def helper(self):
787 788
            if  hasattr(os, f):
                self.check(getattr(os, f))
789 790 791 792
        return helper
    for f in singles:
        locals()["test_"+f] = get_single(f)

793
    def check(self, f, *args):
Benjamin Peterson's avatar
Benjamin Peterson committed
794 795 796 797 798 799 800
        try:
            f(support.make_bad_fd(), *args)
        except OSError as e:
            self.assertEqual(e.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise a OSError with a bad file descriptor"
                      % f)
801

802 803
    def test_isatty(self):
        if hasattr(os, "isatty"):
804
            self.assertEqual(os.isatty(support.make_bad_fd()), False)
805 806 807

    def test_closerange(self):
        if hasattr(os, "closerange"):
808
            fd = support.make_bad_fd()
809 810 811 812 813 814 815 816 817 818 819 820
            # Make sure none of the descriptors we are about to close are
            # currently valid (issue 6542).
            for i in range(10):
                try: os.fstat(fd+i)
                except OSError:
                    pass
                else:
                    break
            if i < 2:
                raise unittest.SkipTest(
                    "Unable to acquire a range of invalid file descriptors")
            self.assertEqual(os.closerange(fd, fd + i-1), None)
821 822 823

    def test_dup2(self):
        if hasattr(os, "dup2"):
824
            self.check(os.dup2, 20)
825 826 827

    def test_fchmod(self):
        if hasattr(os, "fchmod"):
828
            self.check(os.fchmod, 0)
829 830 831

    def test_fchown(self):
        if hasattr(os, "fchown"):
832
            self.check(os.fchown, -1, -1)
833 834 835

    def test_fpathconf(self):
        if hasattr(os, "fpathconf"):
836
            self.check(os.fpathconf, "PC_NAME_MAX")
837 838 839

    def test_ftruncate(self):
        if hasattr(os, "ftruncate"):
840
            self.check(os.ftruncate, 0)
841 842 843

    def test_lseek(self):
        if hasattr(os, "lseek"):
844
            self.check(os.lseek, 0, 0)
845 846 847

    def test_read(self):
        if hasattr(os, "read"):
848
            self.check(os.read, 1)
849 850 851

    def test_tcsetpgrpt(self):
        if hasattr(os, "tcsetpgrp"):
852
            self.check(os.tcsetpgrp, 0)
853 854 855

    def test_write(self):
        if hasattr(os, "write"):
856
            self.check(os.write, b" ")
857

858 859 860 861
# Outside Windows, rebind Win32ErrorTests to an empty TestCase; the name is
# still listed in test_main().
if sys.platform != 'win32':
    class Win32ErrorTests(unittest.TestCase):
        pass

Benjamin Peterson's avatar
Benjamin Peterson committed
862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892
    class PosixUidGidTests(unittest.TestCase):
        """Check argument validation of the os.set*uid() / os.set*gid() calls.

        When the process is not running as uid 0, switching to id 0 must
        raise OSError.  An id of 1<<32 must always raise OverflowError.
        A test whose os function is unavailable is reported as skipped.
        """

        @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
        def test_setuid(self):
            if os.getuid() != 0:
                self.assertRaises(OSError, os.setuid, 0)
            self.assertRaises(OverflowError, os.setuid, 1<<32)

        @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
        def test_setgid(self):
            if os.getuid() != 0:
                self.assertRaises(OSError, os.setgid, 0)
            self.assertRaises(OverflowError, os.setgid, 1<<32)

        @unittest.skipUnless(hasattr(os, 'seteuid'),
                             'test needs os.seteuid()')
        def test_seteuid(self):
            if os.getuid() != 0:
                self.assertRaises(OSError, os.seteuid, 0)
            self.assertRaises(OverflowError, os.seteuid, 1<<32)

        @unittest.skipUnless(hasattr(os, 'setegid'),
                             'test needs os.setegid()')
        def test_setegid(self):
            if os.getuid() != 0:
                self.assertRaises(OSError, os.setegid, 0)
            self.assertRaises(OverflowError, os.setegid, 1<<32)

        @unittest.skipUnless(hasattr(os, 'setreuid'),
                             'test needs os.setreuid()')
        def test_setreuid(self):
            if os.getuid() != 0:
                self.assertRaises(OSError, os.setreuid, 0, 0)
            self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
            self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)

        @unittest.skipUnless(hasattr(os, 'setreuid'),
                             'test needs os.setreuid()')
        def test_setreuid_neg1(self):
            # Needs to accept -1.  We run this in a subprocess to avoid
            # altering the test runner's process state (issue8045).
            subprocess.check_call([
                    sys.executable, '-c',
                    'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])

        @unittest.skipUnless(hasattr(os, 'setregid'),
                             'test needs os.setregid()')
        def test_setregid(self):
            if os.getuid() != 0:
                self.assertRaises(OSError, os.setregid, 0, 0)
            self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
            self.assertRaises(OverflowError, os.setregid, 0, 1<<32)

        @unittest.skipUnless(hasattr(os, 'setregid'),
                             'test needs os.setregid()')
        def test_setregid_neg1(self):
            # Needs to accept -1.  We run this in a subprocess to avoid
            # altering the test runner's process state (issue8045).
            subprocess.check_call([
                    sys.executable, '-c',
                    'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
914 915 916

    class Pep383Tests(unittest.TestCase):
        def setUp(self):
917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936
            if support.TESTFN_UNENCODABLE:
                self.dir = support.TESTFN_UNENCODABLE
            else:
                self.dir = support.TESTFN
            self.bdir = os.fsencode(self.dir)

            bytesfn = []
            def add_filename(fn):
                try:
                    fn = os.fsencode(fn)
                except UnicodeEncodeError:
                    return
                bytesfn.append(fn)
            add_filename(support.TESTFN_UNICODE)
            if support.TESTFN_UNENCODABLE:
                add_filename(support.TESTFN_UNENCODABLE)
            if not bytesfn:
                self.skipTest("couldn't create any non-ascii filename")

            self.unicodefn = set()
937
            os.mkdir(self.dir)
938 939 940 941
            try:
                for fn in bytesfn:
                    f = open(os.path.join(self.bdir, fn), "w")
                    f.close()
942
                    fn = os.fsdecode(fn)
943 944 945 946 947 948
                    if fn in self.unicodefn:
                        raise ValueError("duplicate filename")
                    self.unicodefn.add(fn)
            except:
                shutil.rmtree(self.dir)
                raise
949 950 951 952 953

        def tearDown(self):
            shutil.rmtree(self.dir)

        def test_listdir(self):
954 955
            expected = self.unicodefn
            found = set(os.listdir(self.dir))
956 957 958 959 960 961 962 963 964 965
            self.assertEquals(found, expected)

        def test_open(self):
            for fn in self.unicodefn:
                f = open(os.path.join(self.dir, fn))
                f.close()

        def test_stat(self):
            for fn in self.unicodefn:
                os.stat(os.path.join(self.dir, fn))
Benjamin Peterson's avatar
Benjamin Peterson committed
966 967 968
else:
    # sys.platform is 'win32' here: define an empty TestCase so the name
    # listed in test_main() still resolves.
    class PosixUidGidTests(unittest.TestCase):
        pass
969 970
    # Empty TestCase used when sys.platform is 'win32'; test_main() still
    # references this name.
    class Pep383Tests(unittest.TestCase):
        pass
Benjamin Peterson's avatar
Benjamin Peterson committed
971

972 973
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Exercise os.kill() on Windows with exit codes and console events."""

    def _kill(self, sig):
        """Kill a ready child interpreter with *sig* and check its exit code."""
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        # The child announces itself on stdout, then blocks in input().
        child_code = ("import sys;"
                      "sys.stdout.write('{}');"
                      "sys.stdout.flush();"
                      "input()").format(msg)
        proc = subprocess.Popen([sys.executable, "-c", child_code],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        # Release the pipe handles once the test is over.
        for pipe in (proc.stdout, proc.stderr, proc.stdin):
            self.addCleanup(pipe.close)

        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            attempts += 1
        else:
            # Reached when the child exited early or never wrote anything.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Send console control *event* to a handler child and expect it to exit.

        *name* is only used in the failure message.
        """
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        self.addCleanup(m.close)
        # 0 means "child not ready yet".
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        # NOTE(review): assumes win_console_handler.py writes a non-zero byte
        # into the mapping named by tagname once it is ready -- confirm
        # against that script.  Testing for 0 here would always succeed at
        # once, because 0 is the value written just above.
        attempts, max_attempts = 0, 20
        while attempts < max_attempts and proc.poll() is None:
            if m[0] != 0:
                break
            time.sleep(0.5)
            attempts += 1
        else:
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an
        # exit code of 0 also counts as "stopped".
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle CTRL+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")


1083
def skipUnlessWindows6(test):
    """Decorator: skip *test* unless sys.getwindowsversion().major is >= 6.

    Interpreters without sys.getwindowsversion get the skipped variant too.
    """
    version_getter = getattr(sys, 'getwindowsversion', None)
    if version_getter is None or version_getter().major < 6:
        return unittest.skip("Requires Windows Vista or later")(test)
    return test

@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Symbolic-link tests for Windows: file links, directory links and
    directory links whose target does not exist."""

    # Link names are relative paths; the targets are this test file and the
    # directory containing it.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Both targets must exist and none of the link names may be taken.
        for required in (self.dirlink_target, self.filelink_target):
            assert os.path.exists(required)
        for leftover in (self.dirlink, self.filelink, self.missing_link):
            assert not os.path.exists(leftover)

    def tearDown(self):
        # (presence check, path, removal function) for every possible link.
        cleanup_steps = (
            (os.path.exists, self.filelink, os.remove),
            (os.path.exists, self.dirlink, os.rmdir),
            (os.path.lexists, self.missing_link, os.remove),
        )
        for is_present, path, delete in cleanup_steps:
            if is_present(path):
                delete(path)

    def test_directory_link(self):
        link = self.dirlink
        os.symlink(self.dirlink_target, link)
        for predicate in (os.path.exists, os.path.isdir, os.path.islink):
            self.assertTrue(predicate(link))
        self.check_stat(link, self.dirlink_target)

    def test_file_link(self):
        link = self.filelink
        os.symlink(self.filelink_target, link)
        for predicate in (os.path.exists, os.path.isfile, os.path.islink):
            self.assertTrue(predicate(link))
        self.check_stat(link, self.filelink_target)

    def _create_missing_dir_link(self):
        """Create a "directory" link to a non-existent target."""
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)
        missing_target = r'c:\\target does not exist.29r3c740'
        assert not os.path.exists(missing_target)
        # The third positional argument is target_is_directory.
        os.symlink(missing_target, self.missing_link, True)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        #  directory status and call RemoveDirectory if the symlink
        #  was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Require stat(link) == stat(target) and lstat(link) != stat(link)."""
        through_link = os.stat(link)
        of_target = os.stat(target)
        self.assertEqual(through_link, of_target)
        link_itself = os.lstat(link)
        followed = os.stat(link)
        self.assertNotEqual(link_itself, followed)


1161 1162 1163 1164
class FSEncodingTests(unittest.TestCase):
    def test_nop(self):
        self.assertEquals(os.fsencode(b'abc\xff'), b'abc\xff')
        self.assertEquals(os.fsdecode('abc\u0141'), 'abc\u0141')
1165

1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186
    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                bytesfn = os.fsencode(fn)
            except UnicodeEncodeError:
                continue
            self.assertEquals(os.fsdecode(bytesfn), fn)

    def get_output(self, fs_encoding, func):
        env = os.environ.copy()
        env['PYTHONIOENCODING'] = 'utf-8'
        env['PYTHONFSENCODING'] = fs_encoding
        code = 'import os; print(%s, end="")' % func
        process = subprocess.Popen(
            [sys.executable, "-c", code],
            stdout=subprocess.PIPE, env=env)
        stdout, stderr = process.communicate()
        self.assertEqual(process.returncode, 0)
        return stdout.decode('utf-8')

1187 1188
    @unittest.skipIf(sys.platform in ('win32', 'darwin'),
                     'PYTHONFSENCODING is ignored on Windows and Mac OS X')
1189 1190 1191 1192 1193 1194 1195 1196 1197
    def test_encodings(self):
        def check(encoding, bytesfn, unicodefn):
            encoded = self.get_output(encoding, 'repr(os.fsencode(%a))' % unicodefn)
            self.assertEqual(encoded, repr(bytesfn))

            decoded = self.get_output(encoding, 'repr(os.fsdecode(%a))' % bytesfn)
            self.assertEqual(decoded, repr(unicodefn))

        check('utf-8', b'\xc3\xa9\x80', '\xe9\udc80')
1198 1199 1200 1201 1202 1203

        # Raise SkipTest() if sys.executable is not encodable to ascii
        support.workaroundIssue8611()

        check('ascii', b'abc\xff', 'abc\udcff')
        check('iso-8859-15', b'\xef\xa4', '\xef\u20ac')
1204 1205


1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216
class PidTests(unittest.TestCase):
    """Tests for the process-id functions of the os module."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        # The child prints the pid of its parent, which is this process.
        child_code = 'import os; print(os.getppid())'
        child = subprocess.Popen([sys.executable, '-c', child_code],
                                 stdout=subprocess.PIPE)
        output = child.communicate()[0]
        # We are the parent of our subprocess
        reported_parent = int(output)
        self.assertEqual(reported_parent, os.getpid())


1217 1218 1219
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Check that os.getlogin() returns a non-empty name."""

    def test_getlogin(self):
        name_length = len(os.getlogin())
        self.assertNotEqual(name_length, 0)


1227
def test_main():
    """Run every TestCase class of this module via support.run_unittest()."""
    test_classes = (
        FileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        ExecTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Pep383Tests,
        Win32KillTests,
        Win32SymlinkTests,
        FSEncodingTests,
        PidTests,
        LoginTests,
    )
    support.run_unittest(*test_classes)
1247 1248 1249

# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()