Commit 90d113c0 authored by Antoine Catton's avatar Antoine Catton

Clean the mocked httplib (removed unused modules

and white spaces)
parents 0edb9a2c 710ef80e
......@@ -26,7 +26,9 @@
</item>
<item>
<key> <string>after_script_id</string> </key>
<value> <string>BusinessConfiguration_displayVifibDownload</string> </value>
<value>
<none/>
</value>
</item>
<item>
<key> <string>categories</string> </key>
......
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_body</string> </key>
<value> <string># Do Nothing!\n
</string> </value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>configuration_save_url=None, **kw</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>BusinessConfiguration_displayVifibDownload</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_body</string> </key>
<value> <string>from Products.ERP5Type.Log import log\n
portal = context.getPortalObject()\n
portal_preferences = portal.portal_preferences\n
system_preference = None\n
clear_cache = 0\n
conversion_check = False\n
\n
log(conversion_server)\n
log(kumo)\n
log(memcached)\n
\n
\n
if conversion_server is not None:\n
conversion_server_address, conversion_server_port = conversion_server.split(":")\n
\n
def getActiveSystemPreference():\n
system_preference = portal_preferences.getActiveSystemPreference()\n
if system_preference is None:\n
system_preference = portal_preferences.newContent(\n
portal_type="System Preference", \n
title="Automatically Created.")\n
system_preference.enable()\n
return system_preference\n
\n
if portal_preferences.getPreferredOoodocServerAddress() != conversion_server_address:\n
system_preference = getActiveSystemPreference()\n
system_preference.setPreferredOoodocServerAddress(conversion_server_address)\n
clear_cache = 1\n
\n
if int(portal_preferences.getPreferredOoodocServerPortNumber("-1")) != int(conversion_server_port):\n
if system_preference is None:\n
system_preference = getActiveSystemPreference()\n
system_preference.setPreferredOoodocServerPortNumber(int(conversion_server_port))\n
clear_cache = 1\n
\n
if memcached is not None:\n
default_memcached_plugin = getattr(portal.portal_memcached, "default_memcached_plugin", None)\n
if default_memcached_plugin.getUrlString() != memcached:\n
default_memcached_plugin.setUrlString(memcached)\n
\n
if kumo is not None:\n
persistent_memcached_plugin = getattr(portal.portal_memcached, "persistent_memcached_plugin", None)\n
if persistent_memcached_plugin is not None:\n
if persistent_memcached_plugin.getUrlString() != kumo:\n
persistent_memcached_plugin.setUrlString(kumo)\n
\n
\n
if clear_cache:\n
portal.portal_caches.clearAllCache()\n
\n
if conversion_server is not None:\n
conversion_check = (portal_preferences.getPreferredOoodocServerAddress() == conversion_server_address) and \\\n
(int(portal_preferences.getPreferredOoodocServerPortNumber()) == int(conversion_server_port))\n
\n
return conversion_check and \\\n
default_memcached_plugin.getUrlString() == memcached and \\\n
persistent_memcached_plugin is not None and \\\n
persistent_memcached_plugin.getUrlString() == kumo\n
</string> </value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>conversion_server=None, memcached=None, kumo=None, **kw</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ERP5Site_assertExternalServiceList</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
12
\ No newline at end of file
13
\ No newline at end of file
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="ZopePageTemplate" module="Products.PageTemplates.ZopePageTemplate"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_text</string> </key>
<value> <unicode encoding="cdata"><![CDATA[
<tal:block\n
xmlns:tal="http://xml.zope.org/namespaces/tal"\n
xmlns:metal="http://xml.zope.org/namespaces/metal"\n
xmlns:i18n="http://xml.zope.org/namespaces/i18n"\n
tal:define="field_id here/getId;\n
form_id python: here.getForm().id;\n
selection_name here/getSelectionName;\n
selection here/getSelection;\n
real_context here/getContext;\n
global portal_url_string here/getPortalUrlString;\n
context_url real_context/absolute_url;\n
md5_string here/getMD5Checksum;\n
hide_rows_on_no_search_criterion here/isHideRowsOnNoSearchCriterion;\n
is_domain_tree_mode here/isDomainTreeMode;\n
is_report_tree_mode here/isReportTreeMode;\n
global is_domain_tree_supported here/isDomainTreeSupported;\n
global is_report_tree_supported here/isReportTreeSupported;\n
global is_gadget_mode request/is_gadget_mode | nothing;\n
show_select_column here/showSelectColumn;\n
show_anchor_column here/showAnchorColumn;\n
show_search_line here/showSearchLine;\n
is_web_mode real_context/isWebMode | nothing;\n
is_dialog_mode request/dialog_mode | nothing;\n
display_style_list here/getDisplayStyleList;\n
list_style here/getListboxDisplayStyle;\n
global_search_column here/getGlobalSearchColumn;\n
global_search_column_script string:Base_doSelect;\n
show_global_search python: global_search_column not in (\'\', None);\n
line_list here/query;\n
listbox_max_lines python: int(here.getMaxLineNumber());\n
total_line python: int(here.total_size);\n
current_page python: int(here.current_page) + 1;\n
current_page_max python: listbox_max_lines * current_page;\n
current_page_start python: (listbox_max_lines * (current_page - 1)) + 1;\n
current_page_stop python: (total_line < current_page_max) and total_line or current_page_max;\n
form_url string:${context_url}/${form_id};\n
need_pagination python: total_line > listbox_max_lines;\n
show_list_style_selection python: len(display_style_list) > 1;\n
show_listbox_tree_mode_selection python: not is_gadget_mode and \n
(is_domain_tree_supported or is_report_tree_supported);\n
show_list_action_link python: here.field.get_value(\'list_action\');\n
page_navigation_template python: request.get(\'page_navigation_template\', here.getPageNavigationTemplate());\n
is_slider_mode python: \'Slider\' in page_navigation_template;\n
is_default_listbox_field python: field_id==\'listbox\';\n
field_prefix python: \'\';">\n
\n
<!-- Define hidden input. -->\n
<input type="hidden" \n
name="list_selection_name" \n
value="default" \n
tal:attributes="value selection_name" />\n
<input type="hidden" \n
name="list_selection_name" \n
value="default"\n
tal:attributes="value selection_name;\n
name string:${field_id}_list_selection_name" />\n
<input tal:condition="md5_string" \n
type="hidden" \n
name="md5_object_uid_list" \n
value="checksum" \n
tal:attributes="value md5_string" />\n
<input tal:condition="form_id" \n
type="hidden" \n
name="form_id" \n
tal:attributes="value form_id" \n
tal:replace="nothing"/>\n
<input tal:condition="field_id" \n
type="hidden" \n
name="field_id" \n
tal:attributes="value field_id" \n
tal:replace="nothing"/>\n
\n
<tal:block tal:condition="is_gadget_mode">\n
<tal:block tal:define="global box_relative_url python: request.get(\'box_relative_url\', \'\');\n
global box python: real_context.restrictedTraverse(box_relative_url); \n
global box_id python: \'%s_content\' %box_relative_url.replace(\'/\', \'_\');\n
global dom_id python: request.get(\'dom_id\',None) or box_id;\n
global field_prefix string:${box_id}_">\n
<input tal:condition="python:form_id" \n
type="hidden" \n
name="gadget_form_id"\n
tal:attributes="value form_id" />\n
</tal:block>\n
</tal:block>\n
\n
<div class="listbox-container">\n
\n
<div class="listbox-tree">\n
\n
<!-- Domain Report Tree mode -->\n
<div class="listbox-domain-tree-container" \n
tal:condition="is_domain_tree_mode">\n
<tal:block tal:define="selected_domain_path here/getSelectedDomainPath">\n
\n
<!-- Select domain node -->\n
<select name="domain_root_url"\n
tal:attributes="onChange string:submitAction(this.form, \'${context_url}/setDomainRoot\')">\n
<tal:block tal:repeat="c here/getDomainRootList">\n
<option value="base_domain"\n
tal:define="path python: c[0]; title python: c[1]"\n
tal:attributes="selected python: path == selected_domain_path; value path"\n
tal:content="title"\n
i18n:translate="" i18n:domain="ui"/>\n
</tal:block>\n
</select>\n
\n
<!-- Domain node contents -->\n
<table cellpadding="0"\n
summary="This table contains the domain tree"\n
class="listbox-table-domain-tree"\n
tal:attributes="class string:${field_id}-table-domain-tree"\n
tal:define="report_tree_list python: here.makeReportTreeList(report_path = selected_domain_path, unfolded_list = selection.getDomainList(), is_report_opened = False, sort_on=((\'int_index\', \'ASC\'),));\n
total_depth python: max([report_tree.depth for report_tree in report_tree_list] + [-1])">\n
<tr tal:repeat="report_tree report_tree_list">\n
<tal:block tal:repeat="i python: range(report_tree.depth)">\n
<td width="12" nowrap="nowrap">&nbsp;</td>\n
</tal:block>\n
<td colspan="1" \n
class="listbox-table-domain-tree-cell"\n
tal:attributes="colspan python: total_depth - report_tree.depth + 1">\n
<button type="submit"\n
name="foldDomain:method"\n
class="tree-open"\n
tal:condition="report_tree/is_open"\n
tal:content="report_tree/obj/getCompactTranslatedTitle"\n
tal:attributes="value string:${report_tree/domain_url}.${report_tree/depth}"/>\n
<button type="submit"\n
name="unfoldDomain:method"\n
class="tree-closed"\n
tal:condition="not: report_tree/is_open"\n
tal:content="report_tree/obj/getCompactTranslatedTitle"\n
tal:attributes="value string:${report_tree/domain_url}.${report_tree/depth}"/>\n
</td>\n
</tr>\n
</table>\n
</tal:block>\n
</div>\n
</div>\n
\n
<div class="listbox-content" \n
tal:attributes="class python: test(not is_domain_tree_mode, \'listbox-content maximal-width\', \'listbox-content listbox-content-fixed-width\')">\n
\n
<div class="listbox-head">\n
\n
<div class="listbox-head-spacer"></div>\n
\n
<div class="listbox-head-content">\n
\n
<!-- Listbox head (in left) -->\n
<div class="listbox-head-title">\n
\n
<!-- List tree mode choice -->\n
<div class="listbox-header-box"\n
tal:condition="python: show_listbox_tree_mode_selection and not is_web_mode">\n
<tal:block metal:use-macro="container/ListBox_asHTMLLibrary/macros/listbox_tree_mode_selection"/>\n
</div>\n
\n
<!-- Listbox title -->\n
<div class="listbox-header-box">\n
<tal:block metal:use-macro="container/ListBox_asHTMLLibrary/macros/listbox_title"/>\n
</div>\n
\n
<!-- Number of rows in ERP5 mode -->\n
<div class="listbox-header-box"\n
tal:condition="python: not is_web_mode">\n
<tal:block metal:use-macro="container/ListBox_asHTMLLibrary/macros/listbox_number_of_records"/>\n
</div>\n
\n
<!-- List style display mode -->\n
<div class="listbox-header-box"\n
tal:condition="python: show_list_style_selection and not is_web_mode">\n
<tal:block metal:use-macro="container/ListBox_asHTMLLibrary/macros/listbox_display_style_selection"/>\n
</div>\n
\n
</div>\n
\n
<!-- Listbox nagivation (in right) -->\n
<div class="listbox-head-navigation">\n
\n
<!--Show search result in web mode-->\n
<div class="listbox-header-box"\n
tal:condition="python: is_web_mode">\n
<tal:block metal:use-macro="container/ListBox_asHTMLLibrary/macros/listbox_number_of_records"/>\n
</div>\n
\n
<!--Page navigation -->\n
<div class="listbox-header-box"\n
tal:condition="python: need_pagination and not is_web_mode">\n
<tal:block metal:use-macro="container/ListBox_asHTMLLibrary/macros/listbox_navigation"/>\n
</div>\n
\n
</div>\n
\n
\n
</div>\n
\n
\n
</div>\n
\n
<div class="listbox-body">\n
<table class="listbox"\n
tal:attributes="class python: \'listbox %s %s-%s\' %(field_id, field_id, list_style)"> \n
<thead>\n
<!--Column title -->\n
<tr class="listbox-label-line"> \n
\n
<!--Report tree-->\n
<th tal:condition="is_report_tree_mode"\n
class="listbox-table-report-tree-selection-cell">\n
<select name="report_root_url"\n
tal:attributes="onChange string:submitAction(this.form, \'${context_url}/setReportRoot\')">\n
<tal:block tal:repeat="c here/getReportRootList">\n
<option value="base_domain"\n
tal:define="path python: c[0]; title python: c[1]"\n
tal:attributes="selected python: path == here.getSelectedReportPath(); value path"\n
tal:content="title" i18n:domain="ui" i18n:translate="">Domain</option>\n
</tal:block>\n
</select>\n
</th>\n
\n
<!-- Anchor cell -->\n
<th class="listbox-table-anchor-cell" tal:condition="show_anchor_column">&nbsp;</th>\n
\n
<!-- Select cell -->\n
<th tal:condition="python: show_select_column"\n
class="listbox-table-select-cell">\n
\n
<input class="listbox-check-all"\n
type="image"\n
name="checkAll:method" value="1"\n
alt="Check All" title="Check All"\n
tal:attributes="name string:${field_id}_checkAll:method;\n
src string:${portal_url_string}/images/checkall.png"\n
i18n:domain="ui" i18n:attributes="title" />\n
&nbsp;\n
<input class="listbox-uncheck-all"\n
type="image" \n
name="uncheckAll:method" value="1"\n
alt="Uncheck All" title="Uncheck All"\n
tal:attributes="src string:${portal_url_string}/images/decheckall.png;\n
name string:${field_id}_uncheckAll:method;"\n
i18n:domain="ui" i18n:attributes="title" /> \n
\n
</th>\n
\n
<!-- Label column row -->\n
<tal:block tal:repeat="value here/getLabelValueList">\n
<tal:block tal:define="sql python: value[0];\n
title python: value[1];\n
sort_order python: value[2]">\n
\n
<th tal:condition="sql" class="listbox-table-header-cell"\n
tal:define="bt_class python: sort_order==\'ascending\' and \'sort-button sort-button-asc\' \n
or sort_order == \'descending\' and \'sort-button sort-button-desc\' \n
or \'sort-button\';\n
bt_title python: sort_order==\'ascending\' and \'Ascending Display\'\n
or sort_order==\'descending\' and \'Descending Display\'\n
or \'Sort\';\n
listbox_field_id string:${field_id}.${sql};"> \n
<!-- Button in normal view -->\n
<button tal:condition="not:is_gadget_mode"\n
type="submit"\n
name="setSelectionQuickSortOrder:method"\n
tal:attributes="title title;\n
value listbox_field_id;\n
class bt_class;"\n
i18n:domain="ui" i18n:attributes="title">\n
<span i18n:translate="" i18n:domain="ui" tal:content="title"/>\n
</button>\n
\n
<!-- Button in gadget mode -->\n
<button tal:condition="is_gadget_mode" \n
tal:define ="params python: {\'setSelectionQuickSortOrder:method\':listbox_field_id};"\n
type="button" \n
tal:attributes="title title; \n
onclick python: real_context.KnowledgePad_generateAjaxCall(context_url+\'/\'+form_id,box,dom_id,params);\n
class bt_class;"\n
i18n:domain="ui" i18n:attributes="title">\n
<span i18n:translate="" i18n:domain="ui" tal:content="title"/>\n
</button>\n
\n
<!-- Icon showing sort order -->\n
<img src="images/transparent-image.gif"\n
tal:attributes="alt bt_title;\n
title bt_title;\n
class bt_class;\n
src string:${portal_url_string}/images/transparent-image.gif"\n
i18n:domain="ui" i18n:attributes="title;alt" />\n
\n
</th>\n
\n
<th class="listbox-table-header-cell"\n
tal:condition="not: sql" \n
tal:content="title" \n
i18n:domain="ui" i18n:translate=""/>\n
\n
</tal:block>\n
</tal:block>\n
</tr>\n
\n
<!--Search column input -->\n
<tr tal:condition="python: show_search_line or is_report_tree_mode"\n
class="listbox-search-line">\n
\n
<!--Report Tree -->\n
<tal:block tal:condition="is_report_tree_mode">\n
<th class="listbox-table-report-tree-selection-cell"\n
colspan="1"\n
tal:attributes="colspan python: show_search_line and 1 or (len(here.getSelectedColumnList()) + show_select_column + show_anchor_column + 1)"\n
tal:define="selection_index here/getSelectionIndex;\n
index python: selection_index is not None and \'&amp;selection_index=%s\' % selection_index or \'\';\n
is_report_opened python: int(not here.getSelection().isReportOpened());\n
requested_selection_name here/getRequestedSelectionName;\n
url here/getUrl;\n
report_depth python: selection.getParams().get(\'report_depth\', request.get(\'report_depth\', 0))">\n
<tal:block tal:repeat="i python: range(0, 6)">&nbsp;\n
<a href="?selection_name=default&amp;selection_index=0&amp;report_depth:int=0"\n
tal:attributes="href string:${url}?selection_name=${requested_selection_name}${index}&amp;report_depth:int=${i};\n
class python: test(i==report_depth, \'selected\', \'\');"\n
tal:content="i"/>\n
</tal:block>&nbsp;-&nbsp;\n
<a \n
href="?selection_name=default&amp;selection_index=0&amp;is_report_opened:int=0"\n
tal:attributes="href string:${url}?selection_name=${requested_selection_name}${index}&amp;is_report_opened:int=${is_report_opened}"\n
tal:content="python: is_report_opened and \'Show\' or \'Hide\'"\n
i18n:domain="ui" i18n:translate="">Show</a>\n
</th>\n
</tal:block>\n
\n
<!--Anchor cell -->\n
<th class="listbox-table-anchor-cell" tal:condition="show_anchor_column">&nbsp;</th>\n
\n
<!--Select cell -->\n
<th tal:condition="show_select_column"\n
class="listbox-table-select-cell">\n
<input class="listbox-select-action" type="image"\n
title="Action" alt="Action" name="Base_doSelect:method"\n
tal:attributes="class string:${field_id}-select-action;\n
src string:${portal_url_string}/images/exec16.png"\n
i18n:domain="ui" i18n:attributes="title" />\n
</th>\n
\n
<!-- Real search columns headers -->\n
<th class="listbox-table-filter-cell"\n
tal:condition="show_search_line"\n
tal:repeat="value here/getSearchValueList">\n
<tal:block tal:define="alias python: value[0];\n
param python: value[1];\n
search_field python: value[2]"\n
tal:condition="alias">\n
<!-- Render search field -->\n
<tal:block tal:condition="python: search_field is not None"\n
tal:replace="structure python: search_field.render(value=param, key=alias)"/>\n
\n
<tal:block tal:condition="python: search_field is None">\n
<input tal:condition="python: not is_gadget_mode" \n
size="5"\n
type="text" \n
tal:attributes="name string:${field_id}_${alias}; \n
value param"\n
onkeypress="submitFormOnEnter(event, this.form, \'Base_doSelect\');"/>\n
<!-- Search for gadget mode -->\n
<input tal:condition="python: is_gadget_mode" \n
tal:define ="params python: {alias:\'this.value\'};"\n
size="8"\n
type="text" \n
tal:attributes=\'value python: selection.getParams().get(alias,"");\n
onkeypress python:"if(event.keyCode==13){" + real_context.KnowledgePad_generateAjaxCall(context_url+"/"+form_id,box,dom_id,params).replace("\\"this.value\\"","this.value")+ "return false;;}"\'/>\n
</tal:block>\n
</tal:block>\n
</th>\n
</tr>\n
</thead>\n
\n
<!-- Stats -->\n
<tfoot tal:condition="python:here.showStat() and not hide_rows_on_no_search_criterion">\n
\n
<tr class="listbox_stat_line"\n
tal:attributes="class string:${field_id}_stat_line listbox-stat-line">\n
<td tal:condition="is_report_tree_mode" >&nbsp;</td>\n
<td class="listbox-table-anchor-cell" tal:condition="show_anchor_column">&nbsp;</td>\n
<td class="listbox-table-select-cell" tal:condition="show_select_column">&nbsp;</td>\n
<tal:block tal:repeat="value here/getStatValueList">\n
<td align="left"\n
class="listbox-table-data-cell"\n
tal:define="original_value python: value[0]; processed_value python: value[1]"\n
tal:content="structure processed_value" />\n
</tal:block>\n
</tr>\n
</tfoot>\n
\n
<tbody>\n
\n
<!-- Render listbox data-->\n
<tal:block tal:condition="line_list"\n
tal:define="checked_uid_set here/getCheckedUidSet">\n
<tr tal:repeat="line line_list" \n
tal:attributes=" \n
class python: line.getRowCSSClassName() or \'%s %s\' %(\'%s-data-line-%s\' %(field_id, repeat[\'line\'].index) ,test(repeat[\'line\'].index % 2, \'DataB\', \'DataA\'));">\n
\n
<tal:block tal:define="render_result line/render">\n
\n
<!--Report tree column -->\n
<td tal:condition="is_report_tree_mode"\n
class="listbox-table-report-tree-selection-cell" \n
tal:define="section_name python: line.getDomainTitle()">\n
<a tal:condition="section_name"\n
tal:define="method_id python: line.isOpen() and \'foldReport\' or \'unfoldReport\'"\n
tal:attributes="href string:${method_id}?report_url=${line/getDomainUrl}&amp;form_id=${form_id}&amp;list_selection_name=${selection_name};\n
class python:test(line.isOpen(), \'tree-open\', \'tree-closed\');\n
style python:\'white-space: nowrap;; margin-left: %spx\' % (line.getDepth() * 15)"\n
tal:content="section_name"/>\n
</td>\n
\n
<!--Anchor cell -->\n
<td tal:condition="show_anchor_column"\n
class="listbox-table-anchor-cell">\n
<!--Use [0][4] ? :(-->\n
<a href="#" tal:attributes="href python:render_result[0][4]">\n
<img src="document_icon.gif" alt="document" \n
tal:attributes="src string:${portal_url_string}/images/line_clickable.png" />\n
</a>\n
</td>\n
\n
<!--Select cell -->\n
<td tal:condition="show_select_column"\n
class="listbox-table-select-cell">\n
<input tal:condition="python: not line.isSummary()"\n
type="radio" id="listbox_cb_1" name="uids:list"\n
tal:attributes="checked python: line.getUid() in checked_uid_set;\n
value line/getUid;\n
id string:${field_id}_cb_${line/getUid}" /> \n
</td>\n
\n
<!-- Data cells -->\n
<tal:block tal:repeat="value render_result">\n
<td class="listbox-table-data-cell"\n
tal:define="html python: value[0];">\n
<input tal:condition="not: repeat/value/index"\n
type="hidden" value="1" name="listbox_uid:list"\n
tal:attributes="value python: line.getUid() or \'\';\n
name string:${field_prefix}${field_id}_uid:list" />\n
<tal:block tal:replace="structure html"/>\n
</td>\n
</tal:block>\n
</tal:block>\n
</tr>\n
</tal:block>\n
\n
<!-- Hide row on no search criterion-->\n
<tr tal:condition="hide_rows_on_no_search_criterion"\n
class="listbox_missing_search_criterion">\n
<td tal:attributes="colspan python: len(here.getSearchValueList()) + 1">\n
<span i18n:translate="" i18n:domain="ui">\n
To display actual content, please fill in one or more search criterion.\n
</span>\n
</td>\n
</tr>\n
\n
<!-- No results. -->\n
<tr tal:condition="python: total_line == 0 and not hide_rows_on_no_search_criterion">\n
<td tal:attributes="colspan python: len(here.getSearchValueList()) + 1"\n
class="listbox-table-no-result-row">\n
<span i18n:translate="" i18n:domain="ui">\n
No result.\n
</span>\n
</td>\n
</tr>\n
</tbody>\n
\n
</table>\n
</div>\n
\n
<div class="listbox-footer">\n
\n
<!-- List tree mode choice -->\n
<div class="listbox-footer-box"\n
tal:condition="python: show_listbox_tree_mode_selection and is_web_mode">\n
<tal:block metal:use-macro="container/ListBox_asHTMLLibrary/macros/listbox_tree_mode_selection"/>\n
</div>\n
\n
<!-- List style display mode -->\n
<div class="listbox-footer-box"\n
tal:condition="python: show_list_style_selection and is_web_mode">\n
<tal:block metal:use-macro="container/ListBox_asHTMLLibrary/macros/listbox_display_style_selection"/>\n
</div>\n
\n
<!-- Full text search -->\n
<div class="listbox-footer-box"\n
tal:condition="show_global_search">\n
<tal:block metal:use-macro="container/ListBox_asHTMLLibrary/macros/listbox_global_search"/>\n
</div>\n
\n
<!--Page navigation in web mode floating in right (slider) or whole width (text) -->\n
<div class="listbox-footer-box"\n
tal:condition="python: need_pagination and is_web_mode"\n
tal:attributes="style python: test(is_slider_mode, \'float:right\', \'width:100%\')">\n
<tal:block metal:use-macro="container/ListBox_asHTMLLibrary/macros/listbox_navigation"/>\n
</div>\n
\n
</div>\n
</div>\n
</div>\n
\n
</tal:block>
]]></unicode> </value>
</item>
<item>
<key> <string>content_type</string> </key>
<value> <string>text/html</string> </value>
</item>
<item>
<key> <string>expand</string> </key>
<value> <int>0</int> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ListBox_asHTML</string> </value>
</item>
<item>
<key> <string>output_encoding</string> </key>
<value> <string>iso-8859-15</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <unicode></unicode> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
298
\ No newline at end of file
299
\ No newline at end of file
......@@ -7623,6 +7623,145 @@ class TestVifibSlapWebService(testVifibMixin):
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self)
def stepStoreCurrentComputerReferenceBufferA(self, sequence, **kw):
sequence['buffer_a_computer_reference'] = sequence['computer_reference']
def stepStoreCurrentComputerReferenceBufferB(self, sequence, **kw):
sequence['buffer_b_computer_reference'] = sequence['computer_reference']
def stepStoreCurrentComputerPartitionReferenceBufferA(self, sequence, **kw):
sequence['buffer_a_computer_partition_reference'] = sequence['computer_partition_reference']
def stepStoreCurrentComputerPartitionReferenceBufferB(self, sequence, **kw):
sequence['buffer_b_computer_partition_reference'] = sequence['computer_partition_reference']
def stepRestoreComputerReferenceFromBufferA(self, sequence, **kw):
sequence['computer_reference'] = sequence['buffer_a_computer_reference']
def stepRestoreComputerReferenceFromBufferB(self, sequence, **kw):
sequence['computer_reference'] = sequence['buffer_b_computer_reference']
def stepRestoreComputerPartitionReferenceFromBufferA(self, sequence, **kw):
sequence['computer_partition_reference'] = sequence['buffer_a_computer_partition_reference']
def stepRestoreComputerPartitionReferenceFromBufferB(self, sequence, **kw):
sequence['computer_partition_reference'] = sequence['buffer_b_computer_partition_reference']
def test_bug_destruction_of_partition_originated_from_another_computer(self):
"""Checks that computer is capable to destroy own Software Instance
If software instance originated on computer comes from another computer it
shall be possible to sucesfully destroy it.
"""
sequence_list = SequenceList()
sequence_string = self.prepare_install_requested_computer_partition_sequence_string + \
"""
StoreCurrentComputerReferenceBufferA
StoreCurrentComputerPartitionReferenceBufferA
""" + \
self.prepare_formated_computer + \
"""
StoreCurrentComputerReferenceBufferB
StoreCurrentComputerPartitionReferenceBufferB
LoginTestVifibAdmin
RequestSoftwareInstallation
Tic
Logout
SlapLoginCurrentComputer
ComputerSoftwareReleaseAvailable
Tic
SlapLogout
RestoreComputerReferenceFromBufferA
RestoreComputerPartitionReferenceFromBufferA
SlapLoginCurrentSoftwareInstance
RequestComputerPartitionNotReadyResponse
Tic
SlapLogout
SlapLoginCurrentSoftwareInstance
RequestComputerPartition
Tic
SlapLogout
LoginDefaultUser
CheckSoftwareInstanceAndRelatedComputerPartition
CheckRequestedSoftwareInstanceAndRelatedComputerPartition
Logout
SlapLoginCurrentSoftwareInstance
CheckRequestedComputerPartitionCleanParameterList
Logout
LoginDefaultUser
SetCurrentSoftwareInstanceRequested
SetSelectedComputerPartition
SelectCurrentlyUsedSalePackingListUid
Logout
RestoreComputerReferenceFromBufferB
RestoreComputerPartitionReferenceFromBufferB
SlapLoginCurrentComputer
SoftwareInstanceBuilding
Tic
SlapLogout
LoginDefaultUser
CheckComputerPartitionInstanceSetupSalePackingListStarted
Logout
SlapLoginCurrentComputer
SoftwareInstanceAvailable
Tic
SlapLogout
LoginDefaultUser
CheckComputerPartitionInstanceSetupSalePackingListStopped
CheckComputerPartitionInstanceHostingSalePackingListConfirmed
Logout
SlapLoginCurrentComputer
SoftwareInstanceStarted
Tic
SlapLogout
LoginDefaultUser
CheckComputerPartitionInstanceHostingSalePackingListStarted
SetCurrentSoftwareInstanceRequester
SetSelectedComputerPartition
SelectCurrentlyUsedSalePackingListUid
Logout
LoginTestVifibCustomer
RequestSoftwareInstanceDestroy
Tic
Logout
LoginDefaultUser
CheckComputerPartitionInstanceCleanupSalePackingListConfirmed
Logout
RestoreComputerReferenceFromBufferA
RestoreComputerPartitionReferenceFromBufferA
SlapLoginCurrentComputer
SoftwareInstanceDestroyed
Tic
SlapLogout
LoginDefaultUser
CheckComputerPartitionInstanceCleanupSalePackingListDelivered
CheckComputerPartitionIsFree
Logout
"""
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self)
raise NotImplementedError
########################################
# Other tests
########################################
......
......@@ -57,4 +57,5 @@ setup(name=name,
'slapproxy = slapos.proxy:main',
]
},
test_suite="slapos.tests",
)
......@@ -27,13 +27,13 @@
##############################################################################
from optparse import OptionParser, Option
from xml_marshaller import xml_marshaller
from pwd import getpwnam
import ConfigParser
import grp
import logging
import netaddr
import netifaces
import os
import pwd
import random
import slapos.slap as slap
import socket
......@@ -317,7 +317,7 @@ class Computer:
slapsoft.path = self.software_root
if alter_user:
slapsoft.create()
slapsoft_pw = getpwnam(slapsoft.name)
slapsoft_pw = pwd.getpwnam(slapsoft.name)
os.chown(self.software_root, slapsoft_pw.pw_uid, slapsoft_pw.pw_gid)
os.chmod(self.software_root, 0755)
......@@ -415,7 +415,7 @@ class Partition:
if not os.path.exists(self.path):
os.mkdir(self.path, 0750)
if alter_user:
owner_pw = getpwnam(owner.name)
owner_pw = pwd.getpwnam(owner.name)
os.chown(self.path, owner_pw.pw_uid, owner_pw.pw_gid)
os.chmod(self.path, 0750)
......@@ -457,7 +457,7 @@ class User:
user_parameter_list.extend(['-G', ','.join(self.additional_group_list)])
user_parameter_list.append(self.name)
try:
getpwnam(self.name)
pwd.getpwnam(self.name)
except KeyError:
callAndRead(['useradd'] + user_parameter_list)
else:
......@@ -475,7 +475,7 @@ class User:
"""
try:
getpwnam(self.name)
pwd.getpwnam(self.name)
return True
except KeyError:
......@@ -572,7 +572,7 @@ class Tap:
owner_id = int(open(check_file).read().strip())
except Exception:
pass
if (owner_id is None) or (owner_id != getpwnam(owner.name).pw_uid):
if (owner_id is None) or (owner_id != pwd.getpwnam(owner.name).pw_uid):
callAndRead(['tunctl', '-t', self.name, '-u', owner.name])
callAndRead(['ip', 'link', 'set', self.name, 'up'])
......@@ -584,7 +584,7 @@ class Tap:
class Bridge:
"Bridge represent a bridge on the system"
def __init__(self, name, ipv4_local_network, ipv6_interface):
def __init__(self, name, ipv4_local_network, ipv6_interface=None):
"""
Attributes:
name: String, the name of the bridge
......@@ -846,10 +846,13 @@ class Parser(OptionParser):
help="Shall slapformat alter network configuration [default: True]"),
])
def check_args(self):
def check_args(self, args):
"""
Check arguments
"""
if args:
(options, args) = self.parse_args(list(args))
else:
(options, args) = self.parse_args()
if len(args) != 1:
self.error("Incorrect number of arguments")
......@@ -1093,17 +1096,17 @@ class Config:
self.computer_xml = os.path.abspath(self.computer_xml)
def main():
def main(*args):
"Run default configuration."
global os
global callAndRead
global getpwnam
global pwd
real_callAndRead = callAndRead
usage = "usage: %s [options] CONFIGURATION_FILE" % sys.argv[0]
try:
# Parse arguments
options, configuration_file_path = Parser(usage=usage).check_args()
options, configuration_file_path = Parser(usage=usage).check_args(args)
config = Config()
config.setConfig(options, configuration_file_path)
os = OS(config)
......@@ -1125,7 +1128,7 @@ def main():
pw_uid = 12345
pw_gid = 54321
return result
getpwnam = fake_getpwnam
pwd.getpwnam = fake_getpwnam
else:
dry_callAndRead = real_callAndRead
if config.verbose:
......
......@@ -346,7 +346,7 @@ class Slapgrid(object):
computer_partition_list = self.computer.getComputerPartitionList()
except socket.error as error:
self.logger.fatal(error)
sys.exit(1)
raise
return computer_partition_list
def processSoftwareReleaseList(self):
......
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""Mocked httplib
"""
__all__ = []
def log(message):
"""Need to be overridden to get a proper logger
"""
pass
class HTTPConnection(object):
scheme = 'http'
def _callback(self, path, method, body, headers):
"""To get it works properly, you need to override
HTTPConnection._callback. This method received the instance, the path,
method and request body as parameter, and it has to return a tuple with
headers dictionary and body response string.
@param self object instance reference
@param URL the parsed URL
@param method the http method
@param body the request body
@param headers the request headers
@return tuple containing status integer, headers dictionary and body
response"""
return (0, {}, '', )
def __init__(self, host, port=None, strict=None,
timeout=None, source_address=None):
self.host = host
self.port = port
self.strict = strict
self.timeout = timeout
self.source_address = source_address
self.__response = None
def request(self, method, url, body=None, headers=None):
status, headers, body = self._callback(url, method, body, headers)
self.__response = HTTPResponse('HTTP/1.1', 200, 'OK', body, headers)
def getresponse(self):
response = self.__response
self.__response = None
return response
def set_debuglevel(self, level):
pass
def set_tunnel(self, host, port=None, headers=None):
pass
def connect(self):
pass
def close(self):
pass
def putrequest(self, request, selector, skip_host=None,
skip_accept_encoding=None):
pass
def putheader(self, *args):
pass
def endheaders(self):
pass
def send(self, data):
pass
class HTTPSConnection(HTTPConnection):
def __init__(self, host, port=None, key_file=None,
cert_file=None, strict=None, timeout=None,
source_address=None):
super().__init__(self, host, port, strict, timeout,
source_address)
pass
class HTTPResponse(object):
def __init__(self, version, status, reason, content, headers=()):
self.version = version
self.status = status
self.reason = reason
self.__headers = headers
self.__content = content
def read(self, amt=None):
result = None
if amt is None:
result = self.__content
self.__content = ''
else:
end = max(amt, len(self.__content))
result = self.__content[:end]
del self.__content[:end]
return result
def getheader(self, name, default=None):
pass
def getheaders(self):
pass
import logging
import slapos.format
import unittest
import netaddr
# for mocking
import grp
import netifaces
import os
import pwd
import time
USER_LIST = []
GROUP_LIST = []
INTERFACE_DICT = {}
class FakeConfig:
pass
class TestLoggerHandler(logging.Handler):
def __init__(self, *args, **kwargs):
self.bucket = []
logging.Handler.__init__(self, *args, **kwargs)
def emit(self, record):
self.bucket.append(record.msg)
class FakeCallAndRead:
def __init__(self):
self.external_command_list = []
def __call__(self, argument_list, raise_on_error=True):
retval = 0, 'UP'
global INTERFACE_DICT
if 'useradd' in argument_list:
global USER_LIST
USER_LIST.append(argument_list[-1])
elif 'groupadd' in argument_list:
global GROUP_LIST
GROUP_LIST.append(argument_list[-1])
elif argument_list[:3] == ['ip', 'addr', 'add']:
ip, interface = argument_list[3], argument_list[5]
if ':' not in ip:
netmask = netaddr.strategy.ipv4.int_to_str(
netaddr.strategy.ipv4.prefix_to_netmask[int(ip.split('/')[1])])
ip = ip.split('/')[0]
INTERFACE_DICT[interface][2].append({'addr': ip, 'netmask': netmask})
else:
netmask = netaddr.strategy.ipv6.int_to_str(
netaddr.strategy.ipv6.prefix_to_netmask[int(ip.split('/')[1])])
ip = ip.split('/')[0]
INTERFACE_DICT[interface][10].append({'addr': ip, 'netmask': netmask})
# stabilise by mangling ip to just ip string
argument_list[3] = 'ip/%s' % netmask
elif argument_list[:3] == ['ip', 'addr', 'list']:
retval = 0, str(INTERFACE_DICT)
self.external_command_list.append(' '.join(argument_list))
return retval
class LoggableWrapper:
def __init__(self, logger, name):
self.__logger = logger
self.__name = name
def __call__(self, *args, **kwargs):
arg_list = [repr(x) for x in args] + [
'%s=%r' % (x, y) for x, y in kwargs.iteritems()]
self.__logger.debug('%s(%s)' % (self.__name, ', '.join(arg_list)))
class TimeMock:
@classmethod
def sleep(self, seconds):
return
class GrpMock:
@classmethod
def getgrnam(self, name):
global GROUP_LIST
if name in GROUP_LIST:
return True
raise KeyError
class PwdMock:
@classmethod
def getpwnam(self, name):
global USER_LIST
if name in USER_LIST:
class result:
pw_uid = 0
pw_gid = 0
return result
raise KeyError
class NetifacesMock:
@classmethod
def ifaddresses(self, name):
global INTERFACE_DICT
if name in INTERFACE_DICT:
return INTERFACE_DICT[name]
raise ValueError
@classmethod
def interfaces(self):
global INTERFACE_DICT
return INTERFACE_DICT.keys()
class SlapformatMixin(unittest.TestCase):
# keep big diffs
maxDiff = None
def patchNetifaces(self):
self.netifaces = NetifacesMock()
self.saved_netifaces = dict()
for fake in vars(NetifacesMock):
self.saved_netifaces[fake] = getattr(netifaces, fake, None)
setattr(netifaces, fake, getattr(self.netifaces, fake))
def restoreNetifaces(self):
for name, original_value in self.saved_netifaces.items():
setattr(netifaces, name, original_value)
del self.saved_netifaces
def patchPwd(self):
self.saved_pwd = dict()
for fake in vars(PwdMock):
self.saved_pwd[fake] = getattr(pwd, fake, None)
setattr(pwd, fake, getattr(PwdMock, fake))
def restorePwd(self):
for name, original_value in self.saved_pwd.items():
setattr(pwd, name, original_value)
del self.saved_pwd
def patchTime(self):
self.saved_time = dict()
for fake in vars(TimeMock):
self.saved_time[fake] = getattr(time, fake, None)
setattr(time, fake, getattr(TimeMock, fake))
def restoreTime(self):
for name, original_value in self.saved_time.items():
setattr(time, name, original_value)
del self.saved_time
def patchGrp(self):
self.saved_grp = dict()
for fake in vars(GrpMock):
self.saved_grp[fake] = getattr(grp, fake, None)
setattr(grp, fake, getattr(GrpMock, fake))
def restoreGrp(self):
for name, original_value in self.saved_grp.items():
setattr(grp, name, original_value)
del self.saved_grp
def patchOs(self, logger):
self.saved_os = dict()
for fake in ['mkdir', 'chown', 'chmod', 'makedirs']:
self.saved_os[fake] = getattr(os, fake, None)
f = LoggableWrapper(logger, fake)
setattr(os, fake, f)
def restoreOs(self):
for name, original_value in self.saved_os.items():
setattr(os, name, original_value)
del self.saved_os
def setUp(self):
config = FakeConfig()
config.dry_run = True
config.verbose = True
logger = logging.getLogger('testcatch')
logger.setLevel(logging.DEBUG)
self.test_result = TestLoggerHandler()
logger.addHandler(self.test_result)
config.logger = logger
self.partition = slapos.format.Partition('partition', '/part_path',
slapos.format.User('testuser'), [], None)
global USER_LIST
USER_LIST = []
global GROUP_LIST
GROUP_LIST = []
global INTERFACE_DICT
INTERFACE_DICT = {}
self.real_callAndRead = slapos.format.callAndRead
self.fakeCallAndRead = FakeCallAndRead()
slapos.format.callAndRead = self.fakeCallAndRead
self.patchOs(logger)
self.patchGrp()
self.patchTime()
self.patchPwd()
self.patchNetifaces()
def tearDown(self):
self.restoreOs()
self.restoreGrp()
self.restoreTime()
self.restorePwd()
self.restoreNetifaces()
slapos.format.callAndRead = self.real_callAndRead
class TestComputer(SlapformatMixin):
def test_getAddress_empty_computer(self):
computer = slapos.format.Computer('computer')
self.assertEqual(computer.getAddress(), {'netmask': None, 'addr': None})
def test_construct_empty(self):
computer = slapos.format.Computer('computer')
computer.construct()
raise NotImplementedError
def test_construct_empty_prepared(self):
computer = slapos.format.Computer('computer',
bridge=slapos.format.Bridge('bridge', '127.0.0.1/16'))
computer.instance_root = '/instance_root'
computer.software_root = '/software_root'
computer.construct()
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chown('/software_root', 0, 0)",
"chmod('/software_root', 493)"],
self.test_result.bucket)
self.assertEqual([
'ip addr list bridge',
'groupadd slapsoft',
'useradd -d /software_root -g slapsoft -s /bin/false slapsoft'
],
self.fakeCallAndRead.external_command_list)
def test_construct_empty_prepared_no_alter_user(self):
computer = slapos.format.Computer('computer',
bridge=slapos.format.Bridge('bridge', '127.0.0.1/16'))
computer.instance_root = '/instance_root'
computer.software_root = '/software_root'
computer.construct(alter_user=False)
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chmod('/software_root', 493)"],
self.test_result.bucket)
self.assertEqual([
'ip addr list bridge',],
self.fakeCallAndRead.external_command_list)
def test_construct_empty_prepared_no_alter_network(self):
computer = slapos.format.Computer('computer',
bridge=slapos.format.Bridge('bridge', '127.0.0.1/16'))
computer.instance_root = '/instance_root'
computer.software_root = '/software_root'
computer.construct(alter_network=False)
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chown('/software_root', 0, 0)",
"chmod('/software_root', 493)"],
self.test_result.bucket)
self.assertEqual([
'ip addr list bridge',
'groupadd slapsoft',
'useradd -d /software_root -g slapsoft -s /bin/false slapsoft'
],
self.fakeCallAndRead.external_command_list)
def test_construct_empty_prepared_no_alter_network_user(self):
computer = slapos.format.Computer('computer',
bridge=slapos.format.Bridge('bridge', '127.0.0.1/16'))
computer.instance_root = '/instance_root'
computer.software_root = '/software_root'
computer.construct(alter_network=False, alter_user=False)
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chmod('/software_root', 493)"],
self.test_result.bucket)
self.assertEqual([
'ip addr list bridge',
],
self.fakeCallAndRead.external_command_list)
def test_construct_prepared(self):
computer = slapos.format.Computer('computer',
bridge=slapos.format.Bridge('bridge', '127.0.0.1/16'))
computer.instance_root = '/instance_root'
computer.software_root = '/software_root'
partition = slapos.format.Partition('partition', '/part_path',
slapos.format.User('testuser'), [], None)
partition.tap = slapos.format.Tap('tap')
computer.partition_list = [partition]
global INTERFACE_DICT
INTERFACE_DICT['bridge'] = {
2: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1',
'netmask': '255.255.255.0'}],
10: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}]
}
computer.construct()
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chown('/software_root', 0, 0)",
"chmod('/software_root', 493)",
"mkdir('/instance_root/partition', 488)",
"chown('/instance_root/partition', 0, 0)",
"chmod('/instance_root/partition', 488)"
],
self.test_result.bucket)
self.assertEqual([
'ip addr list bridge',
'groupadd slapsoft',
'useradd -d /software_root -g slapsoft -s /bin/false slapsoft',
'groupadd testuser',
'useradd -d /instance_root/partition -g testuser -s /bin/false -G slapsoft testuser',
'tunctl -t tap -u testuser',
'ip link set tap up',
'brctl show',
'brctl addif bridge tap',
'ip addr add ip/255.255.255.255 dev bridge',
'ip addr list bridge',
'ip addr add ip/ffff:ffff:ffff:ffff:: dev bridge',
'ip addr list bridge',
],
self.fakeCallAndRead.external_command_list)
def test_construct_prepared_no_alter_user(self):
computer = slapos.format.Computer('computer',
bridge=slapos.format.Bridge('bridge', '127.0.0.1/16'))
computer.instance_root = '/instance_root'
computer.software_root = '/software_root'
partition = slapos.format.Partition('partition', '/part_path',
slapos.format.User('testuser'), [], None)
partition.tap = slapos.format.Tap('tap')
computer.partition_list = [partition]
global INTERFACE_DICT
INTERFACE_DICT['bridge'] = {
2: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1',
'netmask': '255.255.255.0'}],
10: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}]
}
computer.construct(alter_user=False)
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chmod('/software_root', 493)",
"mkdir('/instance_root/partition', 488)",
"chmod('/instance_root/partition', 488)"
],
self.test_result.bucket)
self.assertEqual([
'ip addr list bridge',
'tunctl -t tap -u root',
'ip link set tap up',
'brctl show',
'brctl addif bridge tap',
'ip addr add ip/255.255.255.255 dev bridge',
'ip addr list bridge',
'ip addr add ip/ffff:ffff:ffff:ffff:: dev bridge',
'ip addr list bridge',
],
self.fakeCallAndRead.external_command_list)
def test_construct_prepared_no_alter_network(self):
computer = slapos.format.Computer('computer',
bridge=slapos.format.Bridge('bridge', '127.0.0.1/16'))
computer.instance_root = '/instance_root'
computer.software_root = '/software_root'
partition = slapos.format.Partition('partition', '/part_path',
slapos.format.User('testuser'), [], None)
partition.tap = slapos.format.Tap('tap')
computer.partition_list = [partition]
global INTERFACE_DICT
INTERFACE_DICT['bridge'] = {
2: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1',
'netmask': '255.255.255.0'}],
10: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}]
}
computer.construct(alter_network=False)
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chown('/software_root', 0, 0)",
"chmod('/software_root', 493)",
"mkdir('/instance_root/partition', 488)",
"chown('/instance_root/partition', 0, 0)",
"chmod('/instance_root/partition', 488)"
],
self.test_result.bucket)
self.assertEqual([
# 'ip addr list bridge',
'groupadd slapsoft',
'useradd -d /software_root -g slapsoft -s /bin/false slapsoft',
'groupadd testuser',
'useradd -d /instance_root/partition -g testuser -s /bin/false -G slapsoft testuser',
# 'ip addr add ip/255.255.255.255 dev bridge',
# 'ip addr list bridge',
# 'ip addr add ip/ffff:ffff:ffff:ffff:: dev bridge',
# 'ip addr list bridge',
],
self.fakeCallAndRead.external_command_list)
def test_construct_prepared_no_alter_network_user(self):
computer = slapos.format.Computer('computer',
bridge=slapos.format.Bridge('bridge', '127.0.0.1/16'))
computer.instance_root = '/instance_root'
computer.software_root = '/software_root'
partition = slapos.format.Partition('partition', '/part_path',
slapos.format.User('testuser'), [], None)
partition.tap = slapos.format.Tap('tap')
computer.partition_list = [partition]
global INTERFACE_DICT
INTERFACE_DICT['bridge'] = {
2: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1',
'netmask': '255.255.255.0'}],
10: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}]
}
computer.construct(alter_network=False, alter_user=False)
self.assertEqual([
"makedirs('/instance_root', 493)",
"makedirs('/software_root', 493)",
"chmod('/software_root', 493)",
"mkdir('/instance_root/partition', 488)",
"chmod('/instance_root/partition', 488)"
],
self.test_result.bucket)
self.assertEqual([
# 'ip addr list bridge',
# 'ip addr add ip/255.255.255.255 dev bridge',
# 'ip addr list bridge',
# 'ip addr add ip/ffff:ffff:ffff:ffff:: dev bridge',
# 'ip addr list bridge',
],
self.fakeCallAndRead.external_command_list)
class TestPartition(SlapformatMixin):
def test_createPath(self):
global USER_LIST
USER_LIST = ['testuser']
self.partition.createPath()
self.assertEqual(
[
"mkdir('/part_path', 488)",
"chown('/part_path', 0, 0)",
"chmod('/part_path', 488)"
],
self.test_result.bucket
)
def test_createPath_no_alter_user(self):
self.partition.createPath(False)
self.assertEqual(
[
"mkdir('/part_path', 488)",
"chmod('/part_path', 488)"
],
self.test_result.bucket
)
class TestUser(SlapformatMixin):
def test_create(self):
user = slapos.format.User('doesnotexistsyet')
user.setPath('/doesnotexistsyet')
user.create()
self.assertEqual([
'groupadd doesnotexistsyet',
'useradd -d /doesnotexistsyet -g doesnotexistsyet -s /bin/false '\
'doesnotexistsyet'
],
self.fakeCallAndRead.external_command_list)
def test_create_additional_groups(self):
user = slapos.format.User('doesnotexistsyet', ['additionalgroup1',
'additionalgroup2'])
user.setPath('/doesnotexistsyet')
user.create()
self.assertEqual([
'groupadd doesnotexistsyet',
'useradd -d /doesnotexistsyet -g doesnotexistsyet -s /bin/false -G '\
'additionalgroup1,additionalgroup2 doesnotexistsyet'
],
self.fakeCallAndRead.external_command_list)
def test_create_group_exists(self):
global GROUP_LIST
GROUP_LIST = ['testuser']
user = slapos.format.User('testuser')
user.setPath('/testuser')
user.create()
self.assertEqual([
'useradd -d /testuser -g testuser -s /bin/false testuser'
],
self.fakeCallAndRead.external_command_list)
def test_create_user_exists_additional_groups(self):
global USER_LIST
USER_LIST = ['testuser']
user = slapos.format.User('testuser', ['additionalgroup1',
'additionalgroup2'])
user.setPath('/testuser')
user.create()
self.assertEqual([
'groupadd testuser',
'usermod -d /testuser -g testuser -s /bin/false -G '\
'additionalgroup1,additionalgroup2 testuser'
],
self.fakeCallAndRead.external_command_list)
def test_create_user_exists(self):
global USER_LIST
USER_LIST = ['testuser']
user = slapos.format.User('testuser')
user.setPath('/testuser')
user.create()
self.assertEqual([
'groupadd testuser',
'usermod -d /testuser -g testuser -s /bin/false testuser'
],
self.fakeCallAndRead.external_command_list)
def test_create_user_group_exists(self):
global USER_LIST
USER_LIST = ['testuser']
global GROUP_LIST
GROUP_LIST = ['testuser']
user = slapos.format.User('testuser')
user.setPath('/testuser')
user.create()
self.assertEqual([
'usermod -d /testuser -g testuser -s /bin/false testuser'
],
self.fakeCallAndRead.external_command_list)
def test_isAvailable(self):
global USER_LIST
USER_LIST = ['testuser']
user = slapos.format.User('testuser')
self.assertTrue(user.isAvailable())
def test_isAvailable_notAvailable(self):
user = slapos.format.User('doesnotexistsyet')
self.assertFalse(user.isAvailable())
if __name__ == '__main__':
unittest.main()
from slapos.grid import slapgrid
import httplib
import logging
import os
import shutil
import signal
import slapos.slap.slap
import socket
import tempfile
import unittest
import urlparse
import xml_marshaller
class BasicMixin:
def assertSortedListEqual(self, list1, list2, msg=None):
self.assertListEqual(sorted(list1), sorted(list2), msg)
def setUp(self):
self._tempdir = tempfile.mkdtemp()
self.software_root = os.path.join(self._tempdir, 'software')
self.instance_root = os.path.join(self._tempdir, 'instance')
if getattr(self, 'master_url', None) is None:
self.master_url = 'http://127.0.0.1:0/'
self.computer_id = 'computer'
self.supervisord_socket = os.path.join(self._tempdir, 'supervisord.sock')
self.supervisord_configuration_path = os.path.join(self._tempdir,
'supervisord')
self.usage_report_periodicity = 1
self.buildout = None
logging.basicConfig(level=logging.DEBUG)
self.grid = slapgrid.Slapgrid(self.software_root, self.instance_root,
self.master_url, self.computer_id, self.supervisord_socket,
self.supervisord_configuration_path, self.usage_report_periodicity,
self.buildout)
def tearDown(self):
shutil.rmtree(self._tempdir, True)
class TestBasicSlapgridCP(BasicMixin, unittest.TestCase):
def test_no_software_root(self):
self.assertRaises(OSError, self.grid.processComputerPartitionList)
def test_no_instance_root(self):
os.mkdir(self.software_root)
self.assertRaises(OSError, self.grid.processComputerPartitionList)
def test_no_master(self):
os.mkdir(self.software_root)
os.mkdir(self.instance_root)
self.assertRaises(socket.error, self.grid.processComputerPartitionList)
class MasterMixin(BasicMixin):
def _patchHttplib(self):
"""Overrides httplib"""
import mock.httplib
self.saved_httplib = dict()
for fake in vars(mock.httplib):
self.saved_httplib[fake] = getattr(httplib, fake, None)
setattr(httplib, fake, getattr(mock.httplib, fake))
def _unpatchHttplib(self):
"""Restores httplib overriding"""
import httplib
for name, original_value in self.saved_httplib.items():
setattr(httplib, name, original_value)
del self.saved_httplib
def setUp(self):
self._patchHttplib()
BasicMixin.setUp(self)
def tearDown(self):
self._unpatchHttplib()
# XXX: Hardcoded pid, as it is not configurable in slapos
svc = os.path.join(self.instance_root, 'var', 'run', 'supervisord.pid')
if os.path.exists(svc):
try:
pid = int(open(svc).read().strip())
except ValueError:
pass
else:
os.kill(pid, signal.SIGTERM)
BasicMixin.tearDown(self)
class TestSlapgridCPWithMaster(MasterMixin, unittest.TestCase):
def test_nothing_to_do(self):
def server_response(self, path, method, body, header):
parsed_url = urlparse.urlparse(path.lstrip('/'))
parsed_qs = urlparse.parse_qs(parsed_url.query)
if parsed_url.path == 'getComputerInformation' and \
'computer_id' in parsed_qs:
slap_computer = slapos.slap.Computer(parsed_qs['computer_id'])
slap_computer._software_release_list = []
slap_computer._computer_partition_list = []
return (200, {}, xml_marshaller.xml_marshaller.dumps(slap_computer))
else:
return (404, {}, '')
httplib.HTTPConnection._callback = server_response
os.mkdir(self.software_root)
os.mkdir(self.instance_root)
self.assertTrue(self.grid.processComputerPartitionList())
self.assertSortedListEqual(os.listdir(self.instance_root), ['etc', 'var'])
self.assertSortedListEqual(os.listdir(self.software_root), [])
def test_one_partition(self):
def server_response(self, path, method, body, header):
parsed_url = urlparse.urlparse('/' + path)
parsed_qs = urlparse.parse_qs(parsed_url.query)
if parsed_url.path == '/getComputerInformation' and \
'computer_id' in parsed_qs:
slap_computer = slapos.slap.Computer(parsed_qs['computer_id'])
slap_computer._software_release_list = []
partition = slapos.slap.ComputerPartition(parsed_qs['computer_id'],
'0')
partition._need_modification = True
sr = slapos.slap.SoftwareRelease()
sr._software_release = 'http://sr/'
partition._software_release_document = sr
partition._requested_state = 'stopped'
slap_computer._computer_partition_list = [partition]
return (200, {}, xml_marshaller.xml_marshaller.dumps(slap_computer))
else:
return (404, {}, '')
httplib.HTTPConnection._callback = server_response
os.mkdir(self.software_root)
os.mkdir(self.instance_root)
partition_path = os.path.join(self.instance_root, '0')
os.mkdir(partition_path, 0750)
software_hash = slapos.grid.utils.getSoftwareUrlHash('http://sr/')
srdir = os.path.join(self.software_root, software_hash)
os.mkdir(srdir)
open(os.path.join(srdir, 'template.cfg'), 'w').write(
"""[buildout]""")
srbindir = os.path.join(srdir, 'bin')
os.mkdir(srbindir)
open(os.path.join(srbindir, 'buildout'), 'w').write("""#!/bin/sh
touch worked""")
os.chmod(os.path.join(srbindir, 'buildout'), 0755)
self.assertTrue(self.grid.processComputerPartitionList())
self.assertSortedListEqual(os.listdir(self.instance_root), ['0', 'etc',
'var'])
self.assertSortedListEqual(os.listdir(self.software_root),
[software_hash])
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment