Commit e9b51c0a authored by Serhiy Storchaka's avatar Serhiy Storchaka Committed by GitHub

bpo-26660, bpo-35144: Fix permission errors in TemporaryDirectory cleanup. (GH-10320)

TemporaryDirectory.cleanup() failed when non-writeable or non-searchable
files or directories were created inside a temporary directory.
parent 38ab7d47
...@@ -584,11 +584,16 @@ def _rmtree_safe_fd(topfd, path, onerror): ...@@ -584,11 +584,16 @@ def _rmtree_safe_fd(topfd, path, onerror):
fullname = os.path.join(path, entry.name) fullname = os.path.join(path, entry.name)
try: try:
is_dir = entry.is_dir(follow_symlinks=False) is_dir = entry.is_dir(follow_symlinks=False)
except OSError:
is_dir = False
else:
if is_dir: if is_dir:
try:
orig_st = entry.stat(follow_symlinks=False) orig_st = entry.stat(follow_symlinks=False)
is_dir = stat.S_ISDIR(orig_st.st_mode) is_dir = stat.S_ISDIR(orig_st.st_mode)
except OSError: except OSError:
is_dir = False onerror(os.lstat, fullname, sys.exc_info())
continue
if is_dir: if is_dir:
try: try:
dirfd = os.open(entry.name, os.O_RDONLY, dir_fd=topfd) dirfd = os.open(entry.name, os.O_RDONLY, dir_fd=topfd)
......
...@@ -777,9 +777,39 @@ class TemporaryDirectory(object): ...@@ -777,9 +777,39 @@ class TemporaryDirectory(object):
self, self._cleanup, self.name, self, self._cleanup, self.name,
warn_message="Implicitly cleaning up {!r}".format(self)) warn_message="Implicitly cleaning up {!r}".format(self))
@classmethod
def _rmtree(cls, name):
def onerror(func, path, exc_info):
if issubclass(exc_info[0], PermissionError):
def resetperms(path):
try:
_os.chflags(path, 0)
except AttributeError:
pass
_os.chmod(path, 0o700)
try:
if path != name:
resetperms(_os.path.dirname(path))
resetperms(path)
try:
_os.unlink(path)
# PermissionError is raised on FreeBSD for directories
except (IsADirectoryError, PermissionError):
cls._rmtree(path)
except FileNotFoundError:
pass
elif issubclass(exc_info[0], FileNotFoundError):
pass
else:
raise
_shutil.rmtree(name, onerror=onerror)
@classmethod @classmethod
def _cleanup(cls, name, warn_message): def _cleanup(cls, name, warn_message):
_shutil.rmtree(name) cls._rmtree(name)
_warnings.warn(warn_message, ResourceWarning) _warnings.warn(warn_message, ResourceWarning)
def __repr__(self): def __repr__(self):
...@@ -793,4 +823,4 @@ class TemporaryDirectory(object): ...@@ -793,4 +823,4 @@ class TemporaryDirectory(object):
def cleanup(self): def cleanup(self):
if self._finalizer.detach(): if self._finalizer.detach():
_shutil.rmtree(self.name) self._rmtree(self.name)
...@@ -1297,18 +1297,24 @@ class NulledModules: ...@@ -1297,18 +1297,24 @@ class NulledModules:
class TestTemporaryDirectory(BaseTestCase): class TestTemporaryDirectory(BaseTestCase):
"""Test TemporaryDirectory().""" """Test TemporaryDirectory()."""
def do_create(self, dir=None, pre="", suf="", recurse=1): def do_create(self, dir=None, pre="", suf="", recurse=1, dirs=1, files=1):
if dir is None: if dir is None:
dir = tempfile.gettempdir() dir = tempfile.gettempdir()
tmp = tempfile.TemporaryDirectory(dir=dir, prefix=pre, suffix=suf) tmp = tempfile.TemporaryDirectory(dir=dir, prefix=pre, suffix=suf)
self.nameCheck(tmp.name, dir, pre, suf) self.nameCheck(tmp.name, dir, pre, suf)
# Create a subdirectory and some files self.do_create2(tmp.name, recurse, dirs, files)
return tmp
def do_create2(self, path, recurse=1, dirs=1, files=1):
# Create subdirectories and some files
if recurse: if recurse:
d1 = self.do_create(tmp.name, pre, suf, recurse-1) for i in range(dirs):
d1.name = None name = os.path.join(path, "dir%d" % i)
with open(os.path.join(tmp.name, "test.txt"), "wb") as f: os.mkdir(name)
self.do_create2(name, recurse-1, dirs, files)
for i in range(files):
with open(os.path.join(path, "test%d.txt" % i), "wb") as f:
f.write(b"Hello world!") f.write(b"Hello world!")
return tmp
def test_mkdtemp_failure(self): def test_mkdtemp_failure(self):
# Check no additional exception if mkdtemp fails # Check no additional exception if mkdtemp fails
...@@ -1349,7 +1355,7 @@ class TestTemporaryDirectory(BaseTestCase): ...@@ -1349,7 +1355,7 @@ class TestTemporaryDirectory(BaseTestCase):
"TemporaryDirectory %s exists after cleanup" % d1.name) "TemporaryDirectory %s exists after cleanup" % d1.name)
self.assertTrue(os.path.exists(d2.name), self.assertTrue(os.path.exists(d2.name),
"Directory pointed to by a symlink was deleted") "Directory pointed to by a symlink was deleted")
self.assertEqual(os.listdir(d2.name), ['test.txt'], self.assertEqual(os.listdir(d2.name), ['test0.txt'],
"Contents of the directory pointed to by a symlink " "Contents of the directory pointed to by a symlink "
"were deleted") "were deleted")
d2.cleanup() d2.cleanup()
...@@ -1384,7 +1390,7 @@ class TestTemporaryDirectory(BaseTestCase): ...@@ -1384,7 +1390,7 @@ class TestTemporaryDirectory(BaseTestCase):
tmp2 = os.path.join(tmp.name, 'test_dir') tmp2 = os.path.join(tmp.name, 'test_dir')
os.mkdir(tmp2) os.mkdir(tmp2)
with open(os.path.join(tmp2, "test.txt"), "w") as f: with open(os.path.join(tmp2, "test0.txt"), "w") as f:
f.write("Hello world!") f.write("Hello world!")
{mod}.tmp = tmp {mod}.tmp = tmp
...@@ -1452,6 +1458,33 @@ class TestTemporaryDirectory(BaseTestCase): ...@@ -1452,6 +1458,33 @@ class TestTemporaryDirectory(BaseTestCase):
self.assertEqual(name, d.name) self.assertEqual(name, d.name)
self.assertFalse(os.path.exists(name)) self.assertFalse(os.path.exists(name))
def test_modes(self):
for mode in range(8):
mode <<= 6
with self.subTest(mode=format(mode, '03o')):
d = self.do_create(recurse=3, dirs=2, files=2)
with d:
# Change files and directories mode recursively.
for root, dirs, files in os.walk(d.name, topdown=False):
for name in files:
os.chmod(os.path.join(root, name), mode)
os.chmod(root, mode)
d.cleanup()
self.assertFalse(os.path.exists(d.name))
@unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.lchflags')
def test_flags(self):
flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK
d = self.do_create(recurse=3, dirs=2, files=2)
with d:
# Change files and directories flags recursively.
for root, dirs, files in os.walk(d.name, topdown=False):
for name in files:
os.chflags(os.path.join(root, name), flags)
os.chflags(root, flags)
d.cleanup()
self.assertFalse(os.path.exists(d.name))
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
Fixed permission errors in :class:`~tempfile.TemporaryDirectory` clean up.
Previously ``TemporaryDirectory.cleanup()`` failed when non-writeable or
non-searchable files or directories were created inside a temporary
directory.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment