pax_global_header 0000666 0000000 0000000 00000000064 13221246610 0014507 g ustar 00root root 0000000 0000000 52 comment=3ffed138165cd7a37033c0fabf812e70c8d9d588
slapos.core-3ffed138165cd7a37033c0fabf812e70c8d9d588-master-product/ 0000775 0000000 0000000 00000000000 13221246610 0024000 5 ustar 00root root 0000000 0000000 slapos.core-3ffed138165cd7a37033c0fabf812e70c8d9d588-master-product/master/ 0000775 0000000 0000000 00000000000 13221246610 0025273 5 ustar 00root root 0000000 0000000 slapos.core-3ffed138165cd7a37033c0fabf812e70c8d9d588-master-product/master/product/ 0000775 0000000 0000000 00000000000 13221246610 0026753 5 ustar 00root root 0000000 0000000 slapos.core-3ffed138165cd7a37033c0fabf812e70c8d9d588-master-product/master/product/SlapOS/ 0000775 0000000 0000000 00000000000 13221246610 0030114 5 ustar 00root root 0000000 0000000 slapos.core-3ffed138165cd7a37033c0fabf812e70c8d9d588-master-product/master/product/SlapOS/Document/ 0000775 0000000 0000000 00000000000 13221246610 0031672 5 ustar 00root root 0000000 0000000 __init__.py 0000664 0000000 0000000 00000000000 13221246610 0033712 0 ustar 00root root 0000000 0000000 slapos.core-3ffed138165cd7a37033c0fabf812e70c8d9d588-master-product/master/product/SlapOS/Document Permissions.py 0000664 0000000 0000000 00000002470 13221246610 0032725 0 ustar 00root root 0000000 0000000 slapos.core-3ffed138165cd7a37033c0fabf812e70c8d9d588-master-product/master/product/SlapOS # -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012 Vifib SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
SlapOSMachineAuthenticationPlugin.py 0000664 0000000 0000000 00000007311 13221246610 0037116 0 ustar 00root root 0000000 0000000 slapos.core-3ffed138165cd7a37033c0fabf812e70c8d9d588-master-product/master/product/SlapOS # -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2010 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
##############################################################################
from Products.ERP5Type.Globals import InitializeClass
from AccessControl import ClassSecurityInfo
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
from Products.PluggableAuthService.interfaces import plugins
from Products.PluggableAuthService.utils import classImplements
from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products.PluggableAuthService.PluggableAuthService import DumbHTTPExtractor
# Page template used as the ZMI form for adding a
# SlapOSMachineAuthenticationPlugin.
manage_addSlapOSMachineAuthenticationPluginForm = PageTemplateFile(
'www/SlapOS_addSlapOSMachineAuthenticationPlugin', globals(),
__name__='manage_addSlapOSMachineAuthenticationPluginForm')
def addSlapOSMachineAuthenticationPlugin(dispatcher, id, title=None, REQUEST=None):
    """Create a SlapOSMachineAuthenticationPlugin inside a Pluggable Auth Service.

    The new plugin is stored in ``dispatcher`` under the plugin's own id.
    When ``REQUEST`` is given, its response is redirected to the
    ``manage_workspace`` page of ``dispatcher`` with a confirmation message.
    """
    new_plugin = SlapOSMachineAuthenticationPlugin(id, title)
    dispatcher._setObject(new_plugin.getId(), new_plugin)
    if REQUEST is None:
        # Called from code: there is no browser to send anywhere.
        return
    redirect = REQUEST['RESPONSE'].redirect
    redirect(
        '%s/manage_workspace'
        '?manage_tabs_message='
        'SlapOSMachineAuthenticationPlugin+added.'
        % dispatcher.absolute_url())
class SlapOSMachineAuthenticationPlugin(BasePlugin):
    """
    Plugin to authenticate as machines.

    Credentials are taken from the REMOTE_USER request header when it is
    present; otherwise extraction is delegated to DumbHTTPExtractor.
    """
    meta_type = "SlapOS Machine Authentication Plugin"
    security = ClassSecurityInfo()

    def __init__(self, id, title=None):
        # Keep both spellings of the identifier in sync.
        self._id = self.id = id
        self.title = title

    security.declarePrivate('extractCredentials')
    def extractCredentials(self, request):
        """Build the credential mapping for ``request``.

        Returns a dict with the keys ``external_login``, ``remote_host``,
        ``login_portal_type`` and ``remote_address`` when the request carries
        a REMOTE_USER header, else whatever DumbHTTPExtractor extracts.
        """
        header_getter = getattr(request, 'getHeader', None)
        if header_getter is None:
            # use get_header instead for Zope-2.8
            header_getter = request.get_header
        remote_user = header_getter('REMOTE_USER')
        if remote_user is None:
            # fallback to default way
            return DumbHTTPExtractor().extractCredentials(request)
        credentials = {
            'external_login': remote_user,
            'remote_host': request.get('REMOTE_HOST', ''),
            'login_portal_type': "ERP5 Login",
        }
        try:
            client_address = request.getClientAddr()
        except AttributeError:
            # Request objects without getClientAddr: read the raw variable.
            client_address = request.get('REMOTE_ADDR', '')
        credentials['remote_address'] = client_address
        return credentials
# Declare the PAS interface provided by the plugin (credential extraction),
# then finalise the class through InitializeClass.
classImplements( SlapOSMachineAuthenticationPlugin,
plugins.ILoginPasswordHostExtractionPlugin)
InitializeClass(SlapOSMachineAuthenticationPlugin)
SlapOSShadowAuthenticationPlugin.py 0000664 0000000 0000000 00000022212 13221246610 0036774 0 ustar 00root root 0000000 0000000 slapos.core-3ffed138165cd7a37033c0fabf812e70c8d9d588-master-product/master/product/SlapOS # -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012 Vifib SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
##############################################################################
from functools import partial
from Products.ERP5Type.Globals import InitializeClass
from AccessControl import ClassSecurityInfo
from Products import ERP5Security
from Products.ERP5Type.UnrestrictedMethod import UnrestrictedMethod
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
from Products.PluggableAuthService.interfaces import plugins
from Products.PluggableAuthService.utils import classImplements
from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products.ERP5Security.ERP5GroupManager import NO_CACHE_MODE
from Products.ERP5Type.Cache import CachingMethod
from Products.ERP5Security.ERP5LoginUserManager import SYSTEM_USER_USER_NAME,\
SPECIAL_USER_NAME_SET
# Prefix carried by shadow user ids and logins, and its length (used to
# slice the prefix off before searching for the underlying user).
LOGIN_PREFIX = 'SHADOW-'
LOGIN_PREFIX_LENGTH = len(LOGIN_PREFIX)
# Page template used as the ZMI form for adding a
# SlapOSShadowAuthenticationPlugin.
manage_addSlapOSShadowAuthenticationPluginForm = PageTemplateFile(
'www/SlapOS_addSlapOSShadowAuthenticationPlugin', globals(),
__name__='manage_addSlapOSShadowAuthenticationPluginForm')
def addSlapOSShadowAuthenticationPlugin(dispatcher, id, title=None, REQUEST=None):
    """Create a SlapOSShadowAuthenticationPlugin inside a Pluggable Auth Service.

    The new plugin is stored in ``dispatcher`` under the plugin's own id.
    When ``REQUEST`` is given, its response is redirected to the
    ``manage_workspace`` page of ``dispatcher`` with a confirmation message.
    """
    new_plugin = SlapOSShadowAuthenticationPlugin(id, title)
    dispatcher._setObject(new_plugin.getId(), new_plugin)
    if REQUEST is None:
        # Called from code: there is no browser to send anywhere.
        return
    redirect = REQUEST['RESPONSE'].redirect
    redirect(
        '%s/manage_workspace'
        '?manage_tabs_message='
        'SlapOSShadowAuthenticationPlugin+added.'
        % dispatcher.absolute_url())
class SlapOSShadowAuthenticationPlugin(BasePlugin):
    """
    Plugin to authenticate as shadows.

    A shadow user is addressed by the id (or login) of an existing user
    prefixed with LOGIN_PREFIX.  The plugin strips the prefix to look the
    underlying user up and puts it back on everything it returns.
    """
    meta_type = "SlapOS Shadow Authentication Plugin"
    security = ClassSecurityInfo()

    def __init__(self, id, title=None):
        # Register value
        self._setId(id)
        self.title = title

    #################################
    #   IGroupsPlugin               #
    #################################
    # This is patched version of
    # Products.ERP5Security.ERP5GroupManager.ERP5GroupManager.getGroupsForPrincipal
    # which allows to treat Computer and Software Instance as loggable user
    def getGroupsForPrincipal(self, principal, request=None):
        """ See IGroupsPlugin.

        Returns an empty tuple for the super user, for principals whose id
        does not start with LOGIN_PREFIX and for ids matching no user.
        Otherwise returns two groups: a generic one derived from the portal
        type of the matched document and one specific to the user.
        """
        # If this is the super user, skip the check.
        if principal.getId() == ERP5Security.SUPER_USER:
            return ()

        @UnrestrictedMethod
        def _getGroupsForPrincipal(user_id, path):
            # ``path`` is not used in the body; it is supplied by the caller
            # below alongside user_id.
            if user_id.startswith(LOGIN_PREFIX):
                user_id = user_id[LOGIN_PREFIX_LENGTH:]
            else:
                return ()
            # get the person from its login - no security check needed
            user_path_set = {
                x['path']
                for x in self.searchUsers(id=user_id, exact_match=True)
                if 'path' in x
            }
            if not user_path_set:
                return ()
            # Exactly one path is expected; more than one raises ValueError.
            user_path, = user_path_set
            person_object = self.getPortalObject().unrestrictedTraverse(
                user_path)
            portal_type = person_object.getPortalType()
            return (
                # generic group
                'R-SHADOW-%s' % portal_type.replace(' ', '').upper(),
                # user specific shadow
                'SHADOW-%s' % user_id,
            )

        if not NO_CACHE_MODE:
            _getGroupsForPrincipal = CachingMethod(
                _getGroupsForPrincipal,
                id='ERP5GroupManager_getGroupsForPrincipal',
                cache_factory='erp5_content_short')

        return _getGroupsForPrincipal(
            user_id=principal.getId(),
            path=self.getPhysicalPath())

    #
    #   IUserEnumerationPlugin implementation
    #
    security.declarePrivate('enumerateUsers')
    def enumerateUsers(self, id=None, login=None, exact_match=False,
                       sort_by=None, max_results=None, login_portal_type=None,
                       **kw):
        """ See IUserEnumerationPlugin.

        ``id`` and ``login`` may each be a single name or a sequence of
        names.  Only names starting with LOGIN_PREFIX are considered; the
        prefix is removed for the catalog search and added back to the
        ``id``, ``login`` and ``login_list`` references of each result.
        ``login`` takes precedence over ``id``.  Returns a tuple of dicts,
        empty when no prefixed name was given.  A user without any validated
        login is returned with ``login`` set to None.
        """
        def stripPrefix(name_or_names):
            # Return the given names without LOGIN_PREFIX, dropping every
            # name that does not carry it.  None yields no name at all.
            if name_or_names is None:
                return ()
            if hasattr(name_or_names, 'startswith'):
                # A single name rather than a sequence of names.
                name_or_names = (name_or_names, )
            return tuple(
                name[LOGIN_PREFIX_LENGTH:]
                for name in name_or_names
                if name.startswith(LOGIN_PREFIX)
            )

        def addPrefix(reference):
            # Users without a login have no reference: keep None instead of
            # failing on the string concatenation.
            if reference is None:
                return None
            return LOGIN_PREFIX + reference

        portal = self.getPortalObject()
        if login_portal_type is None:
            login_portal_type = portal.getPortalLoginTypeList()
        unrestrictedSearchResults = \
            portal.portal_catalog.unrestrictedSearchResults
        searchUser = lambda **kw: unrestrictedSearchResults(
            select_list=('user_id', ),
            **kw
        ).dictionaries()
        searchLogin = lambda **kw: unrestrictedSearchResults(
            select_list=('parent_uid', 'reference'),
            validation_state='validated',
            **kw
        ).dictionaries()
        if login_portal_type is not None:
            searchLogin = partial(searchLogin, portal_type=login_portal_type)
        if login is None:
            # Only search by id if login is not given. Same logic as in
            # PluggableAuthService.searchUsers.
            # CUSTOM: Modify the id to remove the prefix before search the User
            id = stripPrefix(id)
            if not id:
                return ()
            # END OF CUSTOM CODE
            # Short-cut "System Processes" as not being searchable by user_id.
            # This improves performance in proxy-role'd execution by avoiding
            # an sql query expected to find no user.
            id = [x for x in id if x != SYSTEM_USER_USER_NAME]
            if id:
                if exact_match:
                    requested = set(id).__contains__
                else:
                    requested = lambda x: True
                user_list = [
                    x for x in searchUser(
                        user_id={
                            'query': id,
                            'key': 'ExactMatch' if exact_match else 'Keyword',
                        },
                        limit=max_results,
                    )
                    if requested(x['user_id'])
                ]
            else:
                user_list = []
            login_dict = {}
            if user_list:
                for login_row in searchLogin(
                        parent_uid=[x['uid'] for x in user_list]):
                    login_dict.setdefault(
                        login_row['parent_uid'], []).append(login_row)
        else:
            # CUSTOM: Modify the login to remove the prefix before search the
            # User
            login = stripPrefix(login)
            if not login:
                return ()
            # END OF CUSTOM CODE
            # Special user names are never looked up in the catalog.
            login_list = [
                user_login for user_login in login
                if user_login not in SPECIAL_USER_NAME_SET
            ]
            login_dict = {}
            if exact_match:
                requested = set(login_list).__contains__
            else:
                requested = lambda x: True
            if login_list:
                for login_row in searchLogin(
                    reference={
                        'query': login_list,
                        'key': 'ExactMatch' if exact_match else 'Keyword',
                    },
                    limit=max_results,
                ):
                    if requested(login_row['reference']):
                        login_dict.setdefault(
                            login_row['parent_uid'], []).append(login_row)
            if login_dict:
                user_list = searchUser(uid=list(login_dict))
            else:
                user_list = []
        plugin_id = self.getId()
        # CUSTOM: In the block below the LOGIN_PREFIX is added before id and
        # login to keep compatibility.
        result = []
        for user in user_list:
            if not user['user_id']:
                continue
            user_login_list = login_dict.get(user['uid'], [])
            # Note: PAS forbids us from returning more than one entry per
            # given id, so take any available login.
            if user_login_list:
                first_reference = user_login_list[0]['reference']
            else:
                first_reference = None
            result.append({
                'id': LOGIN_PREFIX + user['user_id'],
                'login': addPrefix(first_reference),
                'pluginid': plugin_id,
                # Extra properties, specific to ERP5
                'path': user['path'],
                'uid': user['uid'],
                'login_list': [
                    {
                        'reference': addPrefix(login_row['reference']),
                        'path': login_row['path'],
                        'uid': login_row['uid'],
                    }
                    for login_row in user_login_list
                ],
            })
        # END OF CUSTOM CODE
        return tuple(result)
# Interfaces implemented by the plugin: group lookup and user enumeration.
# The class is then finalised through InitializeClass.
classImplements( SlapOSShadowAuthenticationPlugin,
plugins.IGroupsPlugin
)
classImplements( SlapOSShadowAuthenticationPlugin,
plugins.IUserEnumerationPlugin
)
InitializeClass(SlapOSShadowAuthenticationPlugin)
slapos.core-3ffed138165cd7a37033c0fabf812e70c8d9d588-master-product/master/product/SlapOS/Tool/ 0000775 0000000 0000000 00000000000 13221246610 0031031 5 ustar 00root root 0000000 0000000 __init__.py 0000664 0000000 0000000 00000000000 13221246610 0033051 0 ustar 00root root 0000000 0000000 slapos.core-3ffed138165cd7a37033c0fabf812e70c8d9d588-master-product/master/product/SlapOS/Tool __init__.py 0000664 0000000 0000000 00000006504 13221246610 0032153 0 ustar 00root root 0000000 0000000 slapos.core-3ffed138165cd7a37033c0fabf812e70c8d9d588-master-product/master/product/SlapOS # -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from Products.ERP5Type.Utils import initializeProduct, updateGlobals
from AccessControl.Permissions import manage_users as ManageUsers
import sys
import Permissions
# Module object of this product, handed to the ERP5Type helpers.
this_module = sys.modules[ __name__ ]
# updateGlobals() receives this module, its globals and the product's
# Permissions module; its return value is later passed to initializeProduct().
document_classes = updateGlobals(this_module, globals(),
permissions_module=Permissions)
# This product declares no object classes, content classes or content
# constructors of its own.
object_classes = ()
content_classes = ()
content_constructors = ()
from Products.PluggableAuthService.PluggableAuthService import registerMultiPlugin
import SlapOSMachineAuthenticationPlugin
import SlapOSShadowAuthenticationPlugin
def initialize(context):
    """Initialise the product and register both PAS plugin classes.

    Calls ERP5Type's initializeProduct() with the module-level class
    registries, then registers SlapOSMachineAuthenticationPlugin and
    SlapOSShadowAuthenticationPlugin on ``context`` together with their ZMI
    add form and factory, protected by the "Manage users" permission.
    """
    import Document
    initializeProduct(
        context, this_module, globals(), document_module=Document,
        document_classes=document_classes, object_classes=object_classes,
        # No module-level ``portal_tools`` is visible in this module: fall
        # back to "no tools" instead of raising NameError, while still
        # honouring the name if it is provided.
        portal_tools=globals().get('portal_tools', ()),
        content_constructors=content_constructors,
        content_classes=content_classes)
    context.registerClass(
        SlapOSMachineAuthenticationPlugin.SlapOSMachineAuthenticationPlugin,
        permission=ManageUsers,
        constructors=(
            SlapOSMachineAuthenticationPlugin.manage_addSlapOSMachineAuthenticationPluginForm,
            SlapOSMachineAuthenticationPlugin.addSlapOSMachineAuthenticationPlugin,
        ),
        visibility=None,
        icon='www/portal.gif',
    )
    context.registerClass(
        SlapOSShadowAuthenticationPlugin.SlapOSShadowAuthenticationPlugin,
        permission=ManageUsers,
        constructors=(
            SlapOSShadowAuthenticationPlugin.manage_addSlapOSShadowAuthenticationPluginForm,
            SlapOSShadowAuthenticationPlugin.addSlapOSShadowAuthenticationPlugin,
        ),
        visibility=None,
        icon='www/portal.gif',
    )
# Register the meta types of both plugins with PluggableAuthService.
registerMultiPlugin(SlapOSMachineAuthenticationPlugin.SlapOSMachineAuthenticationPlugin.meta_type)
registerMultiPlugin(SlapOSShadowAuthenticationPlugin.SlapOSShadowAuthenticationPlugin.meta_type)
slapos.core-3ffed138165cd7a37033c0fabf812e70c8d9d588-master-product/master/product/SlapOS/bin/ 0000775 0000000 0000000 00000000000 13221246610 0030664 5 ustar 00root root 0000000 0000000 bigfile_client_example.py 0000664 0000000 0000000 00000004515 13221246610 0035636 0 ustar 00root root 0000000 0000000 slapos.core-3ffed138165cd7a37033c0fabf812e70c8d9d588-master-product/master/product/SlapOS/bin input_file = open('big_file.log', 'r')
import httplib
connection = httplib.HTTPConnection('192.168.242.68:12001')
import base64
base64string = base64.encodestring('zope:insecure')[:-1]
n = 1 << 20
######################################
# Create new document
######################################
headers = {
'Authorization': 'Basic %s' % base64string,
}
connection.request('POST', "/erp5/portal_contributions/newContent?" \
"portal_type=Big%20File&filename=big_file.log&container_path=big_file_module&data=", "", headers)
result = connection.getresponse()
path = result.getheader("X-Document-Location")
result.close()
path = '/%s' % '/'.join(path.split('/')[3:])
print path
######################################
# Upload chunks
######################################
input_file.seek(0, 2)
end = input_file.tell()
input_file.seek(0)
pos = input_file.tell()
first = True
while pos < end:
next_pos = pos + n
if next_pos > end:
next_pos = end
body_content = input_file.read(next_pos-pos)
if first:
headers = {
'Authorization': 'Basic %s' % base64string,
}
else:
headers = {
'Authorization': 'Basic %s' % base64string,
'Content-Range': 'bytes %i-%i/%i' % (pos, next_pos-1, next_pos),
}
connection.request('PUT', path, body_content, headers)
result = connection.getresponse()
pos = input_file.tell()
first = False
######################################
# Download chunks
######################################
output_file = open('output_big_file.log', 'wb')
output_file.seek(0)
headers = {
'Authorization': 'Basic %s' % base64string,
'Content-Range': 'bytes */*',
}
connection.request('PUT', path, '', headers)
result = connection.getresponse()
filerange = result.getheader('Range')
size = int(filerange.split('-')[1])
pos = 0
while pos < size:
next_pos = pos + n
if next_pos > size:
next_pos = size
headers = {
'Authorization': 'Basic %s' % base64string,
'Range': 'bytes=%s-%s' % (pos, next_pos),
}
connection.request('GET', path, '', headers)
result = connection.getresponse()
output_file.write(result.read())
result.close()
pos = output_file.tell()
slapos.core-3ffed138165cd7a37033c0fabf812e70c8d9d588-master-product/master/product/SlapOS/tests/ 0000775 0000000 0000000 00000000000 13221246610 0031256 5 ustar 00root root 0000000 0000000 __init__.py 0000664 0000000 0000000 00000000000 13221246610 0033276 0 ustar 00root root 0000000 0000000 slapos.core-3ffed138165cd7a37033c0fabf812e70c8d9d588-master-product/master/product/SlapOS/tests testSlapOSMixin.py 0000664 0000000 0000000 00000057032 13221246610 0034626 0 ustar 00root root 0000000 0000000 slapos.core-3ffed138165cd7a37033c0fabf812e70c8d9d588-master-product/master/product/SlapOS/tests # -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012 Vifib SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
##############################################################################
import random
import transaction
import unittest
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
import functools
from Products.ERP5Type.tests.utils import DummyMailHost
from Products.ERP5Type.Utils import convertToUpperCase
import os
import glob
from functools import wraps
from Products.ERP5Type.tests.utils import createZODBPythonScript
from AccessControl.SecurityManagement import getSecurityManager, \
setSecurityManager
from App.config import getConfiguration
# Zope configuration object; afterSetUp reads and updates its
# product_config["initsite"] entry.
config = getConfiguration()
def changeSkin(skin_name):
    """Decorator factory running a test method under the skin ``skin_name``.

    Before the call, the skin is changed on portal_skins and recorded as
    ``portal_skin`` on the request; afterwards the default skin is put back,
    even when the decorated method raises.  The return value of the
    decorated method is passed through, and its name and docstring are
    preserved on the wrapper.
    """
    def decorator(func):
        @wraps(func)
        def wrapped(self, *args, **kwargs):
            default_skin = self.portal.portal_skins.default_skin
            self.portal.portal_skins.changeSkin(skin_name)
            self.app.REQUEST.set('portal_skin', skin_name)
            try:
                v = func(self, *args, **kwargs)
            finally:
                self.portal.portal_skins.changeSkin(default_skin)
                self.app.REQUEST.set('portal_skin', default_skin)
            return v
        return wrapped
    return decorator
def simulate(script_id, params_string, code_string):
    """Decorator factory installing a temporary Python Script for a test.

    The script ``script_id`` is created in the ``custom`` skin folder with
    the given parameters and code, and the transaction is committed before
    the decorated method runs.  Afterwards the script is removed if it is
    still there and the transaction is committed again, whether the method
    succeeded or not.  ValueError is raised when a script with that id
    already exists.
    """
    def upperWrap(f):
        def decorated(self, *args, **kw):
            def custom_folder():
                # Looked up afresh on every use, like each direct access.
                return self.portal.portal_skins.custom

            if script_id in custom_folder().objectIds():
                raise ValueError('Precondition failed: %s exists in custom' % script_id)
            createZODBPythonScript(custom_folder(),
                                   script_id, params_string, code_string)
            transaction.commit()
            try:
                outcome = f(self, *args, **kw)
            finally:
                if script_id in custom_folder().objectIds():
                    custom_folder().manage_delObjects(script_id)
                transaction.commit()
            return outcome
        return wraps(f)(decorated)
    return upperWrap
def withAbort(func):
    """Decorator calling ``self.abort()`` once the decorated method is done.

    The abort happens whether the method returns or raises.  The return
    value of the decorated method is discarded.
    """
    def aborting(self, *args, **kwargs):
        try:
            func(self, *args, **kwargs)
        finally:
            self.abort()
    return functools.wraps(func)(aborting)
class testSlapOSMixin(ERP5TypeTestCase):
abort_transaction = 0
def clearCache(self):
    """Clear every cache of portal_caches, then refresh the worklist cache."""
    portal = self.portal
    portal.portal_caches.clearAllCache()
    portal.portal_workflow.refreshWorklistCache()
#def getDefaultSitePreferenceId(self):
# """Default id, usefull method to override
# """
# return "slapos_default_system_preference"
def createAlarmStep(self):
    """Add a ``stepCall<AlarmId>Alarm`` callable to the test for each alarm.

    Only enabled alarms of portal_alarms get one.  Each callable logs in,
    runs ``activeSense`` on its alarm with the received keyword arguments as
    ``params``, commits, and finally restores the security manager that was
    active when it was called.
    """
    def makeCallAlarm(alarm):
        def callAlarm(*args, **kwargs):
            previous_security_manager = getSecurityManager()
            self.login()
            try:
                alarm.activeSense(params=kwargs)
                self.commit()
            finally:
                setSecurityManager(previous_security_manager)
        return callAlarm

    for alarm in self.portal.portal_alarms.contentValues():
        if not alarm.isEnabled():
            continue
        step_name = 'stepCall' + convertToUpperCase(alarm.getId()) + 'Alarm'
        setattr(self, step_name, makeCallAlarm(alarm))
def createCertificateAuthorityFile(self):
    """Sets up portal_certificate_authority

    Resets the certificate authority directory named by the TEST_CA_PATH
    environment variable: the ``serial`` and ``crlnumber`` files are
    rewritten to ``01``, ``index.txt`` is emptied, and the ``*.key`` files of
    ``private`` as well as every file of ``crl``, ``certs`` and ``newcerts``
    are removed.  Raises KeyError when TEST_CA_PATH is not set.
    """
    ca_path = os.environ['TEST_CA_PATH']
    # reset test CA to have it always count from 0
    for file_name, content in (
        ('serial', '01'),
        ('crlnumber', '01'),
        ('index.txt', ''),
    ):
        # Close each file explicitly so the content is on disk before the
        # certificate authority reads it.
        with open(os.path.join(ca_path, file_name), 'w') as ca_file:
            ca_file.write(content)
    for directory, pattern in (
        ('private', '*.key'),
        ('crl', '*'),
        ('certs', '*'),
        ('newcerts', '*'),
    ):
        stale_list = glob.glob(
            '%s/%s' % (os.path.join(ca_path, directory), pattern))
        for stale in stale_list:
            os.remove(stale)
def setupPortalAlarms(self):
    """Subscribe portal_alarms if needed and assert that it is subscribed."""
    alarm_tool = self.portal.portal_alarms
    if not alarm_tool.isSubscribed():
        alarm_tool.subscribe()
    self.assertTrue(self.portal.portal_alarms.isSubscribed())
def isLiveTest(self):
    """Return True when the TEST_CA_PATH environment variable is not set.

    The absence of that variable is what this class takes as the sign of
    running as a live test.
    """
    #return 'ERP5TypeLiveTestCase' in [q.__name__ for q in self.__class__.mro()]
    # XXX - What is the better way to know if we are in live test mode ?
    return 'TEST_CA_PATH' not in os.environ
def _setUpDummyMailHost(self):
    """Do not play with NON persistent replacement of MailHost

    The ERP5TypeTestCase implementation is only used outside live tests.
    """
    if self.isLiveTest():
        return
    ERP5TypeTestCase._setUpDummyMailHost(self)
def _restoreMailHost(self):
    """Do not play with NON persistent replacement of MailHost

    The ERP5TypeTestCase implementation is only used outside live tests.
    """
    if self.isLiveTest():
        return
    ERP5TypeTestCase._restoreMailHost(self)
def beforeTearDown(self):
    """Undo the live-test MailHost replacement and abort if requested.

    In live tests the persistent dummy MailHost is replaced by a regular
    one.  The transaction is aborted when ``abort_transaction`` is true.
    """
    live = self.isLiveTest()
    if live:
        self.deSetUpPersistentDummyMailHost()
    if not self.abort_transaction:
        return
    transaction.abort()
def getUserFolder(self):
    """
    Return the ``acl_users`` object of the portal, or None if it has none
    """
    portal = self.getPortal()
    return getattr(portal, 'acl_users', None)
def setUpOnce(self):
    """Refresh the business template repository list, then run the configurator.

    The transaction is committed before and after the refresh.
    """
    self.commit()
    template_tool = self.portal.portal_templates
    template_tool.updateRepositoryBusinessTemplateList(
        repository_list=self.portal.portal_templates.getRepositoryList())
    self.commit()
    self.launchConfigurator()
def afterSetUp(self):
    """Prepare the site for a test.

    Logs in, creates the alarm steps and stores a fresh id in ``new_id``.
    Live tests only install the persistent dummy MailHost and stop there.
    Otherwise the cache tool gets a new random site id, the ``initsite``
    product configuration is given a ``cloudooo_url`` when it has none, and
    the site is bootstrapped once (guarded by ``is_site_bootstrapped``).
    """
    self.login()
    self.createAlarmStep()
    # Helpful for the tests
    self.new_id = self.generateNewId()
    if self.isLiveTest():
        self.setUpPersistentDummyMailHost()
        return

    cache_tool = self.portal.portal_caches
    cache_tool.erp5_site_global_id = '%s' % random.random()
    self.portal.portal_caches._p_changed = 1
    self.commit()
    self.portal.portal_caches.updateCache()

    product_config = config.product_config
    try:
        initsite = product_config["initsite"]
    except KeyError:
        initsite = {}
    if initsite.get("cloudooo_url", None) is None:
        from Products.ERP5Type.tests.ERP5TypeTestCase import\
            _getConversionServerUrl
        initsite["cloudooo_url"] = _getConversionServerUrl()
    config.product_config["initsite"] = initsite

    already_bootstrapped = getattr(self.portal, 'is_site_bootstrapped', 0)
    if not already_bootstrapped:
        self.portal.is_site_bootstrapped = 1
        self.bootstrapSite()
        self.portal._p_changed = 1
        self.commit()
def deSetUpPersistentDummyMailHost(self):
    """Replace any existing ``MailHost`` of the portal by a freshly added
    one, then commit."""
    portal = self.portal
    mail_host_exists = 'MailHost' in portal.objectIds()
    if mail_host_exists:
        self.portal.manage_delObjects(['MailHost'])
    factory = self.portal.manage_addProduct['MailHost']
    factory.manage_addMailHost('MailHost')
    self.commit()
def setUpPersistentDummyMailHost(self):
    """Replace any existing ``MailHost`` of the portal by a DummyMailHost and
    set the portal's sender and recipient e-mail addresses."""
    portal = self.portal
    mail_host_exists = 'MailHost' in portal.objectIds()
    if mail_host_exists:
        self.portal.manage_delObjects(['MailHost'])
    dummy_mail_host = DummyMailHost('MailHost')
    self.portal._setObject('MailHost', dummy_mail_host)
    self.portal.email_from_address = 'romain@nexedi.com'
    self.portal.email_to_address = 'romain@nexedi.com'
def getBusinessConfiguration(self):
    """Return the ``slapos_master_configuration_workflow`` entry of the
    business configuration module."""
    configuration_module = self.portal.business_configuration_module
    return configuration_module["slapos_master_configuration_workflow"]
def launchConfigurator(self):
    """Step the configurator until it answers ``install``, then install.

    After logging in, ``_next`` is called on portal_configurator with the
    business configuration until the returned dict has ``command`` equal to
    ``install``.  Pending activities are then processed and the installation
    is started.
    """
    self.login()
    # Create new Configuration
    business_configuration = self.getBusinessConfiguration()
    response_dict = {}
    while True:
        if response_dict.get("command", "next") == "install":
            break
        response_dict = self.portal.portal_configurator._next(
            business_configuration, {})
    self.tic()
    configurator = self.portal.portal_configurator
    configurator.startInstallation(
        business_configuration, REQUEST=self.portal.REQUEST)
def bootstrapSite(self):
    """Run the one-time site preparation steps.

    Sets up the alarms and the certificate authority files, points the
    default system preference to a dummy HATEOAS URL, clears the caches and
    processes pending activities.
    """
    self.setupPortalAlarms()
    self.createCertificateAuthorityFile()
    system_preference = self.getDefaultSystemPreference()
    system_preference.setPreferredHateoasUrl("http://dummy/")
    self.clearCache()
    self.tic()
def getBusinessTemplateList(self):
    """
    Return the list of business template titles to install, in order.
    """
    return [
        'erp5_full_text_myisam_catalog',
        'erp5_core_proxy_field_legacy',
        'erp5_base',
        'erp5_workflow',
        'erp5_configurator',
        'slapos_configurator',
    ]
def makePerson(self, new_id=None, index=True, user=True):
    """Create and validate a person cloned from ``template_member``.

    new_id -- suffix of the title, reference and e-mail address; generated
              when not given
    index  -- when true, commit and reindex the person (and its login)
              immediately
    user   -- when true, add a validated ERP5 Login to the person

    All assignments of the clone are opened.  Returns the person document.
    """
    if new_id is None:
        new_id = self.generateNewId()
    # Clone person document
    template = self.portal.person_module.template_member
    person_user = template.Base_createCloneDocument(batch_mode=1)
    person_user.edit(
        title="live_test_%s" % new_id,
        reference="live_test_%s" % new_id,
        default_email_text="live_test_%s@example.org" % new_id,
    )
    person_user.validate()
    for assignment in person_user.contentValues(portal_type="Assignment"):
        assignment.open()
    if user:
        login = self._addERP5Login(person_user)
    if not index:
        return person_user
    transaction.commit()
    person_user.immediateReindexObject()
    if user:
        login.immediateReindexObject()
    return person_user
def _addERP5Login(self, document):
login = document.newContent(
portal_type="ERP5 Login",
reference=document.getReference())
login.validate()
return login
def _makeTree(self, requested_template_id='template_software_instance'):
    """Build a hosting subscription with two software instances.

    Stores on the test case: ``request_kw`` (the request parameters),
    ``person_user``, ``hosting_subscription``, ``software_instance`` (cloned
    from ``requested_template_id``) and ``requested_software_instance``
    (cloned from ``template_software_instance``).  The subscription has the
    first instance as predecessor and the person as destination section; the
    first instance has the second one as predecessor.  All three documents
    are validated and moved to ``start_requested``.
    """
    new_id = self.generateNewId()
    request_kw = self.request_kw = dict(
        software_release=self.generateNewSoftwareReleaseUrl(),
        software_title=self.generateNewSoftwareTitle(),
        software_type=self.generateNewSoftwareType(),
        instance_xml=self.generateSafeXml(),
        sla_xml=self.generateEmptyXml(),
        shared=False,
        state="started"
    )
    self.person_user = self.makePerson(new_id=new_id, index=False)
    self.commit()

    # prepare part of tree
    subscription_template = self.portal.hosting_subscription_module\
        .template_hosting_subscription
    self.hosting_subscription = \
        subscription_template.Base_createCloneDocument(batch_mode=1)
    instance_template = \
        self.portal.software_instance_module[requested_template_id]
    self.software_instance = \
        instance_template.Base_createCloneDocument(batch_mode=1)

    self.hosting_subscription.edit(
        title=request_kw['software_title'],
        reference="TESTHS-%s" % new_id,
        url_string=request_kw['software_release'],
        source_reference=request_kw['software_type'],
        text_content=request_kw['instance_xml'],
        sla_xml=request_kw['sla_xml'],
        root_slave=request_kw['shared'],
        predecessor=self.software_instance.getRelativeUrl(),
        destination_section=self.person_user.getRelativeUrl()
    )
    self.hosting_subscription.validate()
    self.portal.portal_workflow._jumpToStateFor(
        self.hosting_subscription, 'start_requested')

    requested_template = \
        self.portal.software_instance_module.template_software_instance
    self.requested_software_instance = \
        requested_template.Base_createCloneDocument(batch_mode=1)

    self.software_instance.edit(
        title=request_kw['software_title'],
        reference="TESTSI-%s" % new_id,
        url_string=request_kw['software_release'],
        source_reference=request_kw['software_type'],
        text_content=request_kw['instance_xml'],
        sla_xml=request_kw['sla_xml'],
        specialise=self.hosting_subscription.getRelativeUrl(),
        predecessor=self.requested_software_instance.getRelativeUrl()
    )
    self.portal.portal_workflow._jumpToStateFor(
        self.software_instance, 'start_requested')
    self.software_instance.validate()

    self.requested_software_instance.edit(
        title=self.generateNewSoftwareTitle(),
        reference="TESTSI-%s" % self.generateNewId(),
        url_string=request_kw['software_release'],
        source_reference=request_kw['software_type'],
        text_content=request_kw['instance_xml'],
        sla_xml=request_kw['sla_xml'],
        specialise=self.hosting_subscription.getRelativeUrl(),
    )
    self.portal.portal_workflow._jumpToStateFor(
        self.requested_software_instance, 'start_requested')
    self.requested_software_instance.validate()
    self.tic()
def _makeComputer(self, owner=None):
    """Create a validated computer holding one free, validated partition.

    The computer is cloned from ``computer_module.template_computer`` and
    kept on ``self.computer``; the partition is kept on ``self.partition``.
    Both get a freshly generated reference that is also used as title.

    owner -- optional document; when not None it is set as the computer's
             ``source_administration_value`` once ``self.tic()`` has run.

    Returns the ``(computer, partition)`` pair.
    """
    template = self.portal.computer_module.template_computer
    computer = template.Base_createCloneDocument(batch_mode=1)
    self.computer = computer
    computer_reference = 'TESTCOMP-%s' % self.generateNewId()
    computer.edit(
        allocation_scope='open/public',
        capacity_scope='open',
        reference=computer_reference,
        title=computer_reference,
    )
    computer.validate()

    partition_reference = 'TESTPART-%s' % self.generateNewId()
    partition = computer.newContent(
        portal_type='Computer Partition',
        reference=partition_reference,
        title=partition_reference,
    )
    self.partition = partition
    partition.markFree()
    partition.validate()
    self.tic()

    # The owner is only attached after the computer and partition exist.
    if owner is not None:
        computer.edit(source_administration_value=owner)
    return computer, partition
def _makeComputerNetwork(self):
    """Create and validate a computer network, then return it.

    The network is added to ``computer_network_module`` with a freshly
    generated reference that doubles as its title, and is also stored on
    ``self.computer_network``.
    """
    network_reference = 'TESTCOMPNETWORK-%s' % self.generateNewId()
    network_module = self.portal.computer_network_module
    network = network_module.newContent(
        portal_type='Computer Network',
        reference=network_reference,
        title=network_reference,
    )
    self.computer_network = network
    network.validate()
    self.tic()
    return network
def _makeComplexComputer(self, person=None, with_slave=False):
    """Fill ``self.computer`` with partitions, installations and instances.

    ``self.computer`` must already be set (``_makeComputer`` does so).
    The method adds four partitions ``partition1`` .. ``partition4``,
    creates three software installations and requests one hosting
    subscription tree per instance state.  The created documents are
    stored on ``self`` as:

    * ``start_requested_software_installation``
    * ``destroy_requested_software_installation``
    * ``destroyed_software_installation`` (also invalidated)
    * ``start_requested_software_instance`` (on partition1)
    * ``start_requested_slave_instance`` (on partition1, only when
      ``with_slave`` is true)
    * ``stop_requested_software_instance`` (on partition2)
    * ``destroy_requested_software_instance`` (on partition3)
    * ``destroyed_software_instance`` (on partition4, also invalidated)

    person -- used as ``destination_section_value`` of the started, slave
              and stopped hosting subscriptions; the two destroyed trees
              do not set it.
    with_slave -- when true, additionally request a shared instance that
                  reuses the software release and type of the started one.
    """

    def make_installation(label):
        # Clone, describe, validate and start one software installation.
        installation = self.portal.software_installation_module\
            .template_software_installation.Base_createCloneDocument(
                batch_mode=1)
        installation.edit(
            url_string=self.generateNewSoftwareReleaseUrl(),
            aggregate=self.computer.getRelativeUrl(),
            reference='TESTSOFTINST-%s' % self.generateNewId(),
            title='%s for %s' % (label, self.computer.getTitle()),
        )
        installation.validate()
        installation.requestStart()
        return installation

    def make_hosting_subscription(**properties):
        # Clone and validate a hosting subscription, then give it a fresh
        # title and reference plus any extra edit properties.
        subscription = self.portal.hosting_subscription_module\
            .template_hosting_subscription.Base_createCloneDocument(
                batch_mode=1)
        subscription.validate()
        subscription.edit(
            title=self.generateNewSoftwareTitle(),
            reference="TESTSI-%s" % self.generateNewId(),
            **properties
        )
        return subscription

    def make_request_kw(subscription, state, shared=False,
                        software_type=None):
        # Build the keyword arguments passed to the request* methods.
        # A new software type is generated unless one is supplied.
        software_release = \
            self.start_requested_software_installation.getUrlString()
        if software_type is None:
            software_type = self.generateNewSoftwareType()
        return dict(
            software_release=software_release,
            software_type=software_type,
            instance_xml=self.generateSafeXml(),
            sla_xml=self.generateSafeXml(),
            shared=shared,
            software_title=subscription.getTitle(),
            state=state,
        )

    def request_stopped_tree(**properties):
        # Create a hosting subscription and request its instance stopped.
        subscription = make_hosting_subscription(**properties)
        request_kw = make_request_kw(subscription, 'stopped')
        subscription.requestStop(**request_kw)
        subscription.requestInstance(**request_kw)
        return subscription, request_kw

    for index in range(1, 5):
        partition_id = 'partition%s' % index
        partition = self.computer.newContent(
            portal_type='Computer Partition',
            id=partition_id,
            title=partition_id,
            reference=partition_id,
            default_network_address_ip_address='ip_address_%s' % index,
            default_network_address_netmask='netmask_%s' % index)
        partition.markFree()
        partition.validate()

    # One installation per state: started, destroy requested, destroyed.
    self.start_requested_software_installation = make_installation(
        'Start requested')

    self.destroy_requested_software_installation = make_installation(
        'Destroy requested')
    self.destroy_requested_software_installation.requestDestroy()

    self.destroyed_software_installation = make_installation('Destroyed')
    self.destroyed_software_installation.requestDestroy()
    self.destroyed_software_installation.invalidate()

    self.computer.partition1.markBusy()
    self.computer.partition2.markBusy()
    self.computer.partition3.markBusy()

    # prepare some trees
    # Started instance on partition1.
    subscription = make_hosting_subscription(destination_section_value=person)
    started_kw = make_request_kw(subscription, 'started')
    subscription.requestStart(**started_kw)
    subscription.requestInstance(**started_kw)
    self.start_requested_software_instance = \
        subscription.getPredecessorValue()
    self.start_requested_software_instance.edit(
        aggregate=self.computer.partition1.getRelativeUrl())

    if with_slave:
        # Shared instance with the same release and type as the started
        # instance, attached to the same partition.
        subscription = make_hosting_subscription(
            destination_section_value=person)
        slave_kw = make_request_kw(
            subscription, 'started', shared=True,
            software_type=started_kw['software_type'])
        subscription.requestStart(**slave_kw)
        subscription.requestInstance(**slave_kw)
        self.start_requested_slave_instance = \
            subscription.getPredecessorValue()
        self.start_requested_slave_instance.edit(
            aggregate=self.computer.partition1.getRelativeUrl())

    # Stopped instance on partition2.
    subscription, stopped_kw = request_stopped_tree(
        destination_section_value=person)
    self.stop_requested_software_instance = \
        subscription.getPredecessorValue()
    self.stop_requested_software_instance.edit(
        aggregate=self.computer.partition2.getRelativeUrl()
    )

    # Destroy requested instance on partition3.
    subscription, destroy_kw = request_stopped_tree()
    destroy_kw['state'] = 'destroyed'
    subscription.requestDestroy(**destroy_kw)
    self.destroy_requested_software_instance = \
        subscription.getPredecessorValue()
    self.destroy_requested_software_instance.requestDestroy(**destroy_kw)
    self.destroy_requested_software_instance.edit(
        aggregate=self.computer.partition3.getRelativeUrl()
    )

    # Destroyed and invalidated instance on partition4.
    subscription, destroy_kw = request_stopped_tree()
    destroy_kw['state'] = 'destroyed'
    subscription.requestDestroy(**destroy_kw)
    self.destroyed_software_instance = subscription.getPredecessorValue()
    self.destroyed_software_instance.edit(
        aggregate=self.computer.partition4.getRelativeUrl()
    )
    self.destroyed_software_instance.requestDestroy(**destroy_kw)
    self.destroyed_software_instance.invalidate()

    self.tic()
    if with_slave:
        # as slave is created in non usual way update its local roles
        self.start_requested_slave_instance.updateLocalRolesOnSecurityGroups()
        self.tic()
    self._cleaupREQUEST()
def _makeSoftwareProduct(self, new_id):
    """Clone the template software product, publish it and return it.

    new_id -- value interpolated into the product's reference and title.
    """
    template = self.portal.software_product_module.template_software_product
    product = template.Base_createCloneDocument(batch_mode=1)
    product.edit(
        reference='TESTSOFTPROD-%s' % new_id,
        title='Test software product %s' % new_id,
    )
    product.publish()
    return product
def _makeSoftwareRelease(self, new_id):
    """Clone the template software release, release it and return it.

    The release gets a freshly generated URL; ``new_id`` is interpolated
    into its reference and title.
    """
    template = self.portal.software_release_module.template_software_release
    release = template.Base_createCloneDocument(batch_mode=1)
    release.edit(
        url_string=self.generateNewSoftwareReleaseUrl(),
        reference='TESTSOFTRELS-%s' % new_id,
        title='Start requested for %s' % new_id,
    )
    release.release()
    return release
def _cleaupREQUEST(self):
self.portal.REQUEST['request_instance'] = None
self.portal.REQUEST.headers = {}
def generateNewId(self):
    """Return a new identifier from ``portal_ids`` with a non-ASCII suffix.

    The identifier is taken from the ``slapos_core_test`` id group and is
    formatted as a string followed by the character o-umlaut.
    """
    id_tool = self.portal.portal_ids
    new_id = id_tool.generateNewId(id_group='slapos_core_test')
    return "%sö" % new_id
def generateNewSoftwareReleaseUrl(self):
    """Return a software release URL embedding a freshly generated id."""
    unique_part = self.generateNewId()
    return 'http://example.org/têst%s.cfg' % unique_part
def generateNewSoftwareType(self):
    """Return a software type label embedding a freshly generated id."""
    unique_part = self.generateNewId()
    return 'Type ë@î %s' % unique_part
def generateNewSoftwareTitle(self):
    """Return a software title embedding a freshly generated id."""
    unique_part = self.generateNewId()
    return 'Title é#ï %s' % unique_part
def generateSafeXml(self):
    """Return an instance XML document holding one generated parameter.

    The document has a single ``parameter`` element whose ``id`` is the
    fixed non-ASCII name and whose content is a freshly generated id.

    Both values go through a UTF-8 decode/encode round trip before being
    interpolated.
    NOTE(review): this relies on Python 2 byte strings (``str.decode``);
    presumably the round trip is there to fail early on invalid UTF-8.
    """
    parameter_id = "paramé".decode("UTF-8").encode("UTF-8")
    parameter_value = self.generateNewId().decode("UTF-8").encode("UTF-8")
    # The template needs one placeholder per interpolated value (id and
    # content); a single placeholder fed with both raises TypeError.
    xml_template = (
        '<?xml version="1.0" encoding="utf-8"?>'
        '<instance><parameter id="%s">%s</parameter></instance>'
    )
    return xml_template % (parameter_id, parameter_value)
def generateEmptyXml(self):
    """Return an empty string, i.e. XML content with nothing in it."""
    empty_xml = ''
    return empty_xml
class TestSlapOSDummy(testSlapOSMixin):
    """Test case with a single always-passing test.

    It exists so that a run of this module has at least one test to execute.
    """

    run_all_test = 1

    def getTitle(self):
        """Return the title of this test case."""
        return "Dummy tests in order to have tests from BT5 run"

    def test(self):
        """Dummy test in order to fire up Business Template testing"""
        self.assertTrue(True)
def test_suite():
    """Return a ``unittest.TestSuite`` with every test of TestSlapOSDummy."""
    suite = unittest.TestSuite()
    # loadTestsFromTestCase collects the same "test*" methods that
    # unittest.makeSuite did; makeSuite is deprecated and no longer exists
    # in Python 3.13, while this loader call is available on Python 2 too.
    suite.addTest(
        unittest.defaultTestLoader.loadTestsFromTestCase(TestSlapOSDummy))
    return suite
slapos.core-3ffed138165cd7a37033c0fabf812e70c8d9d588-master-product/master/product/SlapOS/tool.png 0000664 0000000 0000000 00000000437 13221246610 0031603 0 ustar 00root root 0000000 0000000 PNG
IHDR R 0PLTEVfHT^nM^i}x9CrCN~1