Commit 7438c612 authored by Victor Stinner's avatar Victor Stinner

Use "with popen:" in test_subprocess

Issue #26741.
parent 19ed27ec
...@@ -443,8 +443,8 @@ class ProcessTestCase(BaseTestCase): ...@@ -443,8 +443,8 @@ class ProcessTestCase(BaseTestCase):
p = subprocess.Popen([sys.executable, "-c", p = subprocess.Popen([sys.executable, "-c",
'import sys; sys.stdout.write("orange")'], 'import sys; sys.stdout.write("orange")'],
stdout=subprocess.PIPE) stdout=subprocess.PIPE)
self.addCleanup(p.stdout.close) with p:
self.assertEqual(p.stdout.read(), b"orange") self.assertEqual(p.stdout.read(), b"orange")
def test_stdout_filedes(self): def test_stdout_filedes(self):
# stdout is set to open file descriptor # stdout is set to open file descriptor
...@@ -474,8 +474,8 @@ class ProcessTestCase(BaseTestCase): ...@@ -474,8 +474,8 @@ class ProcessTestCase(BaseTestCase):
p = subprocess.Popen([sys.executable, "-c", p = subprocess.Popen([sys.executable, "-c",
'import sys; sys.stderr.write("strawberry")'], 'import sys; sys.stderr.write("strawberry")'],
stderr=subprocess.PIPE) stderr=subprocess.PIPE)
self.addCleanup(p.stderr.close) with p:
self.assertStderrEqual(p.stderr.read(), b"strawberry") self.assertStderrEqual(p.stderr.read(), b"strawberry")
def test_stderr_filedes(self): def test_stderr_filedes(self):
# stderr is set to open file descriptor # stderr is set to open file descriptor
...@@ -530,8 +530,8 @@ class ProcessTestCase(BaseTestCase): ...@@ -530,8 +530,8 @@ class ProcessTestCase(BaseTestCase):
'sys.stderr.write("orange")'], 'sys.stderr.write("orange")'],
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT) stderr=subprocess.STDOUT)
self.addCleanup(p.stdout.close) with p:
self.assertStderrEqual(p.stdout.read(), b"appleorange") self.assertStderrEqual(p.stdout.read(), b"appleorange")
def test_stdout_stderr_file(self): def test_stdout_stderr_file(self):
# capture stdout and stderr to the same open file # capture stdout and stderr to the same open file
...@@ -789,18 +789,19 @@ class ProcessTestCase(BaseTestCase): ...@@ -789,18 +789,19 @@ class ProcessTestCase(BaseTestCase):
stdin=subprocess.PIPE, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
universal_newlines=1) universal_newlines=1)
p.stdin.write("line1\n") with p:
p.stdin.flush() p.stdin.write("line1\n")
self.assertEqual(p.stdout.readline(), "line1\n") p.stdin.flush()
p.stdin.write("line3\n") self.assertEqual(p.stdout.readline(), "line1\n")
p.stdin.close() p.stdin.write("line3\n")
self.addCleanup(p.stdout.close) p.stdin.close()
self.assertEqual(p.stdout.readline(), self.addCleanup(p.stdout.close)
"line2\n") self.assertEqual(p.stdout.readline(),
self.assertEqual(p.stdout.read(6), "line2\n")
"line3\n") self.assertEqual(p.stdout.read(6),
self.assertEqual(p.stdout.read(), "line3\n")
"line4\nline5\nline6\nline7\nline8") self.assertEqual(p.stdout.read(),
"line4\nline5\nline6\nline7\nline8")
def test_universal_newlines_communicate(self): def test_universal_newlines_communicate(self):
# universal newlines through communicate() # universal newlines through communicate()
...@@ -1434,8 +1435,8 @@ class POSIXProcessTestCase(BaseTestCase): ...@@ -1434,8 +1435,8 @@ class POSIXProcessTestCase(BaseTestCase):
'sys.stdout.write(os.getenv("FRUIT"))'], 'sys.stdout.write(os.getenv("FRUIT"))'],
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
preexec_fn=lambda: os.putenv("FRUIT", "apple")) preexec_fn=lambda: os.putenv("FRUIT", "apple"))
self.addCleanup(p.stdout.close) with p:
self.assertEqual(p.stdout.read(), b"apple") self.assertEqual(p.stdout.read(), b"apple")
def test_preexec_exception(self): def test_preexec_exception(self):
def raise_it(): def raise_it():
...@@ -1583,8 +1584,8 @@ class POSIXProcessTestCase(BaseTestCase): ...@@ -1583,8 +1584,8 @@ class POSIXProcessTestCase(BaseTestCase):
p = subprocess.Popen(["echo $FRUIT"], shell=1, p = subprocess.Popen(["echo $FRUIT"], shell=1,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
env=newenv) env=newenv)
self.addCleanup(p.stdout.close) with p:
self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple") self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
def test_shell_string(self): def test_shell_string(self):
# Run command through the shell (string) # Run command through the shell (string)
...@@ -1593,8 +1594,8 @@ class POSIXProcessTestCase(BaseTestCase): ...@@ -1593,8 +1594,8 @@ class POSIXProcessTestCase(BaseTestCase):
p = subprocess.Popen("echo $FRUIT", shell=1, p = subprocess.Popen("echo $FRUIT", shell=1,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
env=newenv) env=newenv)
self.addCleanup(p.stdout.close) with p:
self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple") self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
def test_call_string(self): def test_call_string(self):
# call() function with string argument on UNIX # call() function with string argument on UNIX
...@@ -1626,8 +1627,8 @@ class POSIXProcessTestCase(BaseTestCase): ...@@ -1626,8 +1627,8 @@ class POSIXProcessTestCase(BaseTestCase):
for sh in shells: for sh in shells:
p = subprocess.Popen("echo $0", executable=sh, shell=True, p = subprocess.Popen("echo $0", executable=sh, shell=True,
stdout=subprocess.PIPE) stdout=subprocess.PIPE)
self.addCleanup(p.stdout.close) with p:
self.assertEqual(p.stdout.read().strip(), bytes(sh, 'ascii')) self.assertEqual(p.stdout.read().strip(), bytes(sh, 'ascii'))
def _kill_process(self, method, *args): def _kill_process(self, method, *args):
# Do not inherit file handles from the parent. # Do not inherit file handles from the parent.
...@@ -2450,8 +2451,8 @@ class Win32ProcessTestCase(BaseTestCase): ...@@ -2450,8 +2451,8 @@ class Win32ProcessTestCase(BaseTestCase):
p = subprocess.Popen(["set"], shell=1, p = subprocess.Popen(["set"], shell=1,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
env=newenv) env=newenv)
self.addCleanup(p.stdout.close) with p:
self.assertIn(b"physalis", p.stdout.read()) self.assertIn(b"physalis", p.stdout.read())
def test_shell_string(self): def test_shell_string(self):
# Run command through the shell (string) # Run command through the shell (string)
...@@ -2460,8 +2461,8 @@ class Win32ProcessTestCase(BaseTestCase): ...@@ -2460,8 +2461,8 @@ class Win32ProcessTestCase(BaseTestCase):
p = subprocess.Popen("set", shell=1, p = subprocess.Popen("set", shell=1,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
env=newenv) env=newenv)
self.addCleanup(p.stdout.close) with p:
self.assertIn(b"physalis", p.stdout.read()) self.assertIn(b"physalis", p.stdout.read())
def test_call_string(self): def test_call_string(self):
# call() function with string argument on Windows # call() function with string argument on Windows
...@@ -2480,16 +2481,14 @@ class Win32ProcessTestCase(BaseTestCase): ...@@ -2480,16 +2481,14 @@ class Win32ProcessTestCase(BaseTestCase):
stdin=subprocess.PIPE, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE) stderr=subprocess.PIPE)
self.addCleanup(p.stdout.close) with p:
self.addCleanup(p.stderr.close) # Wait for the interpreter to be completely initialized before
self.addCleanup(p.stdin.close) # sending any signal.
# Wait for the interpreter to be completely initialized before p.stdout.read(1)
# sending any signal. getattr(p, method)(*args)
p.stdout.read(1) _, stderr = p.communicate()
getattr(p, method)(*args) self.assertStderrEqual(stderr, b'')
_, stderr = p.communicate() returncode = p.wait()
self.assertStderrEqual(stderr, b'')
returncode = p.wait()
self.assertNotEqual(returncode, 0) self.assertNotEqual(returncode, 0)
def _kill_dead_process(self, method, *args): def _kill_dead_process(self, method, *args):
...@@ -2502,19 +2501,17 @@ class Win32ProcessTestCase(BaseTestCase): ...@@ -2502,19 +2501,17 @@ class Win32ProcessTestCase(BaseTestCase):
stdin=subprocess.PIPE, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE) stderr=subprocess.PIPE)
self.addCleanup(p.stdout.close) with p:
self.addCleanup(p.stderr.close) # Wait for the interpreter to be completely initialized before
self.addCleanup(p.stdin.close) # sending any signal.
# Wait for the interpreter to be completely initialized before p.stdout.read(1)
# sending any signal. # The process should end after this
p.stdout.read(1) time.sleep(1)
# The process should end after this # This shouldn't raise even though the child is now dead
time.sleep(1) getattr(p, method)(*args)
# This shouldn't raise even though the child is now dead _, stderr = p.communicate()
getattr(p, method)(*args) self.assertStderrEqual(stderr, b'')
_, stderr = p.communicate() rc = p.wait()
self.assertStderrEqual(stderr, b'')
rc = p.wait()
self.assertEqual(rc, 42) self.assertEqual(rc, 42)
def test_send_signal(self): def test_send_signal(self):
...@@ -2602,11 +2599,11 @@ class CommandsWithSpaces (BaseTestCase): ...@@ -2602,11 +2599,11 @@ class CommandsWithSpaces (BaseTestCase):
def with_spaces(self, *args, **kwargs): def with_spaces(self, *args, **kwargs):
kwargs['stdout'] = subprocess.PIPE kwargs['stdout'] = subprocess.PIPE
p = subprocess.Popen(*args, **kwargs) p = subprocess.Popen(*args, **kwargs)
self.addCleanup(p.stdout.close) with p:
self.assertEqual( self.assertEqual(
p.stdout.read ().decode("mbcs"), p.stdout.read ().decode("mbcs"),
"2 [%r, 'ab cd']" % self.fname "2 [%r, 'ab cd']" % self.fname
) )
def test_shell_string_with_spaces(self): def test_shell_string_with_spaces(self):
# call() function with string argument with spaces on Windows # call() function with string argument with spaces on Windows
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment