Commit ba17cfc6 authored by Brett Cannon's avatar Brett Cannon

Convert test_dummy_threading and test_dbm to unittest.

parent 5de250e8
#! /usr/bin/env python from test import test_support
"""Test script for the dbm module import unittest
Roger E. Masse
"""
import os import os
import random
import dbm import dbm
from dbm import error from dbm import error
from test.test_support import verbose, verify, TestSkipped, TESTFN
# make filename unique to allow multiple concurrent tests class DbmTestCase(unittest.TestCase):
# and to minimize the likelihood of a problem from an old file
filename = TESTFN def setUp(self):
self.filename = test_support.TESTFN
def cleanup(): self.d = dbm.open(self.filename, 'c')
for suffix in ['', '.pag', '.dir', '.db']: self.d.close()
try:
os.unlink(filename + suffix) def tearDown(self):
except OSError, (errno, strerror): for suffix in ['', '.pag', '.dir', '.db']:
# if we can't delete the file because of permissions, test_support.unlink(self.filename + suffix)
# nothing will work, so skip the test
if errno == 1: def test_keys(self):
raise TestSkipped, 'unable to remove: ' + filename + suffix self.d = dbm.open(self.filename, 'c')
self.assert_(self.d.keys() == [])
def test_keys(): self.d['a'] = 'b'
d = dbm.open(filename, 'c') self.d['12345678910'] = '019237410982340912840198242'
verify(d.keys() == []) self.d.keys()
d['a'] = 'b' self.assert_(self.d.has_key('a'))
d['12345678910'] = '019237410982340912840198242' self.d.close()
d.keys()
if d.has_key('a'): def test_modes(self):
if verbose: for mode in ['r', 'rw', 'w', 'n']:
print 'Test dbm keys: ', d.keys() try:
self.d = dbm.open(self.filename, mode)
d.close() self.d.close()
except dbm.error:
def test_modes(): self.fail()
d = dbm.open(filename, 'r')
d.close()
d = dbm.open(filename, 'rw')
d.close()
d = dbm.open(filename, 'w')
d.close()
d = dbm.open(filename, 'n')
d.close()
def test_main(): def test_main():
cleanup() test_support.run_unittest(DbmTestCase)
try:
test_keys()
test_modes()
except:
cleanup()
raise
cleanup()
if __name__ == '__main__': if __name__ == '__main__':
test_main() test_main()
# Very rudimentary test of threading module from test import test_support
import unittest
# Create a bunch of threads, let each do some work, wait until all are done
from test.test_support import verbose
import dummy_threading as _threading import dummy_threading as _threading
import time import time
class DummyThreadingTestCase(unittest.TestCase):
class TestThread(_threading.Thread):
class TestThread(_threading.Thread):
def run(self):
def run(self):
global running
global sema
global mutex
# Uncomment if testing another module, such as the real 'threading'
# module.
#delay = random.random() * 2
delay = 0
if test_support.verbose:
print 'task', self.getName(), 'will run for', delay, 'sec'
sema.acquire()
mutex.acquire()
running += 1
if test_support.verbose:
print running, 'tasks are running'
mutex.release()
time.sleep(delay)
if test_support.verbose:
print 'task', self.getName(), 'done'
mutex.acquire()
running -= 1
if test_support.verbose:
print self.getName(), 'is finished.', running, 'tasks are running'
mutex.release()
sema.release()
def setUp(self):
self.numtasks = 10
global sema
sema = _threading.BoundedSemaphore(value=3)
global mutex
mutex = _threading.RLock()
global running global running
# Uncomment if testing another module, such as the real 'threading' running = 0
# module. self.threads = []
#delay = random.random() * 2
delay = 0 def test_tasks(self):
if verbose: for i in range(self.numtasks):
print 'task', self.getName(), 'will run for', delay, 'sec' t = self.TestThread(name="<thread %d>"%i)
sema.acquire() self.threads.append(t)
mutex.acquire() t.start()
running = running + 1
if verbose: if test_support.verbose:
print running, 'tasks are running' print 'waiting for all tasks to complete'
mutex.release() for t in self.threads:
time.sleep(delay) t.join()
if verbose: if test_support.verbose:
print 'task', self.getName(), 'done' print 'all tasks done'
mutex.acquire()
running = running - 1
if verbose:
print self.getName(), 'is finished.', running, 'tasks are running'
mutex.release()
sema.release()
def starttasks():
for i in range(numtasks):
t = TestThread(name="<thread %d>"%i)
threads.append(t)
t.start()
def test_main(): def test_main():
# This takes about n/3 seconds to run (about n/3 clumps of tasks, times test_support.run_unittest(DummyThreadingTestCase)
# about 1 second per clump).
global numtasks
numtasks = 10
# no more than 3 of the 10 can run at once
global sema
sema = _threading.BoundedSemaphore(value=3)
global mutex
mutex = _threading.RLock()
global running
running = 0
global threads
threads = []
starttasks()
if verbose:
print 'waiting for all tasks to complete'
for t in threads:
t.join()
if verbose:
print 'all tasks done'
if __name__ == '__main__': if __name__ == '__main__':
test_main() test_main()
...@@ -75,6 +75,8 @@ Library ...@@ -75,6 +75,8 @@ Library
Tests Tests
----- -----
- GHOP 290: Convert test_dbm and test_dummy_threading to unittest.
- GHOP 293: Convert test_strftime, test_getargs, and test_pep247 to unittest. - GHOP 293: Convert test_strftime, test_getargs, and test_pep247 to unittest.
- Issue #2055: Convert test_fcntl to unittest. - Issue #2055: Convert test_fcntl to unittest.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment