Commit 4ce0c6d7 authored by Romain Courteaud's avatar Romain Courteaud

Test person access to slaptool.

parent 8defe678
......@@ -13,6 +13,7 @@ import xml.dom.ext.reader.Sax
import xml.dom.ext
import StringIO
import difflib
import transaction
class Simulator:
def __init__(self, outfile, method):
......@@ -32,7 +33,7 @@ class Simulator:
open(self.outfile, 'w').write(repr(l))
class TestSlapOSSlapToolMixin(testSlapOSMixin):
def afterSetUp(self):
def afterSetUp(self, person=None):
super(TestSlapOSSlapToolMixin, self).afterSetUp()
self.portal_slap = self.portal.portal_slap
new_id = self.generateNewId()
......@@ -44,6 +45,10 @@ class TestSlapOSSlapToolMixin(testSlapOSMixin):
title="Computer %s" % new_id,
reference="TESTCOMP-%s" % new_id
)
if (person is not None):
self.computer.edit(
source_administration_value=person,
)
self.computer.validate()
......@@ -1030,3 +1035,615 @@ class TestSlapOSSlapToolInstanceAccess(TestSlapOSSlapToolMixin):
if os.path.exists(self.instance_request_simulator):
os.unlink(self.instance_request_simulator)
class TestSlapOSSlapToolPersonAccess(TestSlapOSSlapToolMixin):
def afterSetUp(self):
self.login()
password = self.generateNewId()
reference = 'test_%s' % self.generateNewId()
person = self.portal.person_module.newContent(portal_type='Person',
title=reference,
reference=reference, password=password)
person.newContent(portal_type='Assignment', role='member').open()
transaction.commit()
person.recursiveImmediateReindexObject()
self.person = person
self.person_reference = person.getReference()
super(TestSlapOSSlapToolPersonAccess, self).afterSetUp(person=person)
def test_not_accessed_getComputerStatus(self):
self.login(self.person_reference)
created_at = rfc1123_date(DateTime())
response = self.portal_slap.getComputerStatus(self.computer_id)
self.assertEqual(200, response.status)
self.assertEqual('public, max-age=60, stale-if-error=604800',
response.headers.get('cache-control'))
self.assertEqual('REMOTE_USER',
response.headers.get('vary'))
self.assertTrue('last-modified' in response.headers)
self.assertEqual('text/xml; charset=utf-8',
response.headers.get('content-type'))
# check returned XML
xml_fp = StringIO.StringIO()
xml.dom.ext.PrettyPrint(xml.dom.ext.reader.Sax.FromXml(response.body),
stream=xml_fp)
xml_fp.seek(0)
got_xml = xml_fp.read()
expected_xml = """\
<?xml version='1.0' encoding='UTF-8'?>
<marshal>
<dictionary id='i2'>
<string>created_at</string>
<string>%(created_at)s</string>
<string>text</string>
<string>#error no data found for %(computer_id)s</string>
<string>user</string>
<string>SlapOS Master</string>
</dictionary>
</marshal>
""" % dict(
created_at=created_at,
computer_id=self.computer_id
)
self.assertEqual(expected_xml, got_xml,
'\n'.join([q for q in difflib.unified_diff(expected_xml.split('\n'), got_xml.split('\n'))]))
def test_accessed_getComputerStatus(self):
self.login(self.computer_id)
self.portal_slap.getComputerInformation(self.computer_id)
self.login(self.person_reference)
created_at = rfc1123_date(DateTime())
response = self.portal_slap.getComputerStatus(self.computer_id)
self.assertEqual(200, response.status)
self.assertEqual('public, max-age=60, stale-if-error=604800',
response.headers.get('cache-control'))
self.assertEqual('REMOTE_USER',
response.headers.get('vary'))
self.assertTrue('last-modified' in response.headers)
self.assertEqual('text/xml; charset=utf-8',
response.headers.get('content-type'))
# check returned XML
xml_fp = StringIO.StringIO()
xml.dom.ext.PrettyPrint(xml.dom.ext.reader.Sax.FromXml(response.body),
stream=xml_fp)
xml_fp.seek(0)
got_xml = xml_fp.read()
expected_xml = """\
<?xml version='1.0' encoding='UTF-8'?>
<marshal>
<dictionary id='i2'>
<unicode>created_at</unicode>
<unicode>%(created_at)s</unicode>
<unicode>text</unicode>
<unicode>#access %(computer_id)s</unicode>
<unicode>user</unicode>
<unicode>%(computer_id)s</unicode>
</dictionary>
</marshal>
""" % dict(
created_at=created_at,
computer_id=self.computer_id
)
self.assertEqual(expected_xml, got_xml,
'\n'.join([q for q in difflib.unified_diff(expected_xml.split('\n'), got_xml.split('\n'))]))
def assertComputerBangSimulator(self, args, kwargs):
stored = eval(open(self.computer_bang_simulator).read())
# do the same translation magic as in workflow
kwargs['comment'] = kwargs.pop('comment')
self.assertEqual(stored,
[{'recargs': args, 'reckwargs': kwargs,
'recmethod': 'reportComputerBang'}])
def test_computerBang(self):
self.login()
self.login(self.person_reference)
self.computer_bang_simulator = tempfile.mkstemp()[1]
try:
self.computer.reportComputerBang = Simulator(
self.computer_bang_simulator, 'reportComputerBang')
error_log = 'Please bang me'
response = self.portal_slap.computerBang(self.computer_id,
error_log)
self.assertEqual('None', response)
created_at = rfc1123_date(DateTime())
response = self.portal_slap.getComputerStatus(self.computer_id)
# check returned XML
xml_fp = StringIO.StringIO()
xml.dom.ext.PrettyPrint(xml.dom.ext.reader.Sax.FromXml(response.body),
stream=xml_fp)
xml_fp.seek(0)
got_xml = xml_fp.read()
expected_xml = """\
<?xml version='1.0' encoding='UTF-8'?>
<marshal>
<dictionary id='i2'>
<unicode>created_at</unicode>
<unicode>%(created_at)s</unicode>
<unicode>text</unicode>
<unicode>#error bang</unicode>
<unicode>user</unicode>
<unicode>%(person_reference)s</unicode>
</dictionary>
</marshal>
""" % dict(
created_at=created_at,
person_reference=self.person_reference,
)
self.assertEqual(expected_xml, got_xml,
'\n'.join([q for q in difflib.unified_diff(expected_xml.split('\n'), got_xml.split('\n'))]))
self.assertComputerBangSimulator((), {'comment': error_log})
finally:
if os.path.exists(self.computer_bang_simulator):
os.unlink(self.computer_bang_simulator)
def test_getComputerPartitionStatus(self):
self.login()
self._makeComplexComputer()
self.login(self.person_reference)
partition_id = self.start_requested_software_instance.getAggregateValue(
portal_type='Computer Partition').getReference()
created_at = rfc1123_date(DateTime())
self.login(self.start_requested_software_instance.getReference())
response = self.portal_slap.getComputerPartitionStatus(self.computer_id,
partition_id)
self.assertEqual(200, response.status)
self.assertEqual( 'public, max-age=60, stale-if-error=604800',
response.headers.get('cache-control'))
self.assertEqual('REMOTE_USER',
response.headers.get('vary'))
self.assertTrue('last-modified' in response.headers)
self.assertEqual('text/xml; charset=utf-8',
response.headers.get('content-type'))
# check returned XML
xml_fp = StringIO.StringIO()
xml.dom.ext.PrettyPrint(xml.dom.ext.reader.Sax.FromXml(response.body),
stream=xml_fp)
xml_fp.seek(0)
got_xml = xml_fp.read()
expected_xml = """\
<?xml version='1.0' encoding='UTF-8'?>
<marshal>
<dictionary id='i2'>
<string>created_at</string>
<string>%(created_at)s</string>
<string>text</string>
<string>#error no data found for %(instance_guid)s</string>
<string>user</string>
<string>SlapOS Master</string>
</dictionary>
</marshal>
""" % dict(
created_at=created_at,
instance_guid=self.start_requested_software_instance.getReference(),
)
self.assertEqual(expected_xml, got_xml,
'\n'.join([q for q in difflib.unified_diff(expected_xml.split('\n'), got_xml.split('\n'))]))
def test_getComputerPartitionStatus_visited(self):
self.login()
self._makeComplexComputer(person=self.person)
self.login(self.person_reference)
partition_id = self.start_requested_software_instance.getAggregateValue(
portal_type='Computer Partition').getReference()
created_at = rfc1123_date(DateTime())
self.login(self.start_requested_software_instance.getReference())
self.portal_slap.registerComputerPartition(self.computer_id, partition_id)
self.login(self.person_reference)
response = self.portal_slap.getComputerPartitionStatus(self.computer_id,
partition_id)
self.assertEqual(200, response.status)
self.assertEqual( 'public, max-age=60, stale-if-error=604800',
response.headers.get('cache-control'))
self.assertEqual('REMOTE_USER',
response.headers.get('vary'))
self.assertTrue('last-modified' in response.headers)
self.assertEqual('text/xml; charset=utf-8',
response.headers.get('content-type'))
# check returned XML
xml_fp = StringIO.StringIO()
xml.dom.ext.PrettyPrint(xml.dom.ext.reader.Sax.FromXml(response.body),
stream=xml_fp)
xml_fp.seek(0)
got_xml = xml_fp.read()
expected_xml = """\
<?xml version='1.0' encoding='UTF-8'?>
<marshal>
<dictionary id='i2'>
<unicode>created_at</unicode>
<unicode>%(created_at)s</unicode>
<unicode>text</unicode>
<unicode>#access %(computer_id)s %(partition_id)s</unicode>
<unicode>user</unicode>
<unicode>%(instance_guid)s</unicode>
</dictionary>
</marshal>
""" % dict(
created_at=created_at,
instance_guid=self.start_requested_software_instance.getReference(),
computer_id=self.computer_id,
partition_id=partition_id
)
self.assertEqual(expected_xml, got_xml,
'\n'.join([q for q in difflib.unified_diff(expected_xml.split('\n'), got_xml.split('\n'))]))
def test_registerComputerPartition(self):
self.login()
self._makeComplexComputer(person=self.person)
partition_id = self.start_requested_software_instance.getAggregateValue(
portal_type='Computer Partition').getReference()
self.login(self.person_reference)
response = self.portal_slap.registerComputerPartition(self.computer_id, partition_id)
self.assertEqual(200, response.status)
self.assertEqual( 'public, max-age=1, stale-if-error=604800',
response.headers.get('cache-control'))
self.assertEqual('REMOTE_USER',
response.headers.get('vary'))
self.assertTrue('last-modified' in response.headers)
self.assertEqual('text/xml; charset=utf-8',
response.headers.get('content-type'))
# check returned XML
xml_fp = StringIO.StringIO()
xml.dom.ext.PrettyPrint(xml.dom.ext.reader.Sax.FromXml(response.body),
stream=xml_fp)
xml_fp.seek(0)
got_xml = xml_fp.read()
expected_xml = """\
<?xml version='1.0' encoding='UTF-8'?>
<marshal>
<object id='i2' module='slapos.slap.slap' class='ComputerPartition'>
<tuple>
<string>%(computer_id)s</string>
<string>partition1</string>
</tuple>
<dictionary id='i3'>
<string>_computer_id</string>
<string>%(computer_id)s</string>
<string>_connection_dict</string>
<dictionary id='i4'/>
<string>_instance_guid</string>
<string>%(instance_guid)s</string>
<string>_need_modification</string>
<int>1</int>
<string>_parameter_dict</string>
<dictionary id='i5'>
<string>ip_list</string>
<list id='i6'>
<tuple>
<string/>
<string>ip_address_1</string>
</tuple>
</list>
<string>param</string>
<string>%(param)s</string>
<string>slap_computer_id</string>
<string>%(computer_id)s</string>
<string>slap_computer_partition_id</string>
<string>partition1</string>
<string>slap_software_release_url</string>
<string>%(software_release_url)s</string>
<string>slap_software_type</string>
<string>%(software_type)s</string>
<string>slave_instance_list</string>
<list id='i7'/>
<string>timestamp</string>
<string>%(timestamp)s</string>
</dictionary>
<string>_partition_id</string>
<string>partition1</string>
<string>_request_dict</string>
<none/>
<string>_requested_state</string>
<string>started</string>
<string>_software_release_document</string>
<object id='i8' module='slapos.slap.slap' class='SoftwareRelease'>
<tuple>
<string>%(software_release_url)s</string>
<string>%(computer_id)s</string>
</tuple>
<dictionary id='i9'>
<string>_computer_guid</string>
<string>%(computer_id)s</string>
<string>_software_instance_list</string>
<list id='i10'/>
<string>_software_release</string>
<string>%(software_release_url)s</string>
</dictionary>
</object>
<string>_synced</string>
<bool>1</bool>
</dictionary>
</object>
</marshal>
""" % dict(
computer_id=self.computer_id,
param=self.start_requested_software_instance.getInstanceXmlAsDict()['param'],
software_release_url=self.start_requested_software_instance.getUrlString(),
timestamp=int(self.start_requested_software_instance.getModificationDate()),
instance_guid=self.start_requested_software_instance.getReference(),
software_type=self.start_requested_software_instance.getSourceReference()
)
self.assertEqual(expected_xml, got_xml,
'\n'.join([q for q in difflib.unified_diff(expected_xml.split('\n'), got_xml.split('\n'))]))
def assertInstanceBangSimulator(self, args, kwargs):
stored = eval(open(self.instance_bang_simulator).read())
# do the same translation magic as in workflow
kwargs['comment'] = kwargs.pop('comment')
self.assertEqual(stored,
[{'recargs': args, 'reckwargs': kwargs,
'recmethod': 'bang'}])
def test_softwareInstanceBang(self):
self.login()
self._makeComplexComputer(person=self.person)
self.instance_bang_simulator = tempfile.mkstemp()[1]
try:
partition_id = self.start_requested_software_instance.getAggregateValue(
portal_type='Computer Partition').getReference()
self.login(self.person_reference)
self.start_requested_software_instance.bang = Simulator(
self.instance_bang_simulator, 'bang')
error_log = 'Please bang me'
response = self.portal_slap.softwareInstanceBang(self.computer_id,
partition_id, error_log)
self.assertEqual('None', response)
created_at = rfc1123_date(DateTime())
response = self.portal_slap.getComputerPartitionStatus(self.computer_id,
partition_id)
# check returned XML
xml_fp = StringIO.StringIO()
xml.dom.ext.PrettyPrint(xml.dom.ext.reader.Sax.FromXml(response.body),
stream=xml_fp)
xml_fp.seek(0)
got_xml = xml_fp.read()
expected_xml = """\
<?xml version='1.0' encoding='UTF-8'?>
<marshal>
<dictionary id='i2'>
<unicode>created_at</unicode>
<unicode>%(created_at)s</unicode>
<unicode>text</unicode>
<unicode>#error bang called</unicode>
<unicode>user</unicode>
<unicode>%(person_reference)s</unicode>
</dictionary>
</marshal>
""" % dict(
created_at=created_at,
person_reference=self.person_reference,
)
self.assertEqual(expected_xml, got_xml,
'\n'.join([q for q in difflib.unified_diff(expected_xml.split('\n'), got_xml.split('\n'))]))
self.assertInstanceBangSimulator((), {'comment': error_log, 'bang_tree': True})
finally:
if os.path.exists(self.instance_bang_simulator):
os.unlink(self.instance_bang_simulator)
def assertInstanceRenameSimulator(self, args, kwargs):
stored = eval(open(self.instance_rename_simulator).read())
# do the same translation magic as in workflow
self.assertEqual(stored,
[{'recargs': args, 'reckwargs': kwargs,
'recmethod': 'rename'}])
def test_softwareInstanceRename(self):
self.login()
self._makeComplexComputer(person=self.person)
self.instance_rename_simulator = tempfile.mkstemp()[1]
try:
partition_id = self.start_requested_software_instance.getAggregateValue(
portal_type='Computer Partition').getReference()
self.login(self.person_reference)
self.start_requested_software_instance.rename = Simulator(
self.instance_rename_simulator, 'rename')
new_name = 'new me'
response = self.portal_slap.softwareInstanceRename(new_name, self.computer_id,
partition_id)
self.assertEqual('None', response)
self.assertInstanceRenameSimulator((), {
'comment': 'Rename %s into %s' % (self.start_requested_software_instance.getTitle(),
new_name), 'new_name': new_name})
finally:
if os.path.exists(self.instance_rename_simulator):
os.unlink(self.instance_rename_simulator)
def assertInstanceRequestSimulator(self, args, kwargs):
stored = eval(open(self.instance_request_simulator).read())
# do the same translation magic as in workflow
self.assertEqual(stored,
[{'recargs': args, 'reckwargs': kwargs,
'recmethod': 'requestSoftwareInstance'}])
def test_request(self):
self.login()
self.instance_request_simulator = tempfile.mkstemp()[1]
try:
self.login(self.person_reference)
self.person.requestSoftwareInstance = Simulator(
self.instance_request_simulator, 'requestSoftwareInstance')
response = self.portal_slap.requestComputerPartition(
software_release='req_release',
software_type='req_type',
partition_reference='req_reference',
partition_parameter_xml='<marshal><dictionary id="i2"/></marshal>',
filter_xml='<marshal><dictionary id="i2"/></marshal>',
state='<marshal><string>started</string></marshal>',
shared_xml='<marshal><bool>0</bool></marshal>',
)
self.assertEqual(408, response.status)
self.assertEqual('private',
response.headers.get('cache-control'))
self.assertInstanceRequestSimulator((), {
'instance_xml': "<?xml version='1.0' encoding='utf-8'?>\n<instance/>\n",
'software_title': 'req_reference',
'software_release': 'req_release',
'state': 'started',
'sla_xml': "<?xml version='1.0' encoding='utf-8'?>\n<instance/>\n",
'software_type': 'req_type',
'shared': False})
finally:
if os.path.exists(self.instance_request_simulator):
os.unlink(self.instance_request_simulator)
def assertSupplySimulator(self, args, kwargs):
stored = eval(open(self.computer_supply_simulator).read())
# do the same translation magic as in workflow
kwargs['software_release_url'] = kwargs.pop('software_release_url')
kwargs['state'] = kwargs.pop('state')
self.assertEqual(stored,
[{'recargs': args, 'reckwargs': kwargs,
'recmethod': 'requestSoftwareRelease'}])
def test_computerSupply(self):
self.login()
self.computer_supply_simulator = tempfile.mkstemp()[1]
try:
self.login(self.person_reference)
self.computer.requestSoftwareRelease = Simulator(
self.computer_supply_simulator, 'requestSoftwareRelease')
software_url = 'live_test_url_%i' % self.generateNewId()
response = self.portal_slap.supplySupply(
software_url,
self.computer_id,
state='destroyed')
self.assertEqual('None', response)
self.assertSupplySimulator((), {
'software_release_url': software_url,
'state': 'destroyed'})
finally:
if os.path.exists(self.computer_supply_simulator):
os.unlink(self.computer_supply_simulator)
def assertRequestComputerSimulator(self, args, kwargs):
stored = eval(open(self.computer_request_computer_simulator).read())
# do the same translation magic as in workflow
kwargs['computer_title'] = kwargs.pop('computer_title')
self.assertEqual(stored,
[{'recargs': args, 'reckwargs': kwargs,
'recmethod': 'requestComputer'}])
def test_requestComputer(self):
self.login()
self.computer_request_computer_simulator = tempfile.mkstemp()[1]
try:
self.login(self.person_reference)
self.person.requestComputer = Simulator(
self.computer_request_computer_simulator, 'requestComputer')
computer_id = 'Foo Computer'
computer_reference = 'live_comp_%i' % self.generateNewId()
self.portal.REQUEST.set('computer_reference', computer_reference)
response = self.portal_slap.requestComputer(computer_id)
# check returned XML
xml_fp = StringIO.StringIO()
xml.dom.ext.PrettyPrint(xml.dom.ext.reader.Sax.FromXml(response),
stream=xml_fp)
xml_fp.seek(0)
got_xml = xml_fp.read()
expected_xml = """\
<?xml version='1.0' encoding='UTF-8'?>
<marshal>
<object id='i2' module='slapos.slap.slap' class='Computer'>
<tuple>
<string>%(computer_id)s</string>
</tuple>
<dictionary id='i3'>
<string>_computer_id</string>
<string>%(computer_id)s</string>
</dictionary>
</object>
</marshal>
""" % {'computer_id': computer_reference}
self.assertEqual(expected_xml, got_xml,
'\n'.join([q for q in difflib.unified_diff(expected_xml.split('\n'), got_xml.split('\n'))]))
self.assertRequestComputerSimulator((), {'computer_title': computer_id})
finally:
if os.path.exists(self.computer_request_computer_simulator):
os.unlink(self.computer_request_computer_simulator)
def assertGenerateComputerCertificateSimulator(self, args, kwargs):
stored = eval(open(self.generate_computer_certificate_simulator).read())
# do the same translation magic as in workflow
self.assertEqual(stored,
[{'recargs': args, 'reckwargs': kwargs,
'recmethod': 'generateComputerCertificate'}])
def test_generateComputerCertificate(self):
self.login()
self.generate_computer_certificate_simulator = tempfile.mkstemp()[1]
try:
self.login(self.person_reference)
self.computer.generateCertificate = Simulator(
self.generate_computer_certificate_simulator,
'generateComputerCertificate')
computer_certificate = 'live_\ncertificate_%i' % self.generateNewId()
computer_key = 'live_\nkey_%i' % self.generateNewId()
self.portal.REQUEST.set('computer_certificate', computer_certificate)
self.portal.REQUEST.set('computer_key', computer_key)
response = self.portal_slap.generateComputerCertificate(self.computer_id)
# check returned XML
xml_fp = StringIO.StringIO()
xml.dom.ext.PrettyPrint(xml.dom.ext.reader.Sax.FromXml(response),
stream=xml_fp)
xml_fp.seek(0)
got_xml = xml_fp.read()
expected_xml = """\
<?xml version='1.0' encoding='UTF-8'?>
<marshal>
<dictionary id='i2'>
<string>certificate</string>
<string>%(computer_certificate)s</string>
<string>key</string>
<string>%(computer_key)s</string>
</dictionary>
</marshal>
""" % {'computer_key': computer_key, 'computer_certificate': computer_certificate}
self.assertEqual(expected_xml, got_xml,
'\n'.join([q for q in difflib.unified_diff(expected_xml.split('\n'), got_xml.split('\n'))]))
self.assertGenerateComputerCertificateSimulator((), {})
finally:
if os.path.exists(self.generate_computer_certificate_simulator):
os.unlink(self.generate_computer_certificate_simulator)
def assertRevokeComputerCertificateSimulator(self, args, kwargs):
stored = eval(open(self.revoke_computer_certificate_simulator).read())
# do the same translation magic as in workflow
self.assertEqual(stored,
[{'recargs': args, 'reckwargs': kwargs,
'recmethod': 'revokeComputerCertificate'}])
def test_revokeComputerCertificate(self):
self.login()
self.revoke_computer_certificate_simulator = tempfile.mkstemp()[1]
try:
self.login(self.person_reference)
self.computer.revokeCertificate = Simulator(
self.revoke_computer_certificate_simulator,
'revokeComputerCertificate')
response = self.portal_slap.revokeComputerCertificate(self.computer_id)
self.assertEqual('None', response)
self.assertRevokeComputerCertificateSimulator((), {})
finally:
if os.path.exists(self.revoke_computer_certificate_simulator):
os.unlink(self.revoke_computer_certificate_simulator)
10
\ No newline at end of file
11
\ No newline at end of file
......@@ -240,7 +240,7 @@ class testSlapOSMixin(Products.Vifib.tests.VifibMixin.testVifibMixin):
self.partition.validate()
self.tic()
def _makeComplexComputer(self):
def _makeComplexComputer(self, person=None):
for i in range(1, 5):
id_ = 'partition%s' % i
p = self.computer.newContent(portal_type='Computer Partition',
......@@ -301,6 +301,7 @@ class testSlapOSMixin(Products.Vifib.tests.VifibMixin.testVifibMixin):
hosting_subscription.edit(
title=self.generateNewSoftwareTitle(),
reference="TESTSI-%s" % self.generateNewId(),
destination_section_value=person,
)
kw = dict(
software_release=\
......@@ -326,6 +327,7 @@ class testSlapOSMixin(Products.Vifib.tests.VifibMixin.testVifibMixin):
hosting_subscription.edit(
title=self.generateNewSoftwareTitle(),
reference="TESTSI-%s" % self.generateNewId(),
destination_section_value=person,
)
kw = dict(
software_release=\
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment