utils.py 5.38 KB
Newer Older
1
##############################################################################
Andreas Jung's avatar
Andreas Jung committed
2
#
3
# Copyright (c) 2005 Zope Corporation and Contributors. All Rights Reserved.
Andreas Jung's avatar
Andreas Jung committed
4
#
5 6 7 8 9 10
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
Andreas Jung's avatar
Andreas Jung committed
11
#
12 13
##############################################################################
"""Utility functions
Andreas Jung's avatar
Andreas Jung committed
14

15 16
These functions are designed to be imported and run at
module level to add functionality to the test environment.
Andreas Jung's avatar
Andreas Jung committed
17

18
$Id$
19
"""
Andreas Jung's avatar
Andreas Jung committed
20

21 22 23 24
import os
import sys
import time
import random
25 26
import transaction

27

Andreas Jung's avatar
Andreas Jung committed
28 29 30 31 32 33 34 35 36 37
def setupCoreSessions(app=None):
    '''Sets up the session_data_manager e.a.'''
    from Acquisition import aq_base
    commit = 0

    if app is None: 
        return appcall(setupCoreSessions)

    if not hasattr(app, 'temp_folder'):
        from Products.TemporaryFolder.TemporaryFolder import MountedTemporaryFolder
38
        tf = MountedTemporaryFolder('temp_folder', 'Temporary Folder')
Andreas Jung's avatar
Andreas Jung committed
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
        app._setObject('temp_folder', tf)
        commit = 1

    if not hasattr(aq_base(app.temp_folder), 'session_data'):
        from Products.Transience.Transience import TransientObjectContainer
        toc = TransientObjectContainer('session_data',
                    'Session Data Container',
                    timeout_mins=3,
                    limit=100)
        app.temp_folder._setObject('session_data', toc)
        commit = 1

    if not hasattr(app, 'browser_id_manager'):
        from Products.Sessions.BrowserIdManager import BrowserIdManager
        bid = BrowserIdManager('browser_id_manager',
                    'Browser Id Manager')
        app._setObject('browser_id_manager', bid)
        commit = 1

    if not hasattr(app, 'session_data_manager'):
        from Products.Sessions.SessionDataManager import SessionDataManager
        sdm = SessionDataManager('session_data_manager',
                    title='Session Data Manager',
                    path='/temp_folder/session_data',
                    requestName='SESSION')
        app._setObject('session_data_manager', sdm)
        commit = 1

67 68
    if commit:
        transaction.commit()
Andreas Jung's avatar
Andreas Jung committed
69 70 71 72 73 74 75 76 77 78 79


def setupZGlobals(app=None):
    '''Sets up the ZGlobals BTree required by ZClasses.'''
    if app is None: 
        return appcall(setupZGlobals)

    root = app._p_jar.root()
    if not root.has_key('ZGlobals'):
        from BTrees.OOBTree import OOBTree
        root['ZGlobals'] = OOBTree()
80
        transaction.commit()
Andreas Jung's avatar
Andreas Jung committed
81 82 83 84 85 86 87 88 89 90 91 92 93 94


def setupSiteErrorLog(app=None):
    '''Sets up the error_log object required by ZPublisher.'''
    if app is None: 
        return appcall(setupSiteErrorLog)

    if not hasattr(app, 'error_log'):
        try:
            from Products.SiteErrorLog.SiteErrorLog import SiteErrorLog
        except ImportError:
            pass
        else:
            app._setObject('error_log', SiteErrorLog())
95
            transaction.commit()
Andreas Jung's avatar
Andreas Jung committed
96 97 98 99 100 101 102 103


def importObjectFromFile(container, filename, quiet=0):
    '''Imports an object from a (.zexp) file into the given container.'''
    from ZopeLite import _print
    start = time.time()
    if not quiet: _print("Importing %s ... " % os.path.basename(filename))
    container._importObjectFromFile(filename, verify=0)
104
    transaction.commit()
Andreas Jung's avatar
Andreas Jung committed
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
    if not quiet: _print('done (%.3fs)\n' % (time.time() - start))


_Z2HOST = None
_Z2PORT = None

def startZServer(number_of_threads=1, log=None):
    '''Starts an HTTP ZServer thread.'''
    global _Z2HOST, _Z2PORT
    if _Z2HOST is None:
        _Z2HOST = '127.0.0.1'
        _Z2PORT = random.choice(range(55000, 55500))
        from ZServer import setNumberOfThreads
        setNumberOfThreads(number_of_threads)
        from threadutils import QuietThread, zserverRunner
        t = QuietThread(target=zserverRunner, args=(_Z2HOST, _Z2PORT, log))
        t.setDaemon(1)
        t.start()
123
        time.sleep(0.1) # Sandor Palfy
Andreas Jung's avatar
Andreas Jung committed
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
    return _Z2HOST, _Z2PORT


def makerequest(app, stdout=sys.stdout):
    '''Wraps the app into a fresh REQUEST.'''
    from ZPublisher.BaseRequest import RequestContainer
    from ZPublisher.Request import Request
    from ZPublisher.Response import Response
    response = Response(stdout=stdout)
    environ = {}
    environ['SERVER_NAME'] = _Z2HOST or 'nohost'
    environ['SERVER_PORT'] = '%d' % (_Z2PORT or 80)
    environ['REQUEST_METHOD'] = 'GET'
    request = Request(sys.stdin, environ, response)
    request._steps = ['noobject'] # Fake a published object
    return app.__of__(RequestContainer(REQUEST=request))


def appcall(function, *args, **kw):
    '''Calls a function passing 'app' as first argument.'''
144 145
    from base import app, close
    app = app()
Andreas Jung's avatar
Andreas Jung committed
146 147 148 149
    args = (app,) + args
    try:
        return function(*args, **kw)
    finally:
150
        close(app)
Andreas Jung's avatar
Andreas Jung committed
151 152


153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
def makelist(arg):
    '''Turns arg into a list. Where arg may be
       list, tuple, or string.
    '''
    if type(arg) == type([]):
        return arg
    if type(arg) == type(()):
        return list(arg)
    if type(arg) == type(''):
       return filter(None, [arg])
    raise ValueError('Argument must be list, tuple, or string')


__all__ = [
    'setupCoreSessions',
    'setupSiteErrorLog',
    'setupZGlobals',
    'startZServer',
    'importObjectFromFile',
    'appcall',
]