Commit f3f5f8d7 authored by Martín Ferrari's avatar Martín Ferrari

Use different codepaths to increase test coverage.

parent 56f48a81
#!/usr/bin/env python #!/usr/bin/env python
# vim:ts=4:sw=4:et:ai:sts=4 # vim:ts=4:sw=4:et:ai:sts=4
import nemu, nemu.subprocess_, test_util import nemu, test_util
import nemu.subprocess_ as sp
import grp, os, pwd, signal, socket, sys, time, unittest import grp, os, pwd, signal, socket, sys, time, unittest
from nemu.subprocess_ import *
def _stat(path): def _stat(path):
try: try:
return os.stat(user) return os.stat(user)
...@@ -72,84 +71,84 @@ class TestSubprocess(unittest.TestCase): ...@@ -72,84 +71,84 @@ class TestSubprocess(unittest.TestCase):
@test_util.skipUnless(os.getuid() == 0, "Test requires root privileges") @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
def test_spawn_chuser(self): def test_spawn_chuser(self):
user = 'nobody' user = 'nobody'
pid = spawn('/bin/sleep', ['/bin/sleep', '100'], user = user) pid = sp.spawn('/bin/sleep', ['/bin/sleep', '100'], user = user)
self._check_ownership(user, pid) self._check_ownership(user, pid)
os.kill(pid, signal.SIGTERM) os.kill(pid, signal.SIGTERM)
self.assertEquals(wait(pid), signal.SIGTERM) self.assertEquals(sp.wait(pid), signal.SIGTERM)
@test_util.skipUnless(os.getuid() == 0, "Test requires root privileges") @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
def test_Subprocess_chuser(self): def test_Subprocess_chuser(self):
node = nemu.Node(nonetns = True) node = nemu.Node(nonetns = True)
user = 'nobody' user = 'nobody'
p = Subprocess(node, ['/bin/sleep', '1000'], user = user) p = node.Subprocess(['/bin/sleep', '1000'], user = user)
self._check_ownership(user, p.pid) self._check_ownership(user, p.pid)
p.signal() p.signal()
self.assertEquals(p.wait(), -signal.SIGTERM) self.assertEquals(p.wait(), -signal.SIGTERM)
def test_spawn_basic(self): def test_spawn_basic(self):
# User does not exist # User does not exist
self.assertRaises(ValueError, spawn, self.assertRaises(ValueError, sp.spawn,
'/bin/sleep', ['/bin/sleep', '1000'], user = self.nouser) '/bin/sleep', ['/bin/sleep', '1000'], user = self.nouser)
self.assertRaises(ValueError, spawn, self.assertRaises(ValueError, sp.spawn,
'/bin/sleep', ['/bin/sleep', '1000'], user = self.nouid) '/bin/sleep', ['/bin/sleep', '1000'], user = self.nouid)
# Invalid CWD: it is a file # Invalid CWD: it is a file
self.assertRaises(OSError, spawn, '/bin/sleep', cwd = '/bin/sleep') self.assertRaises(OSError, sp.spawn, '/bin/sleep', cwd = '/bin/sleep')
# Invalid CWD: does not exist # Invalid CWD: does not exist
self.assertRaises(OSError, spawn, '/bin/sleep', cwd = self.nofile) self.assertRaises(OSError, sp.spawn, '/bin/sleep', cwd = self.nofile)
# Exec failure # Exec failure
self.assertRaises(OSError, spawn, self.nofile) self.assertRaises(OSError, sp.spawn, self.nofile)
# Test that the environment is cleared: sleep should not be found # Test that the environment is cleared: sleep should not be found
# XXX: This should be a python bug: if I don't set PATH explicitly, it # XXX: This should be a python bug: if I don't set PATH explicitly, it
# uses a default search path # uses a default search path
self.assertRaises(OSError, spawn, 'sleep', env = {'PATH': ''}) self.assertRaises(OSError, sp.spawn, 'sleep', env = {'PATH': ''})
r, w = os.pipe() r, w = os.pipe()
p = spawn('/bin/echo', ['echo', 'hello world'], stdout = w) p = sp.spawn('/bin/echo', ['echo', 'hello world'], stdout = w)
os.close(w) os.close(w)
self.assertEquals(_readall(r), "hello world\n") self.assertEquals(_readall(r), "hello world\n")
os.close(r) os.close(r)
r0, w0 = os.pipe() r0, w0 = os.pipe()
r1, w1 = os.pipe() r1, w1 = os.pipe()
p = spawn('/bin/cat', stdout = w0, stdin = r1, close_fds = [r0, w1]) p = sp.spawn('/bin/cat', stdout = w0, stdin = r1, close_fds = [r0, w1])
os.close(w0) os.close(w0)
os.close(r1) os.close(r1)
self.assertEquals(poll(p), None) self.assertEquals(sp.poll(p), None)
os.write(w1, "hello world\n") os.write(w1, "hello world\n")
os.close(w1) os.close(w1)
self.assertEquals(_readall(r0), "hello world\n") self.assertEquals(_readall(r0), "hello world\n")
os.close(r0) os.close(r0)
self.assertEquals(wait(p), 0) self.assertEquals(sp.wait(p), 0)
def test_Subprocess_basic(self): def test_Subprocess_basic(self):
node = nemu.Node(nonetns = True) node = nemu.Node(nonetns = True)
# User does not exist # User does not exist
self.assertRaises(ValueError, Subprocess, node, self.assertRaises(ValueError, node.Subprocess,
['/bin/sleep', '1000'], user = self.nouser) ['/bin/sleep', '1000'], user = self.nouser)
self.assertRaises(ValueError, Subprocess, node, self.assertRaises(ValueError, node.Subprocess,
['/bin/sleep', '1000'], user = self.nouid) ['/bin/sleep', '1000'], user = self.nouid)
# Invalid CWD: it is a file # Invalid CWD: it is a file
self.assertRaises(OSError, Subprocess, node, self.assertRaises(OSError, node.Subprocess,
'/bin/sleep', cwd = '/bin/sleep') '/bin/sleep', cwd = '/bin/sleep')
# Invalid CWD: does not exist # Invalid CWD: does not exist
self.assertRaises(OSError, Subprocess, node, self.assertRaises(OSError, node.Subprocess,
'/bin/sleep', cwd = self.nofile) '/bin/sleep', cwd = self.nofile)
# Exec failure # Exec failure
self.assertRaises(OSError, Subprocess, node, self.nofile) self.assertRaises(OSError, node.Subprocess, self.nofile)
# Test that the environment is cleared: sleep should not be found # Test that the environment is cleared: sleep should not be found
self.assertRaises(OSError, Subprocess, node, self.assertRaises(OSError, node.Subprocess,
'sleep', env = {'PATH': ''}) 'sleep', env = {'PATH': ''})
# Argv # Argv
self.assertRaises(OSError, Subprocess, node, 'true; false') self.assertRaises(OSError, node.Subprocess, 'true; false')
self.assertEquals(Subprocess(node, 'true').wait(), 0) self.assertEquals(node.Subprocess('true').wait(), 0)
self.assertEquals(Subprocess(node, 'true; false', shell = True).wait(), self.assertEquals(node.Subprocess('true; false', shell = True).wait(),
1) 1)
# Piping # Piping
r, w = os.pipe() r, w = os.pipe()
p = Subprocess(node, ['echo', 'hello world'], stdout = w) p = node.Subprocess(['echo', 'hello world'], stdout = w)
os.close(w) os.close(w)
self.assertEquals(_readall(r), "hello world\n") self.assertEquals(_readall(r), "hello world\n")
os.close(r) os.close(r)
...@@ -157,13 +156,13 @@ class TestSubprocess(unittest.TestCase): ...@@ -157,13 +156,13 @@ class TestSubprocess(unittest.TestCase):
# cwd # cwd
r, w = os.pipe() r, w = os.pipe()
p = Subprocess(node, '/bin/pwd', stdout = w, cwd = "/") p = node.Subprocess('/bin/pwd', stdout = w, cwd = "/")
os.close(w) os.close(w)
self.assertEquals(_readall(r), "/\n") self.assertEquals(_readall(r), "/\n")
os.close(r) os.close(r)
p.wait() p.wait()
p = Subprocess(node, ['sleep', '100']) p = node.Subprocess(['sleep', '100'])
self.assertTrue(p.pid > 0) self.assertTrue(p.pid > 0)
self.assertEquals(p.poll(), None) # not finished self.assertEquals(p.poll(), None) # not finished
p.signal() p.signal()
...@@ -173,7 +172,7 @@ class TestSubprocess(unittest.TestCase): ...@@ -173,7 +172,7 @@ class TestSubprocess(unittest.TestCase):
self.assertEquals(p.poll(), -signal.SIGTERM) # no-op self.assertEquals(p.poll(), -signal.SIGTERM) # no-op
# destroy # destroy
p = Subprocess(node, ['sleep', '100']) p = node.Subprocess(['sleep', '100'])
pid = p.pid pid = p.pid
os.kill(pid, 0) # verify process still there os.kill(pid, 0) # verify process still there
p.destroy() p.destroy()
...@@ -185,7 +184,7 @@ class TestSubprocess(unittest.TestCase): ...@@ -185,7 +184,7 @@ class TestSubprocess(unittest.TestCase):
cmd = 'trap "" TERM; echo; exec sleep 100 > /dev/null' cmd = 'trap "" TERM; echo; exec sleep 100 > /dev/null'
r, w = os.pipe() r, w = os.pipe()
p = Subprocess(node, cmd, shell = True, stdout = w) p = node.Subprocess(cmd, shell = True, stdout = w)
os.close(w) os.close(w)
self.assertEquals(_readall(r), "\n") # wait for trap to be installed self.assertEquals(_readall(r), "\n") # wait for trap to be installed
os.close(r) os.close(r)
...@@ -194,7 +193,7 @@ class TestSubprocess(unittest.TestCase): ...@@ -194,7 +193,7 @@ class TestSubprocess(unittest.TestCase):
p.destroy() p.destroy()
self.assertRaises(OSError, os.kill, pid, 0) # should be dead by now self.assertRaises(OSError, os.kill, pid, 0) # should be dead by now
p = Subprocess(node, ['sleep', '100']) p = node.Subprocess(['sleep', '100'])
os.kill(p.pid, signal.SIGTERM) os.kill(p.pid, signal.SIGTERM)
time.sleep(0.2) time.sleep(0.2)
p.signal() # since it has not been waited for, it should not raise p.signal() # since it has not been waited for, it should not raise
...@@ -206,7 +205,7 @@ class TestSubprocess(unittest.TestCase): ...@@ -206,7 +205,7 @@ class TestSubprocess(unittest.TestCase):
# repeat test with Popen interface # repeat test with Popen interface
r0, w0 = os.pipe() r0, w0 = os.pipe()
r1, w1 = os.pipe() r1, w1 = os.pipe()
p = Popen(node, 'cat', stdout = w0, stdin = r1) p = node.Popen('cat', stdout = w0, stdin = r1)
os.close(w0) os.close(w0)
os.close(r1) os.close(r1)
os.write(w1, "hello world\n") os.write(w1, "hello world\n")
...@@ -216,96 +215,96 @@ class TestSubprocess(unittest.TestCase): ...@@ -216,96 +215,96 @@ class TestSubprocess(unittest.TestCase):
# now with a socketpair, not using integers # now with a socketpair, not using integers
(s0, s1) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM, 0) (s0, s1) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM, 0)
p = Popen(node, 'cat', stdout = s0, stdin = s0) p = node.Popen('cat', stdout = s0, stdin = s0)
s0.close() s0.close()
s1.send("hello world\n") s1.send("hello world\n")
self.assertEquals(s1.recv(512), "hello world\n") self.assertEquals(s1.recv(512), "hello world\n")
s1.close() s1.close()
# pipes # pipes
p = Popen(node, 'cat', stdin = PIPE, stdout = PIPE) p = node.Popen('cat', stdin = sp.PIPE, stdout = sp.PIPE)
p.stdin.write("hello world\n") p.stdin.write("hello world\n")
p.stdin.close() p.stdin.close()
self.assertEquals(p.stdout.readlines(), ["hello world\n"]) self.assertEquals(p.stdout.readlines(), ["hello world\n"])
self.assertEquals(p.stderr, None) self.assertEquals(p.stderr, None)
self.assertEquals(p.wait(), 0) self.assertEquals(p.wait(), 0)
p = Popen(node, 'cat', stdin = PIPE, stdout = PIPE) p = node.Popen('cat', stdin = sp.PIPE, stdout = sp.PIPE)
self.assertEquals(p.communicate(_longstring), (_longstring, None)) self.assertEquals(p.communicate(_longstring), (_longstring, None))
p = Popen(node, 'cat', stdin = PIPE, stdout = PIPE) p = node.Popen('cat', stdin = sp.PIPE, stdout = sp.PIPE)
p.stdin.write(_longstring) p.stdin.write(_longstring)
self.assertEquals(p.communicate(), (_longstring, None)) self.assertEquals(p.communicate(), (_longstring, None))
p = Popen(node, 'cat', stdin = PIPE) p = node.Popen('cat', stdin = sp.PIPE)
self.assertEquals(p.communicate(), (None, None)) self.assertEquals(p.communicate(), (None, None))
p = Popen(node, 'cat >&2', shell = True, stdin = PIPE, stderr = PIPE) p = node.Popen('cat >&2', shell = True, stdin = sp.PIPE, stderr = sp.PIPE)
p.stdin.write("hello world\n") p.stdin.write("hello world\n")
p.stdin.close() p.stdin.close()
self.assertEquals(p.stderr.readlines(), ["hello world\n"]) self.assertEquals(p.stderr.readlines(), ["hello world\n"])
self.assertEquals(p.stdout, None) self.assertEquals(p.stdout, None)
self.assertEquals(p.wait(), 0) self.assertEquals(p.wait(), 0)
p = Popen(node, ['sh', '-c', 'cat >&2'], stdin = PIPE, stderr = PIPE) p = node.Popen(['sh', '-c', 'cat >&2'], stdin = sp.PIPE, stderr = sp.PIPE)
self.assertEquals(p.communicate(_longstring), (None, _longstring)) self.assertEquals(p.communicate(_longstring), (None, _longstring))
# #
p = Popen(node, ['sh', '-c', 'cat >&2'], p = node.Popen(['sh', '-c', 'cat >&2'],
stdin = PIPE, stdout = PIPE, stderr = STDOUT) stdin = sp.PIPE, stdout = sp.PIPE, stderr = sp.STDOUT)
p.stdin.write("hello world\n") p.stdin.write("hello world\n")
p.stdin.close() p.stdin.close()
self.assertEquals(p.stdout.readlines(), ["hello world\n"]) self.assertEquals(p.stdout.readlines(), ["hello world\n"])
self.assertEquals(p.stderr, None) self.assertEquals(p.stderr, None)
self.assertEquals(p.wait(), 0) self.assertEquals(p.wait(), 0)
p = Popen(node, ['sh', '-c', 'cat >&2'], p = node.Popen(['sh', '-c', 'cat >&2'],
stdin = PIPE, stdout = PIPE, stderr = STDOUT) stdin = sp.PIPE, stdout = sp.PIPE, stderr = sp.STDOUT)
self.assertEquals(p.communicate(_longstring), (_longstring, None)) self.assertEquals(p.communicate(_longstring), (_longstring, None))
# #
p = Popen(node, ['tee', '/dev/stderr'], p = node.Popen(['tee', '/dev/stderr'],
stdin = PIPE, stdout = PIPE, stderr = STDOUT) stdin = sp.PIPE, stdout = sp.PIPE, stderr = sp.STDOUT)
p.stdin.write("hello world\n") p.stdin.write("hello world\n")
p.stdin.close() p.stdin.close()
self.assertEquals(p.stdout.readlines(), ["hello world\n"] * 2) self.assertEquals(p.stdout.readlines(), ["hello world\n"] * 2)
self.assertEquals(p.stderr, None) self.assertEquals(p.stderr, None)
self.assertEquals(p.wait(), 0) self.assertEquals(p.wait(), 0)
p = Popen(node, ['tee', '/dev/stderr'], p = node.Popen(['tee', '/dev/stderr'],
stdin = PIPE, stdout = PIPE, stderr = STDOUT) stdin = sp.PIPE, stdout = sp.PIPE, stderr = sp.STDOUT)
self.assertEquals(p.communicate(_longstring[0:512]), self.assertEquals(p.communicate(_longstring[0:512]),
(_longstring[0:512] * 2, None)) (_longstring[0:512] * 2, None))
# #
p = Popen(node, ['tee', '/dev/stderr'], p = node.Popen(['tee', '/dev/stderr'],
stdin = PIPE, stdout = PIPE, stderr = PIPE) stdin = sp.PIPE, stdout = sp.PIPE, stderr = sp.PIPE)
p.stdin.write("hello world\n") p.stdin.write("hello world\n")
p.stdin.close() p.stdin.close()
self.assertEquals(p.stdout.readlines(), ["hello world\n"]) self.assertEquals(p.stdout.readlines(), ["hello world\n"])
self.assertEquals(p.stderr.readlines(), ["hello world\n"]) self.assertEquals(p.stderr.readlines(), ["hello world\n"])
self.assertEquals(p.wait(), 0) self.assertEquals(p.wait(), 0)
p = Popen(node, ['tee', '/dev/stderr'], p = node.Popen(['tee', '/dev/stderr'],
stdin = PIPE, stdout = PIPE, stderr = PIPE) stdin = sp.PIPE, stdout = sp.PIPE, stderr = sp.PIPE)
self.assertEquals(p.communicate(_longstring), (_longstring, ) * 2) self.assertEquals(p.communicate(_longstring), (_longstring, ) * 2)
def test_backticks(self): def test_backticks(self):
node = nemu.Node(nonetns = True) node = nemu.Node(nonetns = True)
self.assertEquals(backticks(node, "echo hello world"), "hello world\n") self.assertEquals(node.backticks("echo hello world"), "hello world\n")
self.assertEquals(backticks(node, r"echo hello\ \ world"), self.assertEquals(node.backticks(r"echo hello\ \ world"),
"hello world\n") "hello world\n")
self.assertEquals(backticks(node, ["echo", "hello", "world"]), self.assertEquals(node.backticks(["echo", "hello", "world"]),
"hello world\n") "hello world\n")
self.assertEquals(backticks(node, "echo hello world > /dev/null"), "") self.assertEquals(node.backticks("echo hello world > /dev/null"), "")
self.assertEquals(backticks_raise(node, "true"), "") self.assertEquals(node.backticks_raise("true"), "")
self.assertRaises(RuntimeError, backticks_raise, node, "false") self.assertRaises(RuntimeError, node.backticks_raise, "false")
self.assertRaises(RuntimeError, backticks_raise, node, "kill $$") self.assertRaises(RuntimeError, node.backticks_raise, "kill $$")
def test_system(self): def test_system(self):
node = nemu.Node(nonetns = True) node = nemu.Node(nonetns = True)
self.assertEquals(system(node, "true"), 0) self.assertEquals(node.system("true"), 0)
self.assertEquals(system(node, "false"), 1) self.assertEquals(node.system("false"), 1)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment