Commit 98635b88 authored by Arnaud Fontaine's avatar Arnaud Fontaine

erp5.util.testbrowser: Refactor of openNoVisit() and {Image,Submit}Control for...

erp5.util.testbrowser: Refactor of openNoVisit() and {Image,Submit}Control for recent zope.testbrowser (followup of 5a0b3763).
parent 7a708aab
...@@ -36,9 +36,10 @@ import urllib ...@@ -36,9 +36,10 @@ import urllib
import Cookie import Cookie
import re import re
from urlparse import urljoin from zope.testbrowser._compat import urlparse
from z3c.etestbrowser.browser import ExtendedTestBrowser from z3c.etestbrowser.browser import ExtendedTestBrowser
from zope.testbrowser.browser import onlyOne from zope.testbrowser.browser import onlyOne
from contextlib import contextmanager
def measurementMetaClass(prefix): def measurementMetaClass(prefix):
""" """
...@@ -201,7 +202,7 @@ class Browser(ExtendedTestBrowser): ...@@ -201,7 +202,7 @@ class Browser(ExtendedTestBrowser):
# In case url_or_path is an absolute URL, urljoin() will return # In case url_or_path is an absolute URL, urljoin() will return
# it, otherwise it is a relative path and will be concatenated to # it, otherwise it is a relative path and will be concatenated to
# ERP5 base URL # ERP5 base URL
url_or_path = urljoin(self._erp5_base_url, url_or_path) url_or_path = urlparse.urljoin(self._erp5_base_url, url_or_path)
if isinstance(data, dict): if isinstance(data, dict):
data = urllib.urlencode(data) data = urllib.urlencode(data)
...@@ -209,6 +210,100 @@ class Browser(ExtendedTestBrowser): ...@@ -209,6 +210,100 @@ class Browser(ExtendedTestBrowser):
self._logger.debug("Opening: " + url_or_path) self._logger.debug("Opening: " + url_or_path)
super(Browser, self).open(url_or_path, data) super(Browser, self).open(url_or_path, data)
@contextmanager
def _preparedRequest(self, url, no_visit=False):
"""
Monkey patched for openNoVisit()
"""
from zope.testbrowser._compat import urlparse
self.timer.start()
headers = {}
if self.url:
headers['Referer'] = self.url
if self._req_content_type:
headers['Content-Type'] = self._req_content_type
headers['Connection'] = 'close'
headers['Host'] = urlparse.urlparse(url).netloc
headers['User-Agent'] = 'Python-urllib/2.4'
headers.update(self._req_headers)
extra_environ = {}
if self.handleErrors:
extra_environ['paste.throw_errors'] = None
headers['X-zope-handle-errors'] = 'True'
else:
extra_environ['wsgi.handleErrors'] = False
extra_environ['paste.throw_errors'] = True
extra_environ['x-wsgiorg.throw_errors'] = True
headers.pop('X-zope-handle-errors', None)
kwargs = {'headers': sorted(headers.items()),
'extra_environ': extra_environ,
'expect_errors': True}
yield kwargs
if not no_visit:
self._changed()
self.timer.stop()
def _processRequest(self, url, make_request, no_visit=False):
"""
Monkey patched for openNoVisit()
"""
from zope.testbrowser.browser import REDIRECTS
from zope.testbrowser._compat import urlparse
with self._preparedRequest(url, no_visit=no_visit) as reqargs:
if not no_visit:
self._history.add(self._response)
resp = make_request(reqargs)
remaining_redirects = 100 # infinite loops protection
while resp.status_int in REDIRECTS and remaining_redirects:
remaining_redirects -= 1
# BEGIN: Bugfix
location = resp.headers['location']
if '?' in location:
location_without_query_string, query_string = location.split('?')
location = (
location_without_query_string +
'?' + urllib.urlencode(urlparse.parse_qs(query_string,
strict_parsing=True),
doseq=True))
# END: Bugfix
url = urlparse.urljoin(url, location)
with self._preparedRequest(url, no_visit=no_visit) as reqargs:
resp = self.testapp.get(url, **reqargs)
assert remaining_redirects > 0, "redirects chain looks infinite"
if not no_visit:
self._setResponse(resp)
self._checkStatus()
return resp
def _absoluteUrl(self, url):
absolute = url.startswith('http://') or url.startswith('https://')
if absolute:
return str(url)
if self._response is None:
raise BrowserStateError(
"can't fetch relative reference: not viewing any document")
if not isinstance(url, unicode):
url = url.decode('utf-8')
return str(urlparse.urljoin(self._getBaseUrl(), url).encode('utf-8'))
def openNoVisit(self, url_or_path, data=None, site_relative=True): def openNoVisit(self, url_or_path, data=None, site_relative=True):
""" """
Copy/paste from zope.testbrowser.Browser.open() to allow opening an URL Copy/paste from zope.testbrowser.Browser.open() to allow opening an URL
...@@ -220,40 +315,22 @@ class Browser(ExtendedTestBrowser): ...@@ -220,40 +315,22 @@ class Browser(ExtendedTestBrowser):
# In case url_or_path is an absolute URL, urljoin() will return # In case url_or_path is an absolute URL, urljoin() will return
# it, otherwise it is a relative path and will be concatenated to # it, otherwise it is a relative path and will be concatenated to
# ERP5 base URL # ERP5 base URL
url_or_path = urljoin(self._erp5_base_url, url_or_path) url_or_path = urlparse.urljoin(self._erp5_base_url, url_or_path)
import mechanize
if isinstance(data, dict): if isinstance(data, dict):
data = urllib.urlencode(data) data = urllib.urlencode(data)
response = None url = self._absoluteUrl(url_or_path)
url_or_path = str(url_or_path) self._logger.debug("Opening: " + url)
self._logger.debug("Opening: " + url_or_path)
self._start_timer() if data is not None:
try: def make_request(args):
try: return self.testapp.post(url, data, **args)
try: else:
response = self.mech_browser.open_novisit(url_or_path, data) def make_request(args):
except Exception, e: return self.testapp.get(url, **args)
raise
except mechanize.HTTPError, e: return self._processRequest(url, make_request, no_visit=True)
if e.code >= 200 and e.code <= 299:
# 200s aren't really errors
pass
elif self.raiseHttpErrors:
raise
finally:
self._stop_timer()
# if the headers don't have a status, I suppose there can't be an error
if 'Status' in self.headers:
code, msg = self.headers['Status'].split(' ', 1)
code = int(code)
if self.raiseHttpErrors and code >= 400:
raise mechanize.HTTPError(url_or_path, code, msg, self.headers, fp=None)
return response
def randomSleep(self, minimum, maximum): def randomSleep(self, minimum, maximum):
""" """
...@@ -357,31 +434,28 @@ class Browser(ExtendedTestBrowser): ...@@ -357,31 +434,28 @@ class Browser(ExtendedTestBrowser):
elif url and '?' not in url: elif url and '?' not in url:
url += '?' url += '?'
if id is not None: from zope.testbrowser.browser import isMatching, LinkNotFoundError
def predicate(link): qa = 'a' if id is None else 'a#%s' % id
return dict(link.attrs).get('id') == id qarea = 'area' if id is None else 'area#%s' % id
args = {'predicate': predicate} html = self._html
else: links = html.select(qa)
import re links.extend(html.select(qarea))
from zope.testbrowser.browser import RegexType
if isinstance(text, RegexType): matching = []
text_regex = text for elem in links:
elif text is not None: matches = (isMatching(elem.text.encode('utf-8'), text) and
text_regex = re.compile(re.escape(text), re.DOTALL) isMatching(elem.get('href', ''), url))
else:
text_regex = None
if isinstance(url, RegexType): if matches:
url_regex = url matching.append(elem)
elif url is not None:
url_regex = re.compile(re.escape(url), re.DOTALL) if index >= len(matching):
else: raise LinkNotFoundError()
url_regex = None elem = matching[index]
args = {'text_regex': text_regex, 'url_regex': url_regex}
args['nr'] = index baseurl = self._getBaseUrl()
return LinkWithTime(self.mech_browser.find_link(**args), self)
return LinkWithTime(elem, self, baseurl)
def getImportExportLink(self): def getImportExportLink(self):
""" """
...@@ -607,8 +681,8 @@ class Browser(ExtendedTestBrowser): ...@@ -607,8 +681,8 @@ class Browser(ExtendedTestBrowser):
@rtype: int @rtype: int
""" """
self._logger.debug("Checking the number of remaining activities") self._logger.debug("Checking the number of remaining activities")
activity_counter = self.mech_browser.open_novisit( response = self.openNoVisit('portal_activities/countMessage')[1]
self._erp5_base_url + 'portal_activities/countMessage').read() activity_counter = response.body
activity_counter = activity_counter and int(activity_counter) or 0 activity_counter = activity_counter and int(activity_counter) or 0
self._logger.debug("Remaining activities: %d" % activity_counter) self._logger.debug("Remaining activities: %d" % activity_counter)
...@@ -1215,26 +1289,23 @@ class ImageControlWithTime(ImageControl): ...@@ -1215,26 +1289,23 @@ class ImageControlWithTime(ImageControl):
import zope.testbrowser.browser import zope.testbrowser.browser
browser_controlFactory = zope.testbrowser.browser.controlFactory browser_simpleControlFactory = zope.testbrowser.browser.simpleControlFactory
def controlFactory(control, *args, **kwargs): def simpleControlFactory(wtcontrol, form, elemindex, browser):
""" """
Monkey patch controlFactory in zope.testbrowser to get elapsed time on Monkey patched to get elapsed time on ImageControl and SubmitControl
ImageControl and SubmitControl
""" """
try: import webtest
t = control.type
except AttributeError: elem = elemindex[wtcontrol.pos]
# This is a subcontrol if isinstance(wtcontrol, webtest.forms.Submit):
pass if wtcontrol.attrs.get('type', 'submit') == 'image':
else: return ImageControlWithTime(wtcontrol, form, elem, browser)
if t in ('submit', 'submitbutton'): else:
return SubmitControlWithTime(control, *args, **kwargs) return SubmitControlWithTime(wtcontrol, form, elem, browser)
elif t == 'image':
return ImageControlWithTime(control, *args, **kwargs) return browser_simpleControlFactory(wtcontrol, form, elemindex, browser)
return browser_controlFactory(control, *args, **kwargs) zope.testbrowser.browser.simpleControlFactory = simpleControlFactory
zope.testbrowser.browser.controlFactory = controlFactory
from zope.testbrowser.browser import Link from zope.testbrowser.browser import Link
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment