Commit a422e9b9 authored by Denis Bilenko's avatar Denis Bilenko

util: remove old scripts related to virtualbox testing

parent 132bf590
import sys
import os
import re
import time
import datetime
from functools import wraps
from subprocess import Popen
def remote_wrapper(function):
@wraps(function)
def wrapper(*args, **kwargs):
print '%s: STARTED.' % datetime.datetime.now()
result = function(*args, **kwargs)
print '%s: DONE.' % datetime.datetime.now()
return result
return wrapper
def get_output(command):
# XXX use Popen
result = os.popen(command)
output = result.read()
exitcode = result.close()
if exitcode:
sys.stdout.write(output)
raise SystemExit('Command %r failed with code %r' % (command, exitcode))
return output
info_re = ('(^|\n)Name:\s*(?P<name>[^\n]+)'
'(\nGuest OS:\s*(?P<os>[^\n]+))?'
'\nUUID:\\s*(?P<id>[^\n]+).*?')
info_re = re.compile(info_re, re.DOTALL)
description_re = re.compile('^Description:(?P<desc>.*?\n.*?\n)', re.M)
state_re = re.compile('^State:\s*(.*?)$', re.M)
def get_machines(filter_inaccessible=True):
output = get_output('VBoxManage list -l vms 2> /dev/null')
results = []
for m in info_re.finditer(output):
info = m.groupdict()
info['start'] = m.end(0)
if results:
results[-1]['end'] = m.start(0)
results.append(info)
for result in results:
text = output[result.pop('start', 0):result.pop('end', None)]
d = description_re.findall(text)
if d:
assert len(d) == 1, (result, d)
result['desc'] = d[0].strip()
if filter_inaccessible:
results = [m for m in results if m.get('name') and m.get('name') != '<inaccessible!>']
return results
def get_machine(name):
for machine in get_machines():
if machine['name'] == name:
return machine
def vbox_get_state(name):
if not isinstance(name, basestring):
raise TypeError('Expected string: %r' % (name, ))
output = get_output('VBoxManage showvminfo %s' % name)
state = state_re.findall(output)
assert len(state) == 1, state
return state[0].split('(')[0].replace(' ', '')
def get_default_machine(desc=None, os='windows'):
machines = get_machines()
if os:
machines = [m for m in machines if os in m.get('os', '').lower()]
if len(machines) == 1:
return machines[0]
if desc:
machines = [m for m in machines if desc in m.get('desc', '').lower()]
if len(machines) == 1:
return machines[0]
if not machines:
sys.exit('Could not find an appropriate VirtualBox VM. Pass --machine NAME.')
if machines:
sys.exit('More than one machine matches. Pass --machine NAME.')
def system(command, fail=True):
noisy = system.noisy
if noisy:
print 'Running %r' % command
if isinstance(command, basestring):
args = command.split()
else:
args = command
result = Popen(args).wait()
if result:
msg = 'Command %r failed with code %r' % (command, result)
if fail:
sys.exit(msg)
elif noisy:
sys.stderr.write(msg + '\n')
return result
if noisy:
msg = 'Command %r succeeded' % command
print msg
print '-' * min(78, len(msg))
return result
system.noisy = False
def unlink(path):
try:
os.unlink(path)
except OSError, ex:
if ex.errno == 2: # No such file or directory
return
raise
class VirtualBox(object):
def __init__(self, name, username, password='', type=None):
self.name = name
self.username = username
self.password = password
self.type = type
def start(self):
self.initial_state = start(self.name, self.type)
return self.initial_state
def stop(self):
if self.initial_state == 'paused':
self.pause()
elif self.initial_state == 'saved':
self.restore()
elif self.initial_state != 'running':
self.poweroff()
def pause(self):
return vbox_pause(self.name)
def poweroff(self):
return vbox_poweroff(self.name)
def restore(self):
return vbox_restore(self.name)
def mkdir(self, path, **kwargs):
return vbox_mkdir(self.name, path, username=self.username, password=self.password, **kwargs)
def copyto(self, source, dest):
return vbox_copyto(self.name, source, dest, username=self.username, password=self.password)
def copyfrom(self, source, dest):
return vbox_copyfrom(self.name, source, dest, username=self.username, password=self.password)
def execute(self, exe, arguments):
return vbox_execute(self.name, exe, arguments, username=self.username, password=self.password)
def start(name, type=None):
state = vbox_get_state(name)
if state == 'running':
pass
elif state == 'paused':
vbox_resume(name)
elif state in ('saved', 'poweredoff'):
vbox_startvm(name, type)
else:
print 'Weird state: %r' % state
vbox_poweroff(name, fail=False)
vbox_startvm(name, type)
return state
def vbox_startvm(name, type=None):
if type:
options = ' --type ' + type
else:
options = ''
system('VBoxManage startvm %s%s' % (name, options))
def vbox_resume(name):
system('VBoxManage controlvm %s resume' % name)
def vbox_pause(name):
system('VBoxManage controlvm %s pause' % name)
def vbox_poweroff(name, **kwargs):
system('VBoxManage controlvm %s poweroff' % name, **kwargs)
def vbox_restorecurrent(name):
system('VBoxManage snapshot %s restorecurrent' % name)
def vbox_restore(name):
state = vbox_get_state(name)
if state == 'saved':
return
if state == 'running':
vbox_poweroff(name)
time.sleep(0.5)
vbox_restorecurrent(name)
def _get_options(username=None, password=None, image=None):
from pipes import quote
options = ''
if username:
options += ' --username %s' % quote(username)
if password:
options += ' --password %s' % quote(password)
if image:
options += ' --image %s' % quote(image)
return options
def _vbox_mkdir(name, path, username=None, password=None):
from pipes import quote
system('VBoxManage guestcontrol %s mkdir %s%s' % (name, quote(path), _get_options(username, password)))
def vbox_mkdir(name, path, username=None, password=None, timeout=90):
end = time.time() + timeout
while True:
try:
return _vbox_mkdir(name, path, username=username, password=password)
except SystemExit:
if time.time() > end:
raise
time.sleep(5)
def vbox_copyto(name, source, dest, username=None, password=None):
from pipes import quote
args = (name, quote(os.path.abspath(source)), quote(dest), _get_options(username, password))
system('VBoxManage guestcontrol %s copyto %s %s%s' % args)
def vbox_copyfrom(name, source, dest, username=None, password=None):
from pipes import quote
args = (name, quote(source), quote(os.path.abspath(dest)), _get_options(username, password))
system('VBoxManage guestcontrol %s copyfrom %s %s%s' % args)
def vbox_execute(name, image, arguments=None, username=None, password=None):
from pipes import quote
options = _get_options(username, password, image)
options += ' --wait-stdout --wait-stderr'
try:
command = 'VBoxManage guestcontrol %s execute %s' % (name, options)
if arguments:
command += ' -- %s' % ' '.join(quote(x) for x in arguments)
system(command)
except SystemExit, ex:
sys.stderr.write(str(ex) + '\n')
if __name__ == '__main__':
command = sys.argv[1]
command = globals()['vbox_' + command]
assert callable(command), command
command(*sys.argv[2:])
#!/usr/bin/python
# Copyright (C) 2012 Denis Bilenko. See LICENSE for details.
"""
A script to build/test/make dist for gevent in a Windows Virtual Box machine.
Build the current working directory:
winvbox.py build
Build and run testrunner:
winvbox.py test [-- testrunner options]
Make binary installers:
winvbox.py dist VERSION
Other useful options:
--source FILENAME # release tarball of gevent to use
--python PYTHONVER # Python version to use. Defaults to "27".
Could also be a path to Python executable.
--machine MACHINE # VirtualBox machine to use.
--username LOGIN # Account name to use for this machine. Defaults to current user name.
--password PASSWORD # Password for the account. Defaults to empty string.
winvbox.py assumes Python is located at "C:/Python<VER>/python.exe".
Instead of using --machine option, you can also add word "gevent" to the machine description in
order to be selected by this script.
"""
import sys
import os
import datetime
import glob
from functools import wraps
def main():
import optparse
import uuid
import virtualbox
parser = optparse.OptionParser()
parser.add_option('--source')
parser.add_option('--fast', action='store_true')
parser.add_option('--revert', action='store_true')
parser.add_option('--clean', action='store_true')
parser.add_option('--python', default='27')
parser.add_option('--machine')
parser.add_option('--username')
parser.add_option('--password', default='')
parser.add_option('--version', default='dev')
parser.add_option('-v', '--verbose', action='store_true')
parser.add_option('--type', default='headless')
options, args = parser.parse_args()
system.noisy = options.verbose
if not args or args[0] not in ['build', 'test', 'dist', 'noop']:
sys.exit('Expected a command: build|test|dist')
command = args[0]
command_args = args[1:]
if options.username is None:
import getpass
options.username = getpass.getuser()
if not options.source:
import makedist
options.source = makedist.makedist('dev', fast=True)
options.source = os.path.abspath(options.source)
options.unique = uuid.uuid4().hex
directory = 'c:/tmpdir.%s' % options.unique
python = options.python
if not python.endswith('exe') and '/' not in python:
python = 'C:/Python%s/python.exe' % python
if not options.machine:
options.machine = get_default_machine()
print 'Using directory %r on machine %r. Python: %s' % (directory, options.machine, python)
this_script = __file__
if this_script.lower().endswith('.pyc'):
this_script = this_script[:-1]
this_script_remote = '%s/%s' % (directory, os.path.basename(this_script))
machine = virtualbox.VirtualBox(options.machine, options.username, options.password, type=options.type)
machine.start()
try:
machine.mkdir(directory)
machine.copyto(options.source, directory + '/' + os.path.basename(options.source))
machine.copyto(this_script, this_script_remote)
machine.script_path = this_script_remote
machine.python_path = python
machine.directory = directory
function = globals().get('command_%s' % command, command_default)
function(command, command_args, machine, options)
finally:
machine.stop()
def run_command(machine, command, command_args):
args = ['-u', machine.script_path, 'REMOTE', command] + (command_args or [])
machine.execute(machine.python_path, args)
def command_default(command, command_args, machine, options):
run_command(machine, command, command_args)
def remote_wrapper(function):
@wraps(function)
def wrapper(*args, **kwargs):
print '%s: STARTED.' % datetime.datetime.now()
result = function(*args, **kwargs)
print '%s: DONE.' % datetime.datetime.now()
return result
return wrapper
@remote_wrapper
def remote_noop(args):
print 'Will not do anything.'
@remote_wrapper
def remote_build(args):
extract_and_build()
def command_test(command, command_args, machine, options):
run_command(machine, command, command_args)
local_filename = 'remote-tmp-testrunner-%s.sqlite3' % machine.name
machine.copyfrom(machine.directory + '/tmp-testrunner.sqlite3', local_filename)
system('%s util/runteststat.py %s' % (sys.executable, local_filename))
@remote_wrapper
def remote_test(args):
extract_and_build()
os.chdir('greentest')
os.environ['PYTHONPATH'] = '.;..'
system('%s testrunner.py %s' % (sys.executable, ' '.join(args)), fail=False)
system('mv tmp-testrunner.sqlite3 ../../')
def command_dist(command, command_args, machine, options):
run_command(machine, command, command_args)
local_name = 'dist_%s.tar' % options.unique
machine.copyfrom(machine.directory + '/dist.tar', local_name)
if os.path.exists(local_name):
system('tar -xvf %s' % local_name)
else:
sys.exit('Failed to receive %s' % local_name)
@remote_wrapper
def remote_dist(args):
extract_and_build()
success = 0
if not system('%s setup.py bdist_egg' % sys.executable, fail=False):
success += 1
if not system('%s setup.py bdist_wininst' % sys.executable, fail=False):
success += 1
# bdist_msi fails if version is not strict
if not system('%s setup.py bdist_msi' % sys.executable, fail=False):
success += 1
if not success:
sys.exit('bdist_egg bdist_wininst and bdist_msi all failed')
# must use forward slash here. back slashe causes dist.tar to be placed on c:\
system('tar -cf ../dist.tar dist')
def extract_and_build():
import tarfile
filename = glob.glob('gevent*.tar.gz')
assert len(filename) == 1, filename
filename = filename[0]
directory = filename[:-7]
print 'Extracting %s to %s' % (filename, os.getcwd())
tarfile.open(filename).extractall()
print 'cd into %s' % directory
os.chdir(directory)
system('%s setup.py build' % sys.executable)
def get_default_machine():
return _get_default_machine()['name']
def _get_default_machine():
import virtualbox
machines = virtualbox.get_machines()
if len(machines) == 1:
return machines[0]
machines = [m for m in machines if 'windows' in m.get('os', '').lower()]
if len(machines) == 1:
return machines[0]
machines = [m for m in machines if 'gevent' in m.get('desc', '').lower()]
if len(machines) == 1:
return machines[0]
if not machines:
sys.exit('Could not find an appropriate VirtualBox VM. Pass --machine NAME.')
if machines:
sys.exit('More than one machine matches "windows" and has "gevent" in description. Pass --machine NAME.')
def system(command, fail=True):
noisy = system.noisy
if noisy:
print 'Running %r' % command
result = os.system(command)
if result:
msg = 'Command %r failed with code %r' % (command, result)
if fail:
sys.exit(msg)
elif noisy:
sys.stderr.write(msg + '\n')
return result
if noisy:
msg = 'Command %r succeeded' % command
print msg
print '-' * min(78, len(msg))
return result
system.noisy = True
if __name__ == '__main__':
if sys.argv[1:2] == ['REMOTE']:
command = sys.argv[2]
args = sys.argv[3:]
function = globals()['remote_' + command]
os.chdir(os.path.dirname(__file__))
function(args)
else:
main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment