Commit 446ecbd9 authored by Bryton Lacquement's avatar Bryton Lacquement 🚪

qa: make runUnitTest able to run functional tests with WSGI

parent 27268639
......@@ -6,7 +6,7 @@ import posixpath
import socket
from tempfile import TemporaryFile
import time
from urllib import quote
from urllib import quote, splitport
from waitress.server import create_server
import ZConfig
......@@ -73,7 +73,7 @@ class TransLogger(object):
self.logger.warn(message)
def app_wrapper(large_file_threshold, use_webdav):
def app_wrapper(large_file_threshold=10<<20, webdav_ports=()):
try:
from Products.DeadlockDebugger.dumper import dump_threads, dump_url
except Exception:
......@@ -115,7 +115,7 @@ def app_wrapper(large_file_threshold, use_webdav):
return [msg]
new_wsgi_input.seek(0)
if use_webdav:
if int(environ['SERVER_PORT']) in webdav_ports:
# Munge the request to ensure that we call manage_FTPGet.
# Set a flag to indicate this request came through the WebDAV source
......@@ -132,10 +132,22 @@ def app_wrapper(large_file_threshold, use_webdav):
return publish_module(environ, start_response)
return app
def runwsgi():
def createServer(application, logger, **kw):
global server
server = create_server(
TransLogger(application, logger=logger),
trusted_proxy='*',
trusted_proxy_headers=('x-forwarded-for',),
clear_untrusted_proxy_headers=True,
**kw
)
if not hasattr(server, 'addr'):
server.addr = server.adj.listen[0][3]
elif not server.addr:
server.addr = server.sockinfo[3]
return server
def runwsgi():
parser = argparse.ArgumentParser()
parser.add_argument('-w', '--webdav', action='store_true')
parser.add_argument('address', help='<ip>:<port>')
......@@ -148,13 +160,13 @@ def runwsgi():
make_wsgi_app({}, zope_conf=args.zope_conf)
server = create_server(
TransLogger(app_wrapper(conf.large_file_threshold, args.webdav),
logger=logging.getLogger("access")),
ip, port = splitport(args.address)
port = int(port)
createServer(
app_wrapper(
large_file_threshold=conf.large_file_threshold,
webdav_ports=[port] if args.webdav else ()),
listen=args.address,
logger=logging.getLogger("access"),
threads=conf.zserver_threads,
trusted_proxy='*',
trusted_proxy_headers=('x-forwarded-for',),
clear_untrusted_proxy_headers=True,
)
server.run()
).run()
# -*- coding: utf-8 -*-
import base64, errno, os, select, socket, sys, time
import base64, errno, logging, os, select, socket, sys, time
from threading import Thread
from UserDict import IterableUserDict
import Lifetime
......@@ -8,8 +8,8 @@ from Testing import ZopeTestCase
from ZODB.POSException import ConflictError
from zLOG import LOG, ERROR
from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY
from Products.ERP5Type.tests.utils import addUserToDeveloperRole
from Products.ERP5Type.tests.utils import createZServer
from Products.ERP5Type.tests.utils import \
addUserToDeveloperRole, createZServer, parseListeningAddress
from Products.CMFActivity.ActivityTool import getCurrentNode
......@@ -141,23 +141,63 @@ class ProcessingNodeTestCase(ZopeTestCase.TestCase):
if utils._Z2HOST is None:
from Products.ERP5Type.tests.runUnitTest import tests_home
log = os.path.join(tests_home, "Z2.log")
_print = lambda hs: verbose and ZopeTestCase._print(
"Running %s server at %s:%s\n" % (
hs.server_protocol, hs.server_name, hs.server_port))
try:
hs = createZServer(log)
except RuntimeError, e:
ZopeTestCase._print(str(e))
message = "Running %s server at %s:%s\n"
if int(os.environ.get('erp5_wsgi', 0)):
from Products.ERP5.bin.zopewsgi import app_wrapper, createServer
sockets = []
server_type = 'HTTP'
zserver = os.environ.get('zserver')
try:
for ip, port in parseListeningAddress(zserver):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.bind((ip, port))
except socket.error as e:
s.close()
if e[0] != errno.EADDRINUSE:
raise
if zserver:
raise RuntimeError(str(e))
else:
if sockets:
webdav_ports = port,
else:
webdav_ports = ()
sockets.append(s)
ZopeTestCase._print(message % (server_type, ip, port))
if webdav_ports:
break
server_type = 'WebDAV'
except RuntimeError as e:
ZopeTestCase._print('Could not start %s server: %s\n'
% (server_type, e))
if sockets:
logger = logging.getLogger("access")
logger.addHandler(logging.FileHandler(log))
logger.propagate = False
hs = createServer(app_wrapper(webdav_ports=webdav_ports),
logger, sockets=sockets)
utils._Z2HOST, utils._Z2PORT = hs.addr
t = Thread(target=hs.run)
t.setDaemon(1)
t.start()
else:
utils._Z2HOST, utils._Z2PORT = hs.server_name, hs.server_port
_print(hs)
_print = lambda hs: verbose and ZopeTestCase._print(
message % (hs.server_protocol, hs.server_name, hs.server_port))
try:
_print(createZServer(log, zserver_type='webdav'))
except RuntimeError, e:
ZopeTestCase._print('Could not start webdav zserver: %s\n' % e)
t = Thread(target=Lifetime.loop)
t.setDaemon(1)
t.start()
hs = createZServer(log)
except RuntimeError as e:
ZopeTestCase._print(str(e))
else:
utils._Z2HOST, utils._Z2PORT = hs.server_name, hs.server_port
_print(hs)
try:
_print(createZServer(log, zserver_type='webdav'))
except RuntimeError as e:
ZopeTestCase._print('Could not start webdav zserver: %s\n' % e)
t = Thread(target=Lifetime.loop)
t.setDaemon(1)
t.start()
from Products.CMFActivity import ActivityTool
# Reset, in case that getServerAddress was already called,
# in which case, the value was ('', '')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment