Commit 48bffa97 authored by Tres Seaver's avatar Tres Seaver

  - Merge a number of entangled issues from 2.6 / 2.7 audit:

    Iteration over sequences could in some cases fail to check access
    to an object obtained from the sequence. Subsequent checks (such
    as for attributes access) of such an object would still be
    performed, but it should not have been possible to obtain the
    object in the first place.

    List and dictionary instance methods such as the get method of
    dictionary objects were not security aware and could return an
    object without checking access to that object. Subsequent checks
    (such as for attributes access) of such an object would still be
    performed, but it should not have been possible to obtain the
    object in the first place.

    Use of "import as" in Python scripts could potentially rebind
    names in ways that could be used to avoid appropriate security
    checks.

    A number of newer built-ins were either unavailable in untrusted
    code or did not perform adequate security checking.

    Unpacking via function calls, variable assignment, exception
    variables and other contexts did not perform adequate security
    checks, potentially allowing access to objects that should have
    been protected.

    Class security was not properly intialized for PythonScripts,
    potentially allowing access to variables that should be protected.
    It turned out that most of the security assertions were in fact
    activated as a side effect of other code, but this fix is still
    appropriate to ensure that all security declarations are properly
    applied.

    DTMLMethods with proxy rights could incorrectly transfer those
    rights via acquisition when traversing to a parent object.
parent dd724d52
......@@ -12,40 +12,16 @@
##############################################################################
'''Add security system support to Document Templates
$Id: DTML.py,v 1.11 2003/11/28 16:43:51 jim Exp $'''
__version__='$Revision: 1.11 $'[11:-2]
$Id: DTML.py,v 1.12 2004/01/15 23:09:03 tseaver Exp $'''
__version__='$Revision: 1.12 $'[11:-2]
from DocumentTemplate import DT_Util
import SecurityManagement, string, math, whrandom, random
import DocumentTemplate.sequence
from ZopeGuards import guarded_getattr, guarded_getitem
from ZopeGuards import safe_builtins
class RestrictedDTML:
'''
A mix-in for derivatives of DT_String.String that adds Zope security.
'''
def guarded_getattr(self, *args): # ob, name [, default]
return guarded_getattr(*args)
def guarded_getitem(self, ob, index):
return guarded_getitem(ob, index)
try:
#raise ImportError
import os
if os.environ.get("ZOPE_SECURITY_POLICY", None) == "PYTHON":
raise ImportError # :)
from cAccessControl import RestrictedDTMLMixin
except ImportError:
pass
else:
class RestrictedDTML(RestrictedDTMLMixin, RestrictedDTML):
'''
A mix-in for derivatives of DT_String.String that adds Zope security.
'''
# RestrictedDTML is inserted by AccessControl.Implementation.
# Allow access to unprotected attributes
......@@ -121,3 +97,9 @@ class DTMLSecurityAPI:
for name, v in DTMLSecurityAPI.__dict__.items():
if name[0] != '_':
setattr(DT_Util.TemplateDict, name, v)
for name, v in safe_builtins.items():
v = DT_Util.NotBindable(v)
if name.startswith('__'):
continue
setattr(DT_Util.TemplateDict, name, v)
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""C implementation of the access control machinery."""
try:
from cAccessControl import rolesForPermissionOn, \
PermissionRole, imPermissionRole, _what_not_even_god_should_do, \
RestrictedDTMLMixin, aq_validate, guarded_getattr, \
ZopeSecurityPolicy, setDefaultBehaviors
from cAccessControl import SecurityManager as cSecurityManager
except ImportError:
import sys
# make sure a partial import doesn't pollute sys.modules
del sys.modules[__name__]
from ImplPython import RestrictedDTML, SecurityManager
class RestrictedDTML(RestrictedDTMLMixin, RestrictedDTML):
"""A mix-in for derivatives of DT_String.String that adds Zope security."""
class SecurityManager(cSecurityManager, SecurityManager):
"""A security manager provides methods for checking access and managing
executable context and policies
"""
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Python implementation of the access control machinery."""
import os
import string
from Acquisition import aq_base
from ExtensionClass import Base
from zLOG import LOG, PROBLEM
# This is used when a permission maps explicitly to no permission. We
# try and get this from cAccessControl first to make sure that if both
# security implementations exist, we can switch between them later.
try:
from cAccessControl import _what_not_even_god_should_do
except ImportError:
_what_not_even_god_should_do = []
from AccessControl import SecurityManagement
from AccessControl import Unauthorized
from AccessControl.SimpleObjectPolicies import Containers, _noroles
from AccessControl.ZopeGuards import guarded_getitem
# AccessControl.PermissionRole
# ----------------------------
_ident_chars = string.ascii_letters + string.digits + "_"
name_trans = filter(lambda c, an=_ident_chars: c not in an,
map(chr, range(256)))
name_trans = string.maketrans(''.join(name_trans), '_' * len(name_trans))
_default_roles = ('Manager',)
def rolesForPermissionOn(perm, object, default=_default_roles, n=None):
"""Return the roles that have the given permission on the given object
"""
n = n or '_' + string.translate(perm, name_trans) + "_Permission"
r = None
while 1:
if hasattr(object, n):
roles = getattr(object, n)
if roles is None:
return 'Anonymous',
t = type(roles)
if t is tuple:
# If we get a tuple, then we don't acquire
if r is None:
return roles
return r+list(roles)
if t is str:
# We found roles set to a name. Start over
# with the new permission name. If the permission
# name is '', then treat as private!
if roles:
if roles != n:
n = roles
# If we find a name that is the same as the
# current name, we just ignore it.
roles = None
else:
return _what_not_even_god_should_do
elif roles:
if r is None:
r = list(roles)
else: r = r + list(roles)
object = getattr(object, 'aq_inner', None)
if object is None:
break
object = object.aq_parent
if r is None:
return default
return r
class PermissionRole(Base):
"""Implement permission-based roles.
Under normal circumstances, our __of__ method will be
called with an unwrapped object. The result will then be called
with a wrapped object, if the original object was wrapped.
To deal with this, we have to create an intermediate object.
"""
def __init__(self, name, default=('Manager',)):
self.__name__ = name
self._p = '_' + string.translate(name, name_trans) + "_Permission"
self._d = self.__roles__ = default
def __of__(self, parent):
r = imPermissionRole()
r._p = self._p
r._pa = parent
r._d = self._d
p = getattr(parent, 'aq_inner', None)
if p is not None:
return r.__of__(p)
else:
return r
def rolesForPermissionOn(self, value):
return rolesForPermissionOn(None, value, self._d, self._p)
class imPermissionRole(Base):
"""Implement permission-based roles"""
def __of__(self, value):
return rolesForPermissionOn(None, value, self._d, self._p)
rolesForPermissionOn = __of__
# The following methods are needed in the unlikely case that an unwrapped
# object is accessed:
def __getitem__(self, i):
try:
v = self._v
except:
v = self._v = self.__of__(self._pa)
del self._pa
return v[i]
def __len__(self):
try:
v = self._v
except:
v = self._v = self.__of__(self._pa)
del self._pa
return len(v)
# AccessControl.DTML
# ------------------
class RestrictedDTML:
"""A mix-in for derivatives of DT_String.String that adds Zope security."""
def guarded_getattr(self, *args): # ob, name [, default]
return guarded_getattr(*args)
def guarded_getitem(self, ob, index):
return guarded_getitem(ob, index)
# AccessControl.ZopeSecurityPolicy
# --------------------------------
#
# TODO: implement this in cAccessControl, and have Implementation
# do the indirection.
#
from AccessControl.ZopeSecurityPolicy import getRoles # XXX
class ZopeSecurityPolicy:
def __init__(self, ownerous=1, authenticated=1):
"""Create a Zope security policy.
Two optional keyword arguments may be provided:
ownerous -- Untrusted users can create code
(e.g. Python scripts or templates),
so check that code owners can access resources.
The argument must have a truth value.
The default is true.
authenticated -- Allow access to resources based on the
privaledges of the authenticated user.
The argument must have a truth value.
The default is true.
This (somewhat experimental) option can be set
to false on sites that allow only public
(unauthenticated) access. An anticipated
scenario is a ZEO configuration in which some
clients allow only public access and other
clients allow full management.
"""
self._ownerous = ownerous
self._authenticated = authenticated
def validate(self, accessed, container, name, value, context,
roles=_noroles, getattr=getattr, _noroles=_noroles,
valid_aq_=('aq_parent','aq_inner', 'aq_explicit')):
# Note: accessed is not used.
############################################################
# Provide special rules for the acquisition attributes
if isinstance(name, str):
if name.startswith('aq_') and name not in valid_aq_:
raise Unauthorized(name, value)
############################################################
# If roles weren't passed in, we'll try to get them from the object
if roles is _noroles:
roles = getRoles(container, name, value, _noroles)
############################################################
# We still might not have any roles
if roles is _noroles:
############################################################
# We have an object without roles and we didn't get a list
# of roles passed in. Presumably, the value is some simple
# object like a string or a list. We'll try to get roles
# from its container.
if container is None:
# Either container or a list of roles is required
# for ZopeSecurityPolicy to know whether access is
# allowable.
raise Unauthorized(name, value)
roles = getattr(container, '__roles__', roles)
if roles is _noroles:
# Try to acquire __roles__. If __roles__ can't be
# acquired, the value is unprotected and roles is
# left set to _noroles.
if aq_base(container) is not container:
try:
roles = container.aq_acquire('__roles__')
except AttributeError:
pass
# We need to make sure that we are allowed to
# get unprotected attributes from the container. We are
# allowed for certain simple containers and if the
# container says we can. Simple containers
# may also impose name restrictions.
p = Containers(type(container), None)
if p is None:
p = getattr(container,
'__allow_access_to_unprotected_subobjects__',
None)
if p is not None:
tp = p.__class__
if tp is not int:
if tp is dict:
if isinstance(name, basestring):
p = p.get(name)
else:
p = 1
else:
p = p(name, value)
if not p:
raise Unauthorized(name, value)
if roles is _noroles:
return 1
# We are going to need a security-aware object to pass
# to allowed(). We'll use the container.
value = container
# Short-circuit tests if we can:
try:
if roles is None or 'Anonymous' in roles:
return 1
except TypeError:
# 'roles' isn't a sequence
LOG('Zope Security Policy', PROBLEM, "'%s' passed as roles"
" during validation of '%s' is not a sequence." % (
`roles`, name))
raise
# Check executable security
stack = context.stack
if stack:
eo = stack[-1]
# If the executable had an owner, can it execute?
if self._ownerous:
owner = eo.getOwner()
if (owner is not None) and not owner.allowed(value, roles):
# We don't want someone to acquire if they can't
# get an unacquired!
raise Unauthorized(name, value)
# Proxy roles, which are a lot safer now.
proxy_roles = getattr(eo, '_proxy_roles', None)
if proxy_roles:
# Verify that the owner actually can state the proxy role
# in the context of the accessed item; users in subfolders
# should not be able to use proxy roles to access items
# above their subfolder!
owner = eo.getOwner()
# Sigh; the default userfolder doesn't return users wrapped
if owner and not hasattr(owner, 'aq_parent'):
udb = eo.getOwner(1)[0]
root = container.getPhysicalRoot()
udb = root.unrestrictedTraverse(udb)
owner = owner.__of__(udb)
if owner is not None:
if not owner._check_context(container):
# container is higher up than the owner, deny access
raise Unauthorized(name, value)
for r in proxy_roles:
if r in roles:
return 1
# Proxy roles actually limit access!
raise Unauthorized(name, value)
try:
if self._authenticated and context.user.allowed(value, roles):
return 1
except AttributeError:
pass
raise Unauthorized(name, value)
def checkPermission(self, permission, object, context):
# XXX proxy roles and executable owner are not checked
roles = rolesForPermissionOn(permission, object)
if isinstance(roles, basestring):
roles = [roles]
return context.user.allowed(object, roles)
# AccessControl.SecurityManager
# -----------------------------
# There is no corresponding control in the C implementation of the
# access control machinery (cAccessControl.c); this should probably go
# away in a future version. If you're concerned about the size of
# security stack, you probably have bigger problems.
#
try: max_stack_size = int(os.environ.get('Z_MAX_STACK_SIZE','100'))
except: max_stack_size = 100
def setDefaultBehaviors(ownerous, authenticated):
global _defaultPolicy
_defaultPolicy = ZopeSecurityPolicy(
ownerous=ownerous,
authenticated=authenticated)
setDefaultBehaviors(True, True)
class SecurityManager:
"""A security manager provides methods for checking access and managing
executable context and policies
"""
__allow_access_to_unprotected_subobjects__ = {
'validate': 1, 'checkPermission': 1,
'getUser': 1, 'calledByExecutable': 1
}
def __init__(self, thread_id, context):
self._thread_id = thread_id
self._context = context
self._policy = _defaultPolicy
def validate(self, accessed=None, container=None, name=None, value=None,
roles=_noroles):
"""Validate access.
Arguments:
accessed -- the object that was being accessed
container -- the object the value was found in
name -- The name used to access the value
value -- The value retrieved though the access.
roles -- The roles of the object if already known.
The arguments may be provided as keyword arguments. Some of these
arguments may be ommitted, however, the policy may reject access
in some cases when arguments are ommitted. It is best to provide
all the values possible.
"""
policy = self._policy
if roles is _noroles:
return policy.validate(accessed, container, name, value,
self._context)
else:
return policy.validate(accessed, container, name, value,
self._context, roles)
def DTMLValidate(self, accessed=None, container=None, name=None,
value=None, md=None):
"""Validate access.
* THIS EXISTS FOR DTML COMPATIBILITY *
Arguments:
accessed -- the object that was being accessed
container -- the object the value was found in
name -- The name used to access the value
value -- The value retrieved though the access.
md -- multidict for DTML (ignored)
The arguments may be provided as keyword arguments. Some of these
arguments may be ommitted, however, the policy may reject access
in some cases when arguments are ommitted. It is best to provide
all the values possible.
"""
policy = self._policy
return policy.validate(accessed, container, name, value, self._context)
def checkPermission(self, permission, object):
"""Check whether the security context allows the given permission on
the given object.
Arguments:
permission -- A permission name
object -- The object being accessed according to the permission
"""
policy = self._policy
return policy.checkPermission(permission, object, self._context)
def addContext(self, anExecutableObject,
getattr=getattr):
"""Add an ExecutableObject to the current security
context. Optionally, add a new SecurityPolicy as well.
"""
stack = self._context.stack
if len(stack) > max_stack_size:
raise SystemError, 'Excessive recursion'
stack.append(anExecutableObject)
p = getattr(anExecutableObject, '_customSecurityPolicy', None)
if p is not None:
p = p()
else:
p = _defaultPolicy
self._policy = p
def removeContext(self, anExecutableObject):
"""Remove an ExecutableObject, and optionally, a
SecurityPolicy, from the current security context.
"""
stack = self._context.stack
if not stack:
return
top = stack[-1]
if top is anExecutableObject:
del stack[-1]
else:
indexes = range(len(stack))
indexes.reverse()
for i in indexes:
top = stack[i]
if top is anExecutableObject:
del stack[i:]
break
else:
return
if stack:
top = stack[-1]
p = getattr(top, '_customSecurityPolicy', None)
if p is not None:
p = p()
else:
p = _defaultPolicy
self._policy = p
else:
self._policy = _defaultPolicy
def getUser(self):
"""Get the current authenticated user"""
return self._context.user
def calledByExecutable(self):
"""Return a boolean value indicating if this context was called
by an executable"""
return len(self._context.stack)
# AccessControl.ZopeGuards
# ------------------------
def aq_validate(inst, object, name, v, validate):
return validate(inst, object, name, v)
_marker = object()
def guarded_getattr(inst, name, default=_marker):
"""Retrieves an attribute, checking security in the process.
Raises Unauthorized if the attribute is found but the user is
not allowed to access the attribute.
"""
if name[:1] != '_':
# Try to get the attribute normally so that unusual
# exceptions are caught early.
try: v = getattr(inst, name)
except AttributeError:
if default is not _marker:
return default
raise
assertion = Containers(type(inst))
if isinstance(assertion, dict):
# We got a table that lets us reason about individual
# attrs
assertion = assertion.get(name)
if assertion:
# There's an entry, but it may be a function.
if callable(assertion):
return assertion(inst, name)
# Nope, it's boolean
return v
raise Unauthorized, name
elif assertion:
# So the entry in the outer table is not a dict
# It's allowed to be a vetoing function:
if callable(assertion):
assertion(name, v)
# No veto, so we can return
return v
validate = SecurityManagement.getSecurityManager().validate
# Filter out the objects we can't access.
if hasattr(inst, 'aq_acquire'):
return inst.aq_acquire(name, aq_validate, validate)
# Or just try to get the attribute directly.
if validate(inst, inst, name, v):
return v
raise Unauthorized, name
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Controller that can switch between security machinery implementations.
This module allows configuration of the security implementation after
the initial import of the modules. It is intended to allow runtime
selection of the machinery based on Zope's configuration file.
The helper function defined here switches between the 'C' and 'PYTHON'
security implementations by loading the appropriate implementation
module and wiring the implementation into the other modules of the
AccessControl package that defined the various components before this
module was introduced.
"""
def getImplementationName():
"""Return the name of the implementation currently being used."""
return _implementation_name
def setImplementation(name):
"""Select the policy implementation to use.
'name' must be either 'PYTHON' or 'C'.
XXX: 'C' version is currently broken
"""
import sys
global _implementation_name
#
name = name.upper()
if name == _implementation_name:
return
if name == "C":
raise NotImplementedError( # XXX
"'C' version of ZSP not yet working.")
try:
from AccessControl import ImplC as impl
except ImportError:
name = "PYTHON"
from AccessControl import ImplPython as impl
elif name == "PYTHON":
from AccessControl import ImplPython as impl
else:
raise ValueError("unknown policy implementation: %r" % name)
#
_implementation_name = name
for modname, names in _policy_names.items():
__import__(modname)
mod = sys.modules[modname]
for n in names:
setattr(mod, n, getattr(impl, n))
if hasattr(mod, "initialize"):
mod.initialize(impl)
_implementation_name = None
_policy_names = {
"AccessControl": ("setDefaultBehaviors",
),
"AccessControl.DTML": ("RestrictedDTML",
),
"AccessControl.PermissionRole": ("_what_not_even_god_should_do",
"rolesForPermissionOn",
"PermissionRole",
"imPermissionRole",
),
"AccessControl.SecurityManagement": ("SecurityManager",
),
"AccessControl.SecurityManager": ("SecurityManager",
),
"AccessControl.ZopeGuards": ("aq_validate",
"guarded_getattr",
),
"AccessControl.ZopeSecurityPolicy": ("ZopeSecurityPolicy",
),
}
# start with the default, mostly because we need something for the tests
#setImplementation("C") XXX: C version of ZSP isn't yet working
setImplementation("PYTHON")
......@@ -10,200 +10,11 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
__doc__='''Objects that implement Permission-based roles.
'''Objects that implement Permission-based roles.
$Id: PermissionRole.py,v 1.20 2003/11/28 16:43:53 jim Exp $'''
__version__='$Revision: 1.20 $'[11:-2]
_use_python_impl = 0
import os
if os.environ.get("ZOPE_SECURITY_POLICY", None) == "PYTHON":
_use_python_impl = 1
else:
try:
# C Optimization:
from cAccessControl import rolesForPermissionOn, \
PermissionRole, imPermissionRole, _what_not_even_god_should_do
except ImportError:
# Fall back to Python implementation.
_use_python_impl = 1
if 1 or _use_python_impl:
import sys
from ExtensionClass import Base
import string
name_trans=filter((lambda c, an=string.letters+string.digits+'_':
c not in an
),
map(chr,range(256)))
name_trans=string.maketrans(''.join(name_trans), '_'*len(name_trans))
def rolesForPermissionOn(perm, obj, default=('Manager',), n=None):
"""Return the roles that have the given permission on the given object
"""
n = n or '_'+string.translate(perm, name_trans)+"_Permission"
r = None
while 1:
if hasattr(obj, n):
roles = getattr(obj, n)
if roles is None:
return 'Anonymous',
t = type(roles)
if t is tuple:
# If we get a tuple, then we don't acquire
if r is None:
return roles
return r+list(roles)
if t is str:
# We found roles set to a name. Start over
# with the new permission name. If the permission
# name is '', then treat as private!
if roles:
if roles != n:
n = roles
# If we find a name that is the same as the
# current name, we just ignore it.
roles = None
else:
return _what_not_even_god_should_do
elif roles:
if r is None:
r = list(roles)
else: r = r + list(roles)
obj = getattr(obj, 'aq_inner', None)
if obj is None:
break
obj = obj.aq_parent
if r is None:
return default
return r
class PermissionRole(Base):
"""Implement permission-based roles.
Under normal circumstances, our __of__ method will be
called with an unwrapped object. The result will then be called
with a wrapped object, if the original object was wrapped.
To deal with this, we have to create an intermediate object.
"""
def __init__(self, name, default=('Manager',)):
self.__name__=name
self._p='_'+string.translate(name,name_trans)+"_Permission"
self._d = self.__roles__ = default
def __of__(self, parent, getattr=getattr):
r=imPermissionRole()
r._p=self._p
r._pa=parent
r._d=self._d
p=getattr(parent, 'aq_inner', None)
if p is not None:
return r.__of__(p)
else:
return r
def rolesForPermissionOn(self, value):
return rolesForPermissionOn(None, value, self._d, self._p)
# This is used when a permission maps explicitly to no permission.
_what_not_even_god_should_do=[]
class imPermissionRole(Base):
"""Implement permission-based roles
"""
def __of__(self, value):
return rolesForPermissionOn(None, value, self._d, self._p)
rolesForPermissionOn = __of__
# The following methods are needed in the unlikely case that
# an unwrapped object is accessed:
def __getitem__(self, i):
try:
v=self._v
except:
v=self._v=self.__of__(self._pa)
del self._pa
return v[i]
def __len__(self):
try:
v=self._v
except:
v=self._v=self.__of__(self._pa)
del self._pa
return len(v)
##############################################################################
# Test functions:
$Id: PermissionRole.py,v 1.21 2004/01/15 23:09:03 tseaver Exp $'''
# The following names are inserted by AccessControl.Implementation:
#
def main():
# The "main" program for this module
import sys
sys.path.append('/projects/_/ExtensionClass')
from Acquisition import Implicit
class I(Implicit):
x__roles__=PermissionRole('x')
y__roles__=PermissionRole('y')
z__roles__=PermissionRole('z')
def x(self): pass
def y(self): pass
def z(self): pass
a=I()
a.b=I()
a.b.c=I()
a.q=I()
a.q._x_Permission=('foo',)
a._y_Permission=('bar',)
a._z_Permission=('zee',)
a.b.c._y_Permission=('Manage',)
a.b._z_Permission=['also']
print a.x.__roles__, list(a.x.__roles__)
print a.b.x.__roles__
print a.b.c.x.__roles__
print a.q.x.__roles__
print a.b.q.x.__roles__
print a.b.c.q.x.__roles__
print
print a.y.__roles__, list(a.y.__roles__)
print a.b.y.__roles__
print a.b.c.y.__roles__
print a.q.y.__roles__
print a.b.q.y.__roles__
print a.b.c.q.y.__roles__
print
print a.z.__roles__, list(a.z.__roles__)
print a.b.z.__roles__
print a.b.c.z.__roles__
print a.q.z.__roles__
print a.b.q.z.__roles__
print a.b.c.q.z.__roles__
print
# rolesForPermissionOn, PermissionRole, imPermissionRole,
# _what_not_even_god_should_do
......@@ -10,9 +10,9 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Constant definitions for built-in Zope permissions"""
"""Constant definitions for built-in Zope permissions
__version__='$Revision: 1.7 $'[11:-2]
$Id: Permissions.py,v 1.8 2004/01/15 23:09:03 tseaver Exp $"""
access_contents_information='Access contents information'
......
......@@ -12,7 +12,7 @@
##############################################################################
"""Access control support"""
__version__='$Revision: 1.58 $'[11:-2]
__version__='$Revision: 1.59 $'[11:-2]
from Globals import DTMLFile, MessageDialog, Dictionary
......@@ -210,7 +210,7 @@ class RoleManager(ExtensionClass.Base, PermissionMapping.RoleManager):
if fails:
return MessageDialog(title="Warning!",
message="Some permissions had errors: "
+ ', '.join(fails),
+ escape(', '.join(fails)),
action='manage_access')
return MessageDialog(
title ='Success!',
......
......@@ -38,17 +38,18 @@
"""
__version__='$Revision$'[11:-2]
import sys
import Acquisition
import Acquisition, PermissionRole, sys
from AccessControl.ImplPython import _what_not_even_god_should_do
from zLOG import LOG, WARNING
# Security constants - these are imported into the AccessControl
# namespace and can be referenced as AccessControl.PUBLIC etc.
ACCESS_NONE = PermissionRole._what_not_even_god_should_do
ACCESS_NONE = _what_not_even_god_should_do
ACCESS_PRIVATE = ()
ACCESS_PUBLIC = None
......
......@@ -13,8 +13,8 @@
__doc__='''short description
$Id: SecurityManagement.py,v 1.8 2003/05/14 21:51:23 shane Exp $'''
__version__='$Revision: 1.8 $'[11:-2]
$Id: SecurityManagement.py,v 1.9 2004/01/15 23:09:03 tseaver Exp $'''
__version__='$Revision: 1.9 $'[11:-2]
def getSecurityManager():
"""Get a security manager, for the current thread.
......@@ -33,10 +33,14 @@ def getSecurityManager():
return manager
import SpecialUsers
from SecurityManager import SecurityManager
try: import thread
except: get_ident=lambda: 0
else: get_ident=thread.get_ident
# AccessControl.Implementation inserts SecurityManager.
try:
from thread import get_ident
except ImportError:
def get_ident():
return 0
_managers={}
......
......@@ -10,192 +10,24 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
__doc__='''short description
'''API module to set the security policy
$Id: SecurityManager.py,v 1.14 2003/11/28 16:44:03 jim Exp $'''
__version__='$Revision: 1.14 $'[11:-2]
$Id: SecurityManager.py,v 1.15 2004/01/15 23:09:03 tseaver Exp $'''
import ZopeSecurityPolicy, os
from AccessControl import ImplPython as _ImplPython
from AccessControl.SimpleObjectPolicies import _noroles
_noroles = ZopeSecurityPolicy._noroles
try: max_stack_size=int(os.environ.get('Z_MAX_STACK_SIZE','100'))
except: max_stack_size=100
if os.environ.has_key("ZSP_OWNEROUS_SKIP"): ownerous=0
else: ownerous=1
if os.environ.has_key("ZSP_AUTHENTICATION_SKIP"): authenticated=0
else: authenticated=1
_defaultPolicy=ZopeSecurityPolicy.ZopeSecurityPolicy(ownerous=ownerous,
authenticated=authenticated)
def setSecurityPolicy(aSecurityPolicy):
"""Set the system default security policy.
This method should only be caused by system startup code. It should
never, for example, be called during a web request.
"""
global _defaultPolicy
last=_defaultPolicy
_defaultPolicy=aSecurityPolicy
last = _ImplPython._defaultPolicy
_ImplPython._defaultPolicy = aSecurityPolicy
return last
class SecurityManager:
"""A security manager provides methods for checking access and managing
executable context and policies
"""
__allow_access_to_unprotected_subobjects__ = {
'validate': 1, 'checkPermission': 1,
'getUser': 1, 'calledByExecutable': 1
}
def __init__(self, thread_id, context):
self._thread_id=thread_id
self._context=context
self._policy=_defaultPolicy
def validate(self, accessed=None, container=None, name=None, value=None,
roles=_noroles):
"""Validate access.
Arguments:
accessed -- the object that was being accessed
container -- the object the value was found in
name -- The name used to access the value
value -- The value retrieved though the access.
roles -- The roles of the object if already known.
The arguments may be provided as keyword arguments. Some of these
arguments may be ommitted, however, the policy may reject access
in some cases when arguments are ommitted. It is best to provide
all the values possible.
"""
policy=self._policy
if roles is _noroles:
return policy.validate(accessed, container, name, value,
self._context)
else:
return policy.validate(accessed, container, name, value,
self._context, roles)
def DTMLValidate(self, accessed=None, container=None, name=None,
value=None, md=None):
"""Validate access.
* THIS EXISTS FOR DTML COMPATIBILITY *
Arguments:
accessed -- the object that was being accessed
container -- the object the value was found in
name -- The name used to access the value
value -- The value retrieved though the access.
md -- multidict for DTML (ignored)
The arguments may be provided as keyword arguments. Some of these
arguments may be ommitted, however, the policy may reject access
in some cases when arguments are ommitted. It is best to provide
all the values possible.
"""
policy=self._policy
return policy.validate(accessed, container, name, value,
self._context)
def checkPermission(self, permission, object):
"""Check whether the security context allows the given permission on
the given object.
Arguments:
permission -- A permission name
object -- The object being accessed according to the permission
"""
policy=self._policy
return policy.checkPermission(permission, object,
self._context)
def addContext(self, anExecutableObject,
getattr=getattr):
"""Add an ExecutableObject to the current security
context. Optionally, add a new SecurityPolicy as well.
"""
stack=self._context.stack
if len(stack) > max_stack_size:
raise SystemError, 'Excessive recursion'
stack.append(anExecutableObject)
p=getattr(anExecutableObject, '_customSecurityPolicy', None)
if p is not None:
p=p()
else:
p=_defaultPolicy
self._policy=p
def removeContext(self, anExecutableObject,
getattr=getattr):
"""Remove an ExecutableObject, and optionally, a
SecurityPolicy, from the current security context.
"""
stack=self._context.stack
if not stack: return
top=stack[-1]
if top is anExecutableObject:
del stack[-1]
else:
indexes=range(len(stack))
indexes.reverse()
for i in indexes:
top=stack[i]
if top is anExecutableObject:
del stack[i:]
break
else:
return
if stack:
top=stack[-1]
p=getattr(top, '_customSecurityPolicy', None)
if p is not None:
p=p()
else:
p=_defaultPolicy
self._policy=p
else:
self._policy=_defaultPolicy
def getUser(self):
"""Get the current authenticated user"""
return self._context.user
def calledByExecutable(self):
"""Return a boolean value indicating if this context was called
by an executable"""
return len(self._context.stack)
try:
#raise ImportError # uncomment to disable C optimization
import os
if os.environ.get("ZOPE_SECURITY_POLICY", None) == "PYTHON":
raise ImportError # :)
from cAccessControl import SecurityManager as cSecurityManager
except ImportError:
pass
else:
class SecurityManager(cSecurityManager, SecurityManager):
"""A security manager provides methods for checking access and managing
executable context and policies
"""
# AccessControl.Implementation inserts SecurityManager.
......@@ -10,22 +10,69 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
__doc__='''Collect rules for access to objects that don\'t have roles.
'''Collect rules for access to objects that don\'t have roles.
$Id: SimpleObjectPolicies.py,v 1.12 2002/08/14 21:29:07 mj Exp $'''
__version__='$Revision: 1.12 $'[11:-2]
The rules are expressed as a mapping from type -> assertion
_noroles=[] # this is imported from various places
An assertion can be:
- A dict
- A callable
- Something with a truth value
If the assertion is a callable, then it will be called with
a name being accessed and the name used. Its return value is ignored,
but in may veto an access by raising an exception.
If the assertion is a dictionary, then the keys are attribute names.
The values may be callables or objects with boolean values. If a value
is callable, it will be called with the object we are accessing an
attribute of and the attribute name. It should return an attribute
value. Callables are often used to returned guarded versions of
methods. Otherwise, accesses are allowed if values in this dictionary
are true and disallowed if the values are false or if an item for an
attribute name is not present.
If the assertion is not a dict and is not callable, then access to
unprotected attributes is allowed if the assertion is true, and
disallowed otherwise.
XXX This descrition doesn't actually match what's done in ZopeGuards
or in ZopeSecurityPolicy. :(
$Id: SimpleObjectPolicies.py,v 1.13 2004/01/15 23:09:03 tseaver Exp $'''
_noroles = [] # this is imported in various places
import Record
# Allow access to unprotected attributes
Record.Record.__allow_access_to_unprotected_subobjects__=1
# ContainerAssertions are used by cAccessControl to check access to
# attributes of container types, like dict, list, or string.
# ContainerAssertions maps types to a either a dict, a function, or a
# simple boolean value. When guarded_getattr checks the type of its
# first argument against ContainerAssertions, and invokes checking
# logic depending on what value it finds.
# If the value for a type is:
# - a boolean value:
# - the value determines whether access is allowed
# - a function (or callable):
# - The function is called with the name of the attribute and
# the actual attribute value, then the value is returned.
# The function can raise an exception.
# - a dict:
# - The dict maps attribute names to boolean values or functions.
# The boolean values behave as above, but the functions do not.
# The value returned for attribute access is the result of
# calling the function with the object and the attribute name.
ContainerAssertions={
type(()): 1,
type([]): 1,
type({}): 1,
type(''): 1,
type(u''): 1,
}
......@@ -45,16 +92,15 @@ except:
# What to do?
pass
Containers=ContainerAssertions.get
Containers = ContainerAssertions.get
from types import IntType, DictType, TypeType
def allow_type(Type, allowed=1):
"""Allow a type and all of its methods and attributes to be used from
restricted code. The argument Type must be a type."""
if type(Type) is not TypeType:
if type(Type) is not type:
raise ValueError, "%s is not a type" % `Type`
if hasattr(Type, '__roles__'):
raise ValueError, "%s handles its own security" % `Type`
if not (isinstance(allowed, IntType) or isinstance(allowed, DictType)):
if not (isinstance(allowed, int) or isinstance(allowed, dict)):
raise ValueError, "The 'allowed' argument must be an int or dict."
ContainerAssertions[Type] = allowed
......@@ -11,14 +11,16 @@
#
##############################################################################
__version__='$Revision: 1.16 $'[11:-2]
__version__='$Revision: 1.17 $'[11:-2]
from RestrictedPython.Guards import safe_builtins, _full_read_guard, \
full_write_guard
import sys
import RestrictedPython
from RestrictedPython.Guards import safe_builtins, full_write_guard
from RestrictedPython.Utilities import utility_builtins
from SecurityManagement import getSecurityManager
from SecurityInfo import secureModule
from SimpleObjectPolicies import Containers
from SimpleObjectPolicies import Containers, ContainerAssertions
from zExceptions import Unauthorized
_marker = [] # Create a new marker object.
......@@ -26,47 +28,16 @@ _marker = [] # Create a new marker object.
safe_builtins = safe_builtins.copy()
safe_builtins.update(utility_builtins)
try:
#raise ImportError
import os
if os.environ.get("ZOPE_SECURITY_POLICY", None) == "PYTHON":
raise ImportError # :)
from cAccessControl import aq_validate, guarded_getattr
except ImportError:
def aq_validate(inst, obj, name, v, validate):
return validate(inst, obj, name, v)
# AccessControl.Implementation inserts these names into this module as
# module globals: aq_validate, guarded_getattr
def initialize(impl):
# Called by AccessControl.Implementation.setImplementation()
# whenever the selected implementation changes.
global guarded_getattr
guarded_getattr = impl.guarded_getattr
safe_builtins['getattr'] = guarded_getattr
def guarded_getattr(inst, name, default=_marker):
"""Retrieves an attribute, checking security in the process.
Raises Unauthorized if the attribute is found but the user is
not allowed to access the attribute.
"""
if name[:1] != '_':
# Try to get the attribute normally so that unusual
# exceptions are caught early.
try: v = getattr(inst, name)
except AttributeError:
if default is not _marker:
return default
raise
if Containers(type(inst)):
# Simple type. Short circuit.
return v
validate = getSecurityManager().validate
# Filter out the objects we can't access.
if hasattr(inst, 'aq_acquire'):
return inst.aq_acquire(name, aq_validate, validate)
# Or just try to get the attribute directly.
if validate(inst, inst, name, v):
return v
raise Unauthorized, name
safe_builtins['getattr'] = guarded_getattr
def guarded_hasattr(object, name):
try:
......@@ -96,14 +67,167 @@ def guarded_getitem(object, index):
if Containers(type(object)) and Containers(type(v)):
# Simple type. Short circuit.
return v
if getSecurityManager().validate(object, object, index, v):
if getSecurityManager().validate(object, object, None, v):
return v
raise Unauthorized, 'unauthorized access to element %s' % `i`
if sys.version_info < (2, 2):
# Can't use nested scopes, so we create callable instances
class get_dict_get:
def __init__(self, d, name):
self.d = d
def __call__(self, key, default=None):
try:
return guarded_getitem(self.d, key)
except KeyError:
return default
class get_dict_pop:
def __init__(self, d, name):
self.d = d
def __call__(self, key, default=_marker):
try:
v = guarded_getitem(self.d, key)
except KeyError:
if default is not _marker:
return default
raise
else:
del self.d[key]
return v
# Dict methods not in Python 2.1
get_iter = 0
class get_list_pop:
def __init__(self, lst, name):
self.lst = lst
def __call__(self, index=-1):
# XXX This is not thread safe, but we don't expect
# XXX thread interactions between python scripts <wink>
v = guarded_getitem(self.lst, index)
del self.lst[index]
return v
else:
# Python 2.2 or better: Create functions using nested scope to store state
# This is less expensive then instantiating and calling instances
def get_dict_get(d, name):
def guarded_get(key, default=None):
try:
return guarded_getitem(d, key)
except KeyError:
return default
return guarded_get
def get_dict_pop(d, name):
def guarded_pop(key, default=_marker):
try:
v = guarded_getitem(d, key)
except KeyError:
if default is not _marker:
return default
raise
else:
del d[key]
return v
return guarded_pop
def get_iter(c, name):
iter = getattr(c, name)
def guarded_iter():
return SafeIter(iter(), c)
return guarded_iter
def get_list_pop(lst, name):
def guarded_pop(index=-1):
# XXX This is not thread safe, but we don't expect
# XXX thread interactions between python scripts <wink>
v = guarded_getitem(lst, index)
del lst[index]
return v
return guarded_pop
# See comment in SimpleObjectPolicies for an explanation of what the
# dicts below actually mean.
ContainerAssertions[type({})] = {
'clear':1, 'copy':1, 'fromkeys':1, 'get':get_dict_get, 'has_key':1,
'items':1, 'iteritems':1, 'keys':1,
'iterkeys': get_iter, 'itervalues':get_iter,
'pop':get_dict_pop, 'popitem':1, 'setdefault':1, 'update':1, 'values':1}
full_read_guard = _full_read_guard(guarded_getattr, guarded_getitem)
ContainerAssertions[type([])] = {
'append':1, 'count':1, 'extend':1, 'index':1, 'insert':1,
'pop':get_list_pop, 'remove':1, 'reverse':1, 'sort':1}
# This implementation of a "safe" iterator uses a global guard()
# function to implement the actual guard. This check is defined as a
# global so that it can delay imports of some module to avoid circular
# dependencies while also making it possible to use a faster
# implementation once the imports are done (by avoiding the import
# machinery on subsequent calls). Use of a method on the SafeIter
# class is avoided to ensure the best performance of the resulting
# function.
if sys.version_info < (2, 2):
class SafeIter:
def __init__(self, sequence, container=None):
if container is None:
container = sequence
self.container = container
self.sequence = sequenece
self.next_index = 0
def __getitem__(self, index):
ob = self.sequence[self.next_index]
self.next_index += 1
guard(self.container, ob, self.next_index - 1)
return ob
def _error(index):
raise Unauthorized, 'unauthorized access to element %s' % `index`
else:
class SafeIter(object):
#__slots__ = '_next', 'container'
def __init__(self, ob, container=None):
self._next = iter(ob).next
if container is None:
container = ob
self.container = container
def __iter__(self):
return self
def next(self):
ob = self._next()
guard(self.container, ob)
return ob
def _error(index):
raise Unauthorized, 'unauthorized access to element'
safe_builtins['iter'] = SafeIter
def guard(container, value, index=None):
if Containers(type(container)) and Containers(type(value)):
# Simple type. Short circuit.
return
if getSecurityManager().validate(container, container, index, value):
return
_error(index)
# More replacement built-ins.
def guarded_filter(f, seq, skip_unauthorized=0):
if type(seq) is type(''):
return filter(f, seq)
......@@ -120,6 +244,27 @@ def guarded_filter(f, seq, skip_unauthorized=0):
return result
safe_builtins['filter'] = guarded_filter
def guarded_reduce(f, seq, initial=_marker):
if initial is _marker:
return reduce(f, SafeIter(seq))
else:
return reduce(f, SafeIter(seq), initial)
safe_builtins['reduce'] = guarded_reduce
def guarded_max(item, *items):
if items:
item = [item]
item.extend(items)
return max(SafeIter(item))
safe_builtins['max'] = guarded_max
def guarded_min(item, *items):
if items:
item = [item]
item.extend(items)
return min(SafeIter(item))
safe_builtins['min'] = guarded_min
def guarded_map(f, *seqs):
safe_seqs = []
for seqno in range(len(seqs)):
......@@ -128,7 +273,6 @@ def guarded_map(f, *seqs):
return map(f, *safe_seqs)
safe_builtins['map'] = guarded_map
import sys
def guarded_import(mname, globals={}, locals={}, fromlist=None):
mnameparts = mname.split('.')
firstmname = mnameparts[0]
......@@ -156,6 +300,31 @@ def guarded_import(mname, globals={}, locals={}, fromlist=None):
raise ImportError, 'import of "%s" is unauthorized' % mname
safe_builtins['__import__'] = guarded_import
class GuardedListType:
def __call__(self, *args, **kwargs):
return list(*args, **kwargs)
if sys.version_info >= (2, 4):
def sorted(self, iterable, cmp=None, key=None, reverse=False):
return list.sorted(iterable, cmp=None, key=None, reverse=False)
safe_builtins['list'] = GuardedListType()
class GuardedDictType:
def __call__(self, *args, **kwargs):
return dict(*args, **kwargs)
def fromkeys(self, S, v=None):
return dict.fromkeys(S,v)
safe_builtins['dict'] = GuardedDictType()
def guarded_enumerate(seq):
return enumerate(SafeIter(seq))
safe_builtins['enumerate'] = guarded_enumerate
def guarded_sum(sequence, start=0):
return sum(SafeIter(sequence), start)
safe_builtins['sum'] = guarded_sum
def load_module(module, mname, mnameparts, validate, globals, locals):
modules = sys.modules
while mnameparts:
......@@ -175,3 +344,65 @@ def load_module(module, mname, mnameparts, validate, globals, locals):
return
module = nextmodule
return module
# This version of apply is used by restricted Python, which transforms
# extended call syntax into a call of _apply_(), after tucking the callable
# into the first element of args. For example,
# f(3, eggs=1, spam=False)
# is changed to
# _apply_(f, 3, eggs=1, spam=False)
def guarded_apply(func, *args, **kws):
return builtin_guarded_apply(func, args, kws)
# This version is the safe_builtins apply() replacement, so needs to match the
# signature of __builtin__.apply.
def builtin_guarded_apply(func, args=(), kws={}):
# Check the args elements. args may be an arbitrary iterable, and
# iterating over it may consume it, so we also need to save away
# the arguments in a new list to pass on to the real apply().
i, arglist = 0, []
for elt in args:
guard(args, elt, i)
arglist.append(elt)
i += 1
# Check kws similarly. Checking the keys may not be strictly necessary,
# but better safe than sorry. A new argument dict is created just in
# case kws is a hostile user-defined instance that may do horrid things
# as a side-effect of calling items().
argdict = {}
for k, v in kws.items():
guard(kws, k)
guard(kws, v, k)
argdict[k] = v
return func(*arglist, **argdict)
safe_builtins['apply'] = builtin_guarded_apply
# AccessControl clients generally need to set up a safe globals dict for
# use by restricted code. The get_safe_globals() function returns such
# a dict, containing '__builtins__' mapped to our safe bulitins, and
# bindings for all the special functions inserted into Python code by
# RestrictionMutator transformations. A client may wish to add more
# bindings to this dict. It's generally safe to do so, as
# get_safe_globals returns a (shallow) copy of a canonical safe globals
# dict.
# Exception: For obscure technical reasons, clients have to import
# guarded_getattr from this module (ZopeGuards) and plug it into the
# dict themselves, with key '_getattr_'.
_safe_globals = {'__builtins__': safe_builtins,
'_apply_': guarded_apply,
'_getitem_': guarded_getitem,
'_getiter_': SafeIter,
'_print_': RestrictedPython.PrintCollector,
'_write_': full_write_guard,
# The correct implementation of _getattr_, aka
# guarded_getattr, isn't known until
# AccessControl.Implementation figures that out, then
# stuffs it into *this* module's globals bound to
# 'guarded_getattr'. We can't know what that is at
## '_getattr_': guarded_getattr,
}
get_safe_globals = _safe_globals.copy
......@@ -10,215 +10,45 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
__doc__='''Define Zope\'s default security policy
"""Define Zope's default security policy
$Id: ZopeSecurityPolicy.py,v 1.26 2004/01/15 23:09:03 tseaver Exp $"""
$Id: ZopeSecurityPolicy.py,v 1.25 2003/11/28 16:44:06 jim Exp $'''
__version__='$Revision: 1.25 $'[11:-2]
# AccessControl.Implementation inserts ZopeSecurityPolicy, getRoles
from AccessControl.SimpleObjectPolicies import _noroles
_use_python_impl = 0
import os
if os.environ.get("ZOPE_SECURITY_POLICY", None) == "PYTHON":
_use_python_impl = 1
else:
try:
# C Optimization:
from cAccessControl import ZopeSecurityPolicy
from SimpleObjectPolicies import _noroles
except ImportError:
# Fall back to Python implementation.
_use_python_impl = 1
rolesForPermissionOn = None # XXX: avoid import loop
tuple_or_list = tuple, list
if 1 or _use_python_impl:
def getRoles(container, name, value, default):
from types import StringType, UnicodeType
global rolesForPermissionOn # XXX: avoid import loop
import SimpleObjectPolicies
from AccessControl import Unauthorized
_noroles=SimpleObjectPolicies._noroles
from zLOG import LOG, PROBLEM
from Acquisition import aq_base
if rolesForPermissionOn is None:
from PermissionRole import rolesForPermissionOn
from PermissionRole import _what_not_even_god_should_do, \
rolesForPermissionOn
roles = getattr(value, '__roles__', _noroles)
if roles is _noroles:
if not name or not isinstance(name, basestring):
return default
tuple_or_list = tuple, list
def getRoles(container, name, value, default):
roles = getattr(value, '__roles__', _noroles)
cls = getattr(container, '__class__', None)
if cls is None:
return default
roles = getattr(cls, name+'__roles__', _noroles)
if roles is _noroles:
if not name or not isinstance(name, basestring):
return default
cls = getattr(container, '__class__', None)
if cls is None:
return default
roles = getattr(cls, name+'__roles__', _noroles)
if roles is _noroles:
return default
return default
value = container
if roles is None or isinstance(roles, tuple_or_list):
return roles
rolesForPermissionOn = getattr(roles, 'rolesForPermissionOn', None)
if rolesForPermissionOn is not None:
roles = rolesForPermissionOn(value)
value = container
if roles is None or isinstance(roles, tuple_or_list):
return roles
class ZopeSecurityPolicy:
def __init__(self, ownerous=1, authenticated=1):
"""Create a Zope security policy.
Two optional keyword arguments may be provided:
ownerous -- Untrusted users can create code
(e.g. Python scripts or templates),
so check that code owners can access resources.
The argument must have a truth value.
The default is true.
authenticated -- Allow access to resources based on the
privaledges of the authenticated user.
The argument must have a truth value.
The default is true.
This (somewhat experimental) option can be set
to false on sites that allow only public
(unauthenticated) access. An anticipated
scenario is a ZEO configuration in which some
clients allow only public access and other
clients allow full management.
"""
self._ownerous=ownerous
self._authenticated=authenticated
def validate(self, accessed, container, name, value, context,
roles=_noroles, type=type, IntType=type(0),
DictType=type({}), getattr=getattr, _noroles=_noroles,
StringType=type(''),
Containers=SimpleObjectPolicies.Containers,
valid_aq_=('aq_parent','aq_inner', 'aq_explicit')):
# Note: accessed is not used.
############################################################
# Provide special rules for the acquisition attributes
if type(name) is StringType:
if name.startswith('aq_') and name not in valid_aq_:
raise Unauthorized(name, value)
############################################################
# If roles weren't passed in, we'll try to get them from the object
if roles is _noroles:
roles = getRoles(container, name, value, _noroles)
############################################################
# We still might not have any roles
if roles is _noroles:
############################################################
# We have an object without roles and we didn't get a list
# of roles passed in. Presumably, the value is some simple
# object like a string or a list. We'll try to get roles
# from its container.
if container is None:
# Either container or a list of roles is required
# for ZopeSecurityPolicy to know whether access is
# allowable.
raise Unauthorized(name, value)
roles=getattr(container, '__roles__', _noroles)
if roles is _noroles:
# Try to acquire __roles__. If __roles__ can't be
# acquired, the value is unprotected and roles is
# left set to _noroles.
if aq_base(container) is not container:
try:
roles = container.aq_acquire('__roles__')
except AttributeError:
pass
# We need to make sure that we are allowed to
# get unprotected attributes from the container. We are
# allowed for certain simple containers and if the
# container says we can. Simple containers
# may also impose name restrictions.
p=Containers(type(container), None)
if p is None:
p=getattr(container,
'__allow_access_to_unprotected_subobjects__', None)
if p is not None:
tp=type(p)
if tp is not IntType:
if tp is DictType:
p=p.get(name, None)
else:
p=p(name, value)
if not p:
raise Unauthorized(name, value)
if roles is _noroles: return 1
# We are going to need a security-aware object to pass
# to allowed(). We'll use the container.
value=container
# Short-circuit tests if we can:
try:
if roles is None or 'Anonymous' in roles: return 1
except TypeError:
# 'roles' isn't a sequence
LOG('Zope Security Policy', PROBLEM, "'%s' passed as roles"
" during validation of '%s' is not a sequence." % (
`roles`, name))
raise
# Check executable security
stack=context.stack
if stack:
eo=stack[-1]
# If the executable had an owner, can it execute?
if self._ownerous:
owner=eo.getOwner()
if (owner is not None) and not owner.allowed(value, roles):
# We don't want someone to acquire if they can't
# get an unacquired!
raise Unauthorized(name, value)
# Proxy roles, which are a lot safer now.
proxy_roles=getattr(eo, '_proxy_roles', None)
if proxy_roles:
for r in proxy_roles:
if r in roles: return 1
# Proxy roles actually limit access!
raise Unauthorized(name, value)
try:
if self._authenticated and context.user.allowed(value, roles):
return 1
except AttributeError: pass
raise Unauthorized(name, value)
rolesForPermissionOn = getattr(roles, 'rolesForPermissionOn', None)
if rolesForPermissionOn is not None:
roles = rolesForPermissionOn(value)
def checkPermission(self, permission, object, context):
# XXX proxy roles and executable owner are not checked
roles=rolesForPermissionOn(permission, object)
if type(roles) in (StringType, UnicodeType):
roles=[roles]
return context.user.allowed(object, roles)
return roles
......@@ -13,6 +13,9 @@
from unauthorized import Unauthorized
# This has to happen early so things get initialized properly
from Implementation import setImplementation
from SecurityManagement import getSecurityManager, setSecurityPolicy
from SecurityInfo import ClassSecurityInfo, ModuleSecurityInfo
from SecurityInfo import ACCESS_PRIVATE
......@@ -20,7 +23,7 @@ from SecurityInfo import ACCESS_PUBLIC
from SecurityInfo import ACCESS_NONE
from SecurityInfo import secureModule, allow_module, allow_class
from SimpleObjectPolicies import allow_type
from ZopeGuards import full_read_guard, full_write_guard, safe_builtins
from ZopeGuards import full_write_guard, safe_builtins
ModuleSecurityInfo('AccessControl').declarePublic('getSecurityManager')
......
# The code in this file is executed after being compiled as restricted code,
# and given a globals() dict with our idea of safe builtins, and the
# Zope production implementations of the special restricted-Python functions
# (like _getitem_ and _getiter_, etc).
#
# This isn't trying to provoke security problems, it's just trying to verify
# that Python code continues to work as intended after all the transformations,
# and with all the special wrappers we supply.
def f1():
next = iter(xrange(3)).next
assert next() == 0
assert next() == 1
assert next() == 2
try:
next()
except StopIteration:
pass
else:
assert 0, "expected StopIteration"
f1()
def f2():
assert map(lambda x: x+1, range(3)) == range(1, 4)
f2()
def f3():
assert filter(None, range(10)) == range(1, 10)
f3()
def f4():
assert [i+1 for i in range(3)] == range(*(1, 4))
f4()
def f5():
x = range(5)
def add(a, b):
return a+b
assert sum(x) == reduce(add, x, 0)
f5()
def f6():
class C:
def display(self):
return str(self.value)
c1 = C()
c2 = C()
# XXX Oops -- it's apparently against the rules to create a new
# XXX attribute. Trying to yields
# XXX TypeError: attribute-less object (assign or del)
## c1.value = 12
## assert getattr(c1, 'value') == 12
## assert c1.display() == '12'
assert not hasattr(c2, 'value')
## setattr(c2, 'value', 34)
## assert c2.value == 34
## assert hasattr(c2, 'value')
## del c2.value
assert not hasattr(c2, 'value')
# OK, if we can't set new attributes, at least verify that we can't.
try:
c1.value = 12
except TypeError:
pass
else:
assert 0, "expected direct attribute creation to fail"
try:
setattr(c1, 'value', 12)
except TypeError:
pass
else:
assert 0, "expected indirect attribute creation to fail"
assert getattr(C, "display", None) == getattr(C, "display")
try:
setattr(C, "display", lambda self: "replaced")
except TypeError:
pass
else:
assert 0, "expected setattr() attribute replacement to fail"
try:
delattr(C, "display")
except TypeError:
pass
else:
assert 0, "expected delattr() attribute deletion to fail"
f6()
def f7():
d = apply(dict, [((1, 2), (3, 4))]) # {1: 2, 3: 4}
expected = {'k': [1, 3],
'v': [2, 4],
'i': [(1, 2), (3, 4)]}
for meth, kind in [('iterkeys', 'k'),
('iteritems', 'i'),
('itervalues', 'v'),
('keys', 'k'),
('items', 'i'),
('values', 'v')]:
access = getattr(d, meth)
result = list(access())
result.sort()
assert result == expected[kind], (meth, kind, result, expected[kind])
f7()
def f8():
import math
ceil = getattr(math, 'ceil')
smallest = 1e100
smallest_index = None
largest = -1e100
largest_index = None
all = []
for i, x in enumerate((2.2, 1.1, 3.3, 5.5, 4.4)):
all.append(x)
effective = ceil(x)
if effective < smallest:
assert min(effective, smallest) == effective
smallest = effective
smallest_index = i
if effective > largest:
assert max(effective, largest) == effective
largest = effective
largest_index = i
assert smallest == 2
assert smallest_index == 1
assert largest == 6
assert largest_index == 3
assert min([ceil(x) for x in all]) == smallest
assert max(map(ceil, all)) == largest
f8()
# After all the above, these wrappers were still untouched:
# ['DateTime', '_print_', 'reorder', 'same_type', 'test']
# So do something to touch them.
def f9():
d = DateTime()
print d # this one provoked _print_
# Funky. This probably isn't an intended use of reorder, but I'm
# not sure why it exists.
assert reorder('edcbaxyz', 'abcdef', 'c') == zip('abde', 'abde')
assert test(0, 'a', 0, 'b', 1, 'c', 0, 'd') == 'c'
assert test(0, 'a', 0, 'b', 0, 'c', 0, 'd', 'e') == 'e'
# Unclear that the next one is *intended* to return None (it falls off
# the end of test's implementation without explicitly returning anything).
assert test(0, 'a', 0, 'b', 0, 'c', 0, 'd') == None
assert same_type(3, 2, 1), 'expected same type'
assert not same_type(3, 2, 'a'), 'expected not same type'
f9()
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test Bindings
$Id: testBindings.py,v 1.2 2004/01/15 23:09:06 tseaver Exp $
"""
import unittest
import Zope
import AccessControl.SecurityManagement
from AccessControl import Unauthorized
from Testing.makerequest import makerequest
from Products.PythonScripts.PythonScript import PythonScript
class TransactionalTest( unittest.TestCase ):
def setUp( self ):
if hasattr(Zope, 'startup'):
Zope.startup()
get_transaction().begin()
self.connection = Zope.DB.open()
self.root = self.connection.root()[ 'Application' ]
def tearDown( self ):
get_transaction().abort()
self.connection.close()
class RequestTest( TransactionalTest ):
def setUp(self):
TransactionalTest.setUp(self)
root = self.root = makerequest(self.root)
self.REQUEST = root.REQUEST
self.RESPONSE = root.REQUEST.RESPONSE
class SecurityManager:
def __init__(self, reject=0):
self.calls = []
self.reject = reject
def validate(self, *args):
self.calls.append(('validate', args))
if self.reject:
raise Unauthorized
return 1
def validateValue(self, *args):
self.calls.append(('validateValue', args))
if self.reject:
raise Unauthorized
return 1
def checkPermission(self, *args):
self.calls.append(('checkPermission', args))
return not self.reject
def addContext(self, *args):
self.calls.append(('addContext', args))
return 1
def removeContext(self, *args):
self.calls.append(('removeContext', args))
return 1
class GuardTestCase(RequestTest):
def setSecurityManager(self, manager):
key = AccessControl.SecurityManagement.get_ident()
old = AccessControl.SecurityManagement._managers.get(key)
if manager is None:
del AccessControl.SecurityManagement._managers[key]
else:
AccessControl.SecurityManagement._managers[key] = manager
return old
class TestBindings(GuardTestCase):
def setUp(self):
RequestTest.setUp(self)
self.sm = SecurityManager(reject=1)
self.old = self.setSecurityManager(self.sm)
def tearDown(self):
self.setSecurityManager(self.old)
TransactionalTest.tearDown(self)
def _newPS(self, txt, bind=None):
ps = PythonScript('ps')
#ps.ZBindings_edit(bind or {})
ps.write(txt)
ps._makeFunction()
return ps
def test_fail_container(self):
container_ps = self._newPS('return container')
self.root._setOb('container_ps', container_ps)
container_ps = self.root._getOb('container_ps')
self.assertRaises(Unauthorized, container_ps)
def test_fail_context(self):
context_ps = self._newPS('return context')
self.root._setOb('context_ps', context_ps)
context_ps = self.root._getOb('context_ps')
self.assertRaises(Unauthorized, context_ps)
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestBindings))
return suite
if __name__ == '__main__':
unittest.main()
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test of the implementation selection support."""
import unittest
from AccessControl.Implementation import getImplementationName
from AccessControl.Implementation import setImplementation
class AccessControlImplementationTest(unittest.TestCase):
have_cAccessControl = None
def setUp(self):
if self.have_cAccessControl is None:
try:
import AccessControl.cAccessControl
except ImportError:
v = False
else:
v = True
self.__class__.have_cAccessControl = v
self.original = getImplementationName()
def tearDown(self):
setImplementation(self.original)
def test_setImplemenationC(self):
# XXX: 'C' ZSP is not yet working
self.assertRaises( NotImplementedError, setImplementation, "C")
return
setImplementation("C")
name = getImplementationName()
if self.have_cAccessControl:
self.assertEqual(name, "C")
else:
self.assertEqual(name, "PYTHON")
def test_setImplemenationPython(self):
setImplementation("Python")
self.assertEqual(getImplementationName(), "PYTHON")
def test_suite():
return unittest.makeSuite(AccessControlImplementationTest)
if __name__ == "__main__":
unittest.main(defaultTest="test_suite")
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test Zope Guards
Well, at least begin testing some of the functionality
$Id: testZopeGuards.py,v 1.2 2004/01/15 23:09:06 tseaver Exp $
"""
import os, sys
import unittest
import ZODB
import AccessControl.SecurityManagement
from AccessControl.SimpleObjectPolicies import ContainerAssertions
from AccessControl import Unauthorized
from AccessControl.ZopeGuards \
import guarded_getattr, get_dict_get, get_dict_pop, get_list_pop, \
get_iter, guarded_min, guarded_max, safe_builtins, guarded_enumerate, \
guarded_sum, guarded_apply
try:
__file__
except NameError:
__file__ = os.path.abspath(sys.argv[1])
_FILEPATH = os.path.abspath( __file__ )
_HERE = os.path.dirname( _FILEPATH )
class SecurityManager:
def __init__(self, reject=0):
self.calls = []
self.reject = reject
def validate(self, *args):
self.calls.append(('validate', args))
if self.reject:
raise Unauthorized
return 1
def validateValue(self, *args):
self.calls.append(('validateValue', args))
if self.reject:
raise Unauthorized
return 1
def checkPermission(self, *args):
self.calls.append(('checkPermission', args))
return not self.reject
class GuardTestCase(unittest.TestCase):
def setSecurityManager(self, manager):
key = AccessControl.SecurityManagement.get_ident()
old = AccessControl.SecurityManagement._managers.get(key)
if manager is None:
del AccessControl.SecurityManagement._managers[key]
else:
AccessControl.SecurityManagement._managers[key] = manager
return old
class Method:
def __init__(self, *args):
self.args = args
class TestGuardedGetattr(GuardTestCase):
def setUp(self):
self.__sm = SecurityManager()
self.__old = self.setSecurityManager(self.__sm)
def tearDown(self):
self.setSecurityManager(self.__old)
def test_calls_validate_for_unknown_type(self):
guarded_getattr(self, 'test_calls_validate_for_unknown_type')
self.assert_(self.__sm.calls)
def test_attr_handler_table(self):
d = {}
_dict = type(d)
old = ContainerAssertions.get(_dict)
mytable = {'keys': 1,
'values': Method,
}
ContainerAssertions[_dict] = mytable
try:
guarded_getattr(d, 'keys')
self.assertEqual(len(self.__sm.calls), 0)
values = guarded_getattr(d, 'values')
self.assertEqual(values.__class__, Method)
self.assertEqual(values.args, (d, 'values'))
self.assertRaises(Unauthorized, guarded_getattr, d, 'items')
finally:
ContainerAssertions[_dict] = old
class TestDictGuards(GuardTestCase):
def test_get_simple(self):
get = get_dict_get({'foo': 'bar'}, 'get')
self.assertEqual(get('foo'), 'bar')
def test_get_default(self):
get = get_dict_get({'foo': 'bar'}, 'get')
self.failUnless(get('baz') is None)
self.assertEqual(get('baz', 'splat'), 'splat')
def test_get_validates(self):
sm = SecurityManager()
old = self.setSecurityManager(sm)
get = get_dict_get({'foo':GuardTestCase}, 'get')
try:
get('foo')
finally:
self.setSecurityManager(old)
self.assert_(sm.calls)
def test_pop_simple(self):
pop = get_dict_pop({'foo': 'bar'}, 'pop')
self.assertEqual(pop('foo'), 'bar')
def test_pop_raises(self):
pop = get_dict_pop({'foo': 'bar'}, 'pop')
self.assertRaises(KeyError, pop, 'baz')
def test_pop_default(self):
pop = get_dict_pop({'foo': 'bar'}, 'pop')
self.assertEqual(pop('baz', 'splat'), 'splat')
def test_pop_validates(self):
sm = SecurityManager()
old = self.setSecurityManager(sm)
pop = get_dict_get({'foo':GuardTestCase}, 'pop')
try:
pop('foo')
finally:
self.setSecurityManager(old)
self.assert_(sm.calls)
if sys.version_info >= (2, 2):
def test_iterkeys_simple(self):
d = {'foo':1, 'bar':2, 'baz':3}
iterkeys = get_iter(d, 'iterkeys')
keys = d.keys()
keys.sort()
ikeys = list(iterkeys())
ikeys.sort()
self.assertEqual(keys, ikeys)
def test_iterkeys_empty(self):
iterkeys = get_iter({}, 'iterkeys')
self.assertEqual(list(iterkeys()), [])
def test_iterkeys_validates(self):
sm = SecurityManager()
old = self.setSecurityManager(sm)
iterkeys = get_iter({GuardTestCase: 1}, 'iterkeys')
try:
iterkeys().next()
finally:
self.setSecurityManager(old)
self.assert_(sm.calls)
def test_itervalues_simple(self):
d = {'foo':1, 'bar':2, 'baz':3}
itervalues = get_iter(d, 'itervalues')
values = d.values()
values.sort()
ivalues = list(itervalues())
ivalues.sort()
self.assertEqual(values, ivalues)
def test_itervalues_empty(self):
itervalues = get_iter({}, 'itervalues')
self.assertEqual(list(itervalues()), [])
def test_itervalues_validates(self):
sm = SecurityManager()
old = self.setSecurityManager(sm)
itervalues = get_iter({GuardTestCase: 1}, 'itervalues')
try:
itervalues().next()
finally:
self.setSecurityManager(old)
self.assert_(sm.calls)
class TestListGuards(GuardTestCase):
def test_pop_simple(self):
pop = get_list_pop(['foo', 'bar', 'baz'], 'pop')
self.assertEqual(pop(), 'baz')
self.assertEqual(pop(0), 'foo')
def test_pop_raises(self):
pop = get_list_pop([], 'pop')
self.assertRaises(IndexError, pop)
def test_pop_validates(self):
sm = SecurityManager()
old = self.setSecurityManager(sm)
pop = get_list_pop([GuardTestCase], 'pop')
try:
pop()
finally:
self.setSecurityManager(old)
self.assert_(sm.calls)
class TestBuiltinFunctionGuards(GuardTestCase):
def test_min_fails(self):
sm = SecurityManager(1) # rejects
old = self.setSecurityManager(sm)
self.assertRaises(Unauthorized, guarded_min, [1,2,3])
self.assertRaises(Unauthorized, guarded_min, 1,2,3)
self.setSecurityManager(old)
def test_max_fails(self):
sm = SecurityManager(1) # rejects
old = self.setSecurityManager(sm)
self.assertRaises(Unauthorized, guarded_max, [1,2,3])
self.assertRaises(Unauthorized, guarded_max, 1,2,3)
self.setSecurityManager(old)
def test_enumerate_fails(self):
sm = SecurityManager(1) # rejects
old = self.setSecurityManager(sm)
enum = guarded_enumerate([1,2,3])
self.assertRaises(Unauthorized, enum.next)
self.setSecurityManager(old)
def test_sum_fails(self):
sm = SecurityManager(1) # rejects
old = self.setSecurityManager(sm)
self.assertRaises(Unauthorized, guarded_sum, [1,2,3])
self.setSecurityManager(old)
def test_min_succeeds(self):
sm = SecurityManager() # accepts
old = self.setSecurityManager(sm)
self.assertEqual(guarded_min([1,2,3]), 1)
self.assertEqual(guarded_min(1,2,3), 1)
self.setSecurityManager(old)
def test_max_succeeds(self):
sm = SecurityManager() # accepts
old = self.setSecurityManager(sm)
self.assertEqual(guarded_max([1,2,3]), 3)
self.assertEqual(guarded_max(1,2,3), 3)
self.setSecurityManager(old)
def test_enumerate_succeeds(self):
sm = SecurityManager() # accepts
old = self.setSecurityManager(sm)
enum = guarded_enumerate([1,2,3])
self.assertEqual(enum.next(), (0,1))
self.assertEqual(enum.next(), (1,2))
self.assertEqual(enum.next(), (2,3))
self.assertRaises(StopIteration, enum.next)
self.setSecurityManager(old)
def test_sum_succeeds(self):
sm = SecurityManager() # accepts
old = self.setSecurityManager(sm)
self.assertEqual(guarded_sum([1,2,3]), 6)
self.assertEqual(guarded_sum([1,2,3], start=36), 42)
self.setSecurityManager(old)
def test_apply(self):
sm = SecurityManager(1) # rejects
old = self.setSecurityManager(sm)
gapply = safe_builtins['apply']
def f(a=1, b=2):
return a+b
# This one actually succeeds, because apply isn't given anything
# to unpack.
self.assertEqual(gapply(f), 3)
# Likewise, because the things passed are empty.
self.assertEqual(gapply(f, (), {}), 3)
self.assertRaises(Unauthorized, gapply, f, [1])
self.assertRaises(Unauthorized, gapply, f, (), {'a': 2})
self.assertRaises(Unauthorized, gapply, f, [1], {'a': 2})
sm = SecurityManager(0) # accepts
self.setSecurityManager(sm)
self.assertEqual(gapply(f), 3)
self.assertEqual(gapply(f, (), {}), 3)
self.assertEqual(gapply(f, [0]), 2)
self.assertEqual(gapply(f, [], {'b': 18}), 19)
self.assertEqual(gapply(f, [10], {'b': 1}), 11)
self.setSecurityManager(old)
class TestGuardedDictListTypes(unittest.TestCase):
def testDictCreation(self):
d = safe_builtins['dict']
self.assertEquals(d(), {})
self.assertEquals(d({1:2}), {1:2})
self.assertEquals(d(((1,2),)), {1:2})
self.assertEquals(d(foo=1), {"foo":1})
self.assertEquals(d.fromkeys((1,2,3)), {1:None, 2:None, 3:None})
self.assertEquals(d.fromkeys((1,2,3), 'f'), {1:'f', 2:'f', 3:'f'})
def testListCreation(self):
l = safe_builtins['list']
self.assertEquals(l(), [])
self.assertEquals(l([1,2,3]), [1,2,3])
x = [3,2,1]
self.assertEquals(l(x), [3,2,1])
if sys.version_info >= (2, 4):
self.assertEquals(l.sorted(x), [1,2,3])
class TestRestrictedPythonApply(GuardTestCase):
def test_apply(self):
sm = SecurityManager(1) # rejects
old = self.setSecurityManager(sm)
gapply = guarded_apply
def f(a=1, b=2):
return a+b
# This one actually succeeds, because apply isn't given anything
# to unpack.
self.assertEqual(gapply(*(f,)), 3)
# Likewise, because the things passed are empty.
self.assertEqual(gapply(*(f,), **{}), 3)
self.assertRaises(Unauthorized, gapply, *(f, 1))
self.assertRaises(Unauthorized, gapply, *(f,), **{'a': 2})
self.assertRaises(Unauthorized, gapply, *(f, 1), **{'a': 2})
sm = SecurityManager(0) # accepts
self.setSecurityManager(sm)
self.assertEqual(gapply(*(f,)), 3)
self.assertEqual(gapply(*(f,), **{}), 3)
self.assertEqual(gapply(*(f, 0)), 2)
self.assertEqual(gapply(*(f,), **{'b': 18}), 19)
self.assertEqual(gapply(*(f, 10), **{'b': 1}), 11)
self.setSecurityManager(old)
# Map function name to the # of times it's been called.
wrapper_count = {}
class FuncWrapper:
def __init__(self, funcname, func):
self.funcname = funcname
wrapper_count[funcname] = 0
self.func = func
def __call__(self, *args, **kws):
wrapper_count[self.funcname] += 1
return self.func(*args, **kws)
def __repr__(self):
return "<FuncWrapper around %r>" % self.func
# Given the high wall between AccessControl and RestrictedPython, I suppose
# the next one could be called an integration test. But we're simply
# trying to run restricted Python with the *intended* implementations of
# the special wrappers here, so no apologies.
class TestActualPython(GuardTestCase):
def testPython(self):
from RestrictedPython.tests import verify
code, its_globals = self._compile("actual_python.py")
verify.verify(code)
# Fiddle the global and safe-builtins dicts to count how many times
# the special functions are called.
self._wrap_replaced_dict_callables(its_globals)
self._wrap_replaced_dict_callables(its_globals['__builtins__'])
sm = SecurityManager()
old = self.setSecurityManager(sm)
try:
exec code in its_globals
finally:
self.setSecurityManager(old)
# Use wrapper_count to determine coverage.
## print wrapper_count # uncomment to see wrapper names & counts
untouched = [k for k, v in wrapper_count.items() if v == 0]
if untouched:
untouched.sort()
self.fail("Unexercised wrappers: %r" % untouched)
# Compile code in fname, as restricted Python. Return the
# compiled code, and a safe globals dict for running it in.
# fname is the string name of a Python file; it must be found
# in the same directory as this file.
def _compile(self, fname):
from RestrictedPython import compile_restricted
from AccessControl.ZopeGuards import get_safe_globals, guarded_getattr
fn = os.path.join( _HERE, fname)
code = compile_restricted(open(fn).read(), fn, 'exec')
g = get_safe_globals()
g['_getattr_'] = guarded_getattr
g['__debug__'] = 1 # so assert statements are active
g['__name__'] = __name__ # so classes can be defined in the script
return code, g
# d is a dict, the globals for execution or our safe builtins.
# The callable values which aren't the same as the corresponding
# entries in __builtin__ are wrapped in a FuncWrapper, so we can
# tell whether they're executed.
def _wrap_replaced_dict_callables(self, d):
import __builtin__
for k, v in d.items():
if callable(v) and v is not getattr(__builtin__, k, None):
d[k] = FuncWrapper(k, v)
def test_suite():
suite = unittest.TestSuite()
for cls in (TestGuardedGetattr,
TestDictGuards,
TestBuiltinFunctionGuards,
TestListGuards,
TestGuardedDictListTypes,
TestRestrictedPythonApply,
TestActualPython,
):
suite.addTest(unittest.makeSuite(cls))
return suite
if __name__ == '__main__':
unittest.main()
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Tests for the guarded iterartor.
$Id: test_safeiter.py,v 1.2 2004/01/15 23:09:06 tseaver Exp $
"""
import unittest
# Persistence system must be initialized.
import ZODB
from AccessControl import ZopeGuards
class SafeIterTestCase(unittest.TestCase):
# XXX these tests replace the global guard() function in
# AccessControl.ZopeGuards; this is not the nicest way to check
# that things work, but avoids making the SafeIter unit tests from
# testing things other than the guarded iterator itself. In
# particular, it avoids testing the actual guard checks, which
# should be tested separately.
def setUp(self):
self.original_guard = ZopeGuards.guard
ZopeGuards.guard = self.guard
self.checks = []
def tearDown(self):
ZopeGuards.guard = self.original_guard
def guard(self, container, value, index=None):
self.checks.append((id(container), value))
def test_iteration(self):
seq = [1, 2, 3]
seqid = id(seq)
it = ZopeGuards.SafeIter(seq)
self.assertEqual(list(it), seq)
self.assertEqual(self.checks, [(seqid, 1),
(seqid, 2),
(seqid, 3)])
def test_iteration_with_container(self):
seq = [1, 2, 3]
container = object()
contid = id(container)
it = ZopeGuards.SafeIter(seq, container)
self.assertEqual(list(it), seq)
self.assertEqual(self.checks, [(contid, 1),
(contid, 2),
(contid, 3)])
def test_suite():
return unittest.makeSuite(SafeIterTestCase)
......@@ -17,7 +17,7 @@ Page Template-specific implementation of TALES, with handlers
for Python expressions, string literals, and paths.
"""
__version__='$Revision: 1.44 $'[11:-2]
__version__='$Revision: 1.45 $'[11:-2]
import re, sys
from TALES import Engine, CompilerError, _valid_name, NAME_RE, \
......@@ -54,12 +54,7 @@ if sys.modules.has_key('Zope'):
from AccessControl import Unauthorized
except ImportError:
Unauthorized = "Unauthorized"
if hasattr(AccessControl, 'full_read_guard'):
from ZRPythonExpr import PythonExpr, _SecureModuleImporter, \
call_with_ns
else:
from ZPythonExpr import PythonExpr, _SecureModuleImporter, \
call_with_ns
from ZRPythonExpr import PythonExpr, _SecureModuleImporter, call_with_ns
else:
from PythonExpr import getSecurityManager, PythonExpr
guarded_getattr = getattr
......@@ -312,7 +307,7 @@ def restrictedTraverse(object, path, securityManager,
# Skip directly to item access
o = object[name]
# Check access to the item.
if not validate(object, object, name, o):
if not validate(object, object, None, o):
raise Unauthorized, name
object = o
continue
......@@ -367,7 +362,7 @@ def restrictedTraverse(object, path, securityManager,
raise
else:
# Check access to the item.
if not validate(object, object, name, o):
if not validate(object, object, None, o):
raise Unauthorized, name
object = o
......
......@@ -16,21 +16,20 @@
Handler for Python expressions that uses the RestrictedPython package.
"""
__version__='$Revision: 1.10 $'[11:-2]
__version__='$Revision: 1.11 $'[11:-2]
from AccessControl import full_read_guard, full_write_guard, \
safe_builtins, getSecurityManager
from AccessControl.ZopeGuards import guarded_getattr, guarded_getitem
from AccessControl import safe_builtins
from AccessControl.ZopeGuards import guarded_getattr, get_safe_globals
from RestrictedPython import compile_restricted_eval
from TALES import CompilerError
from PythonExpr import PythonExpr
class PythonExpr(PythonExpr):
_globals = {'__debug__': __debug__,
'__builtins__': safe_builtins,
'_getattr_': guarded_getattr,
'_getitem_': guarded_getitem,}
_globals = get_safe_globals()
_globals['_getattr_'] = guarded_getattr
_globals['__debug__' ] = __debug__
def __init__(self, name, expr, engine):
self.expr = expr = expr.strip().replace('\n', ' ')
code, err, warn, use = compile_restricted_eval(expr, str(self))
......
......@@ -17,7 +17,7 @@ This product provides support for Script objects containing restricted
Python code.
"""
__version__='$Revision: 1.52 $'[11:-2]
__version__='$Revision: 1.53 $'[11:-2]
import sys, os, traceback, re, marshal, new
from Globals import DTMLFile, MessageDialog, package_home
......@@ -31,10 +31,10 @@ from Shared.DC.Scripts.Script import Script, BindingsUI, defaultBindings
from AccessControl import getSecurityManager
from OFS.History import Historical, html_diff
from OFS.Cache import Cacheable
from AccessControl import full_write_guard, safe_builtins
from AccessControl.ZopeGuards import guarded_getattr, guarded_getitem
from AccessControl.ZopeGuards import get_safe_globals, guarded_getattr
from zLOG import LOG, ERROR, INFO, PROBLEM
from zExceptions import Forbidden
import Globals
# Track the Python bytecode version
import imp
......@@ -223,6 +223,7 @@ class PythonScript(Script, Historical, Cacheable):
def _compiler(self, *args, **kw):
return RestrictedPython.compile_restricted_function(*args, **kw)
def _compile(self):
bind_names = self.getBindingAssignments().getAssignedNamesInOrder()
r = self._compiler(self._params, self._body or 'pass',
......@@ -255,14 +256,11 @@ class PythonScript(Script, Historical, Cacheable):
self._v_change = 0
def _newfun(self, code):
g = {'__debug__': __debug__,
'__name__': None,
'__builtins__': safe_builtins,
'_getattr_': guarded_getattr,
'_getitem_': guarded_getitem,
'_write_': full_write_guard,
'_print_': RestrictedPython.PrintCollector,
}
g = get_safe_globals()
g['_getattr_'] = guarded_getattr
g['__debug__'] = __debug__
g['__name__'] = None
l = {}
exec code in g, l
self._v_f = f = l.values()[0]
......@@ -490,6 +488,8 @@ class PythonScript(Script, Historical, Cacheable):
return self.read()
Globals.InitializeClass(PythonScript)
class PythonScriptTracebackSupplement:
"""Implementation of ITracebackSupplement"""
def __init__(self, script, line=0):
......
......@@ -10,12 +10,12 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import os, sys, unittest
import os, unittest
import ZODB
from Products.PythonScripts.PythonScript import PythonScript
from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import noSecurityManager
from RestrictedPython.tests.verify import verify
if __name__=='__main__':
......@@ -28,13 +28,17 @@ else:
# Test Classes
def readf(name):
return open( os.path.join( here
, 'tscripts'
, '%s.ps' % name
), 'r').read()
path = os.path.join(here, 'tscripts', '%s.ps' % name)
return open(path, 'r').read()
class TestPythonScriptNoAq(unittest.TestCase):
class VerifiedPythonScript(PythonScript):
def _newfun(self, code):
verify(code)
return PythonScript._newfun(self, code)
class PythonScriptTestBase(unittest.TestCase):
def setUp(self):
newSecurityManager(None, None)
......@@ -42,10 +46,12 @@ class TestPythonScriptNoAq(unittest.TestCase):
noSecurityManager()
def _newPS(self, txt, bind=None):
ps = PythonScript('ps')
ps = VerifiedPythonScript('ps')
ps.ZBindings_edit(bind or {})
ps.write(txt)
ps._makeFunction()
if ps.errors:
raise SyntaxError, ps.errors[0]
return ps
def _filePS(self, fname, bind=None):
......@@ -55,113 +61,181 @@ class TestPythonScriptNoAq(unittest.TestCase):
ps._makeFunction()
return ps
class TestPythonScriptNoAq(PythonScriptTestBase):
def fail(self):
'Fail if called'
assert 0, 'Fail called'
def testEmpty(self):
empty = self._newPS('')()
assert empty is None, empty
self.failUnless(empty is None)
def testIndented(self):
# This failed to compile in Zope 2.4.0b2.
res = self._newPS('if 1:\n return 2')()
assert res == 2, res
self.assertEqual(res, 2)
def testReturn(self):
return1 = self._newPS('return 1')()
assert return1 == 1, return1
res = self._newPS('return 1')()
self.assertEqual(res, 1)
def testReturnNone(self):
none = self._newPS('return')()
assert none == None
res = self._newPS('return')()
self.failUnless(res is None)
def testParam1(self):
txt = self._newPS('##parameters=x\nreturn x')('txt')
assert txt == 'txt', txt
res = self._newPS('##parameters=x\nreturn x')('txt')
self.assertEqual(res, 'txt')
def testParam2(self):
eq = self.assertEqual
one, two = self._newPS('##parameters=x,y\nreturn x,y')('one','two')
assert one == 'one'
assert two == 'two'
eq(one, 'one')
eq(two, 'two')
def testParam26(self):
import string
params = string.letters[:26]
sparams = ','.join(params)
ps = self._newPS('##parameters=%s\nreturn %s' % (sparams, sparams))
tup = ps(*params)
assert tup == tuple(params), (tup, params)
res = ps(*params)
self.assertEqual(res, tuple(params))
def testArithmetic(self):
one = self._newPS('return 1 * 5 + 4 / 2 - 6')()
assert one == 1, one
res = self._newPS('return 1 * 5 + 4 / 2 - 6')()
self.assertEqual(res, 1)
def testReduce(self):
res = self._newPS('return reduce(lambda x, y: x + y, [1,3,5,7])')()
self.assertEqual(res, 16)
res = self._newPS('return reduce(lambda x, y: x + y, [1,3,5,7], 1)')()
self.assertEqual(res, 17)
def testImport(self):
a,b,c = self._newPS('import string; return string.split("a b c")')()
assert a == 'a'
assert b == 'b'
assert c == 'c'
eq = self.assertEqual
a, b, c = self._newPS('import string; return string.split("a b c")')()
eq(a, 'a')
eq(b, 'b')
eq(c, 'c')
def testWhileLoop(self):
one = self._filePS('while_loop')()
assert one == 1
res = self._filePS('while_loop')()
self.assertEqual(res, 1)
def testForLoop(self):
ten = self._filePS('for_loop')()
assert ten == 10
res = self._filePS('for_loop')()
self.assertEqual(res, 10)
def testMutateLiterals(self):
eq = self.assertEqual
l, d = self._filePS('mutate_literals')()
assert l == [2], l
assert d == {'b': 2}
eq(l, [2])
eq(d, {'b': 2})
def testTupleUnpackAssignment(self):
eq = self.assertEqual
d, x = self._filePS('tuple_unpack_assignment')()
assert d == {'a': 0, 'b': 1, 'c': 2}, d
assert x == 3, x
eq(d, {'a': 0, 'b': 1, 'c': 2})
eq(x, 3)
def testDoubleNegation(self):
one = self._newPS('return not not "this"')()
assert one == 1
res = self._newPS('return not not "this"')()
self.assertEqual(res, 1)
def testTryExcept(self):
a,b = self._filePS('try_except')()
assert a==1
assert b==1
eq = self.assertEqual
a, b = self._filePS('try_except')()
eq(a, 1)
eq(b, 1)
def testBigBoolean(self):
true = self._filePS('big_boolean')()
assert true, true
res = self._filePS('big_boolean')()
self.failUnless(res)
def testFibonacci(self):
r = self._filePS('fibonacci')()
assert r == [1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377,
610, 987, 1597, 2584, 4181, 6765, 10946, 17711, 28657,
46368, 75025, 121393, 196418, 317811, 514229, 832040,
1346269, 2178309, 3524578, 5702887, 9227465, 14930352,
24157817, 39088169, 63245986], r
res = self._filePS('fibonacci')()
self.assertEqual(
res, [1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377,
610, 987, 1597, 2584, 4181, 6765, 10946, 17711, 28657,
46368, 75025, 121393, 196418, 317811, 514229, 832040,
1346269, 2178309, 3524578, 5702887, 9227465, 14930352,
24157817, 39088169, 63245986])
def testSimplePrint(self):
txt = self._filePS('simple_print')()
assert txt == 'a 1 []\n', txt
res = self._filePS('simple_print')()
self.assertEqual(res, 'a 1 []\n')
def testComplexPrint(self):
txt = self._filePS('complex_print')()
assert txt == 'double\ndouble\nx: 1\ny: 0 1 2\n\n', txt
res = self._filePS('complex_print')()
self.assertEqual(res, 'double\ndouble\nx: 1\ny: 0 1 2\n\n')
def testNSBind(self):
f = self._filePS('ns_bind', bind={'name_ns': '_'})
bound = f.__render_with_namespace__({'yes': 1, 'no': self.fail})
assert bound == 1, bound
self.assertEqual(bound, 1)
def testBooleanMap(self):
true = self._filePS('boolean_map')()
assert true
res = self._filePS('boolean_map')()
self.failUnless(res)
def testGetSize(self):
f = self._filePS('complex_print')
self.assertEqual(f.get_size(),len(f.read()))
self.assertEqual(f.get_size(), len(f.read()))
class TestPythonScriptErrors(PythonScriptTestBase):
def assertPSRaises(self, error, path=None, body=None):
assert not (path and body) and (path or body)
if body is None:
body = readf(path)
if error is SyntaxError:
self.assertRaises(SyntaxError, self._newPS, body)
else:
ps = self._newPS(body)
self.assertRaises(error, ps)
def testSubversiveExcept(self):
self.assertPSRaises(SyntaxError, path='subversive_except')
def testBadImports(self):
self.assertPSRaises(ImportError, body="from string import *")
self.assertPSRaises(ImportError, body="import mmap")
def testAttributeAssignment(self):
# It's illegal to assign to attributes of anything except
# list or dict.
cases = [("import string", "string"),
("class Spam: pass", "Spam"),
("def f(): pass", "f"),
("class Spam: pass\nspam = Spam()", "spam"),
]
assigns = ["%s.splat = 'spam'",
"setattr(%s, '_getattr_', lambda x, y: True)",
"del %s.splat",
]
for defn, name in cases:
for asn in assigns:
f = self._newPS(defn + "\n" + asn % name)
self.assertRaises(TypeError, f)
class TestPythonScriptGlobals(PythonScriptTestBase):
def _exec(self, script, bound_names=None, args=None, kws=None):
if args is None:
args = ()
if kws is None:
kws = {}
bindings = {'name_container': 'container'}
f = self._filePS(script, bindings)
return f._exec(bound_names, args, kws)
def testGlobalIsDeclaration(self):
bindings = {'container': 7}
results = self._exec('global_is_declaration', bindings)
self.assertEqual(results, 8)
def test__name__(self):
fname = 'class.__name__'
......@@ -170,11 +244,15 @@ class TestPythonScriptNoAq(unittest.TestCase):
def test_suite():
suite = unittest.TestSuite()
suite.addTest( unittest.makeSuite( TestPythonScriptNoAq ) )
suite.addTest(unittest.makeSuite(TestPythonScriptNoAq))
suite.addTest(unittest.makeSuite(TestPythonScriptErrors))
suite.addTest(unittest.makeSuite(TestPythonScriptGlobals))
return suite
def main():
unittest.TextTestRunner().run(test_suite())
if __name__ == '__main__':
main()
# -*- python -*-
# An attempt to bind an illegal name in an except clause
try:
1/0
except ZeroDivisionError, __getattr__:
pass
......@@ -11,13 +11,16 @@
#
##############################################################################
"""Restricted Python Expressions
$Id: Eval.py,v 1.6 2004/01/15 23:09:09 tseaver Exp $
"""
__rcs_id__='$Id: Eval.py,v 1.5 2002/08/14 21:44:31 mj Exp $'
__version__='$Revision: 1.5 $'[11:-2]
__version__='$Revision: 1.6 $'[11:-2]
from RestrictedPython import compile_restricted_eval
from string import translate, strip
import string
compile_restricted_eval = None
nltosp = string.maketrans('\r\n',' ')
......@@ -30,8 +33,8 @@ def default_guarded_getitem(ob, index):
PROFILE = 0
class RestrictionCapableEval:
"""A base class for restricted code.
"""
"""A base class for restricted code."""
globals = {'__builtins__': None}
rcode = None # restricted
ucode = None # unrestricted
......@@ -52,12 +55,6 @@ class RestrictionCapableEval:
def prepRestrictedCode(self):
if self.rcode is None:
global compile_restricted_eval
if compile_restricted_eval is None:
# Late binding because this will import the whole
# compiler suite.
from RestrictedPython import compile_restricted_eval
if PROFILE:
from time import clock
start = clock()
......
......@@ -10,55 +10,80 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
from __future__ import nested_scopes
__version__='$Revision: 1.13 $'[11:-2]
__version__ = '$Revision: 1.14 $'[11:-2]
import exceptions
import new
# This tiny set of safe builtins is extended by users of the module.
# AccessControl.ZopeGuards contains a large set of wrappers for builtins.
# DocumentTemplate.DT_UTil contains a few.
safe_builtins = {}
for name in ('None', 'abs', 'chr', 'divmod', 'float', 'hash', 'hex', 'int',
'len', 'max', 'min', 'oct', 'ord', 'round', 'str', 'pow',
'apply', 'callable', 'cmp', 'complex', 'isinstance',
'issubclass', 'long', 'repr', 'range', 'list', 'tuple',
'unichr', 'unicode', 'True', 'False', 'bool',
'dict', 'sum', 'enumerate',):
for name in ['False', 'None', 'True', 'abs', 'basestring', 'bool', 'callable',
'chr', 'cmp', 'complex', 'divmod', 'float', 'hash',
'hex', 'id', 'int', 'isinstance', 'issubclass', 'len',
'long', 'oct', 'ord', 'pow', 'range', 'repr', 'round',
'str', 'tuple', 'unichr', 'unicode', 'xrange', 'zip']:
safe_builtins[name] = __builtins__[name]
for name in dir(exceptions):
if name[0] != "_":
safe_builtins[name] = getattr(exceptions, name)
# Wrappers provided by this module:
# delattr
# setattr
# Wrappers provided by ZopeGuards:
# __import__
# apply
# dict
# enumerate
# filter
# getattr
# hasattr
# iter
# list
# map
# max
# min
# sum
def _full_read_guard(g_attr, g_item):
# Nested scope abuse!
# The arguments are used by class Wrapper
# safetype variable is used by guard()
safetype = {type(()): 1, type([]): 1, type({}): 1, type(''): 1}.has_key
def guard(ob, write=None, safetype=safetype):
# Don't bother wrapping simple types, or objects that claim to
# handle their own read security.
if safetype(type(ob)) or getattr(ob, '_guarded_reads', 0):
return ob
# ob is shared by class Wrapper, so the class instance wraps it.
class Wrapper:
def __len__(self):
# Required for slices with negative bounds
return len(ob)
def __getattr__(self, name):
return g_attr(ob, name)
def __getitem__(self, i):
# Must handle both item and slice access.
return g_item(ob, i)
# Optional, for combined read/write guard
def __setitem__(self, index, val):
write(ob)[index] = val
def __setattr__(self, attr, val):
setattr(write(ob), attr, val)
return Wrapper()
return guard
# Builtins that are intentionally disabled
# compile - don't let them produce new code
# dir - a general purpose introspector, probably hard to wrap
# execfile - no direct I/O
# file - no direct I/O
# globals - uncontrolled namespace access
# input - no direct I/O
# locals - uncontrolled namespace access
# open - no direct I/O
# raw_input - no direct I/O
# vars - uncontrolled namespace access
# There are several strings that describe Python. I think there's no
# point to including these, although they are obviously safe:
# copyright, credits, exit, help, license, quit
# Not provided anywhere. Do something about these? Several are
# related to new-style classes, which we are too scared of to support
# <0.3 wink>. coerce, buffer, and reload are esoteric enough that no
# one should care.
# buffer
# classmethod
# coerce
# eval
# intern
# object
# property
# reload
# slice
# staticmethod
# super
# type
for name in dir(exceptions):
if name[0] != "_":
safe_builtins[name] = getattr(exceptions, name)
def _write_wrapper():
# Construct the write wrapper class
......@@ -90,9 +115,9 @@ def _write_wrapper():
def _full_write_guard():
# Nested scope abuse!
# safetype and Wrapper variables are used by guard()
safetype = {type([]): 1, type({}): 1}.has_key
safetype = {dict: True, list: True}.has_key
Wrapper = _write_wrapper()
def guard(ob, safetype=safetype, Wrapper=Wrapper):
def guard(ob):
# Don't bother wrapping simple types, or objects that claim to
# handle their own write security.
if safetype(type(ob)) or hasattr(ob, '_guarded_writes'):
......
......@@ -11,7 +11,7 @@
#
##############################################################################
__version__='$Revision: 1.4 $'[11:-2]
__version__='$Revision: 1.5 $'[11:-2]
limited_builtins = {}
......@@ -34,13 +34,13 @@ def limited_range(iFirst, *args):
limited_builtins['range'] = limited_range
def limited_list(seq):
if type(seq) is type(''):
if isinstance(seq, str):
raise TypeError, 'cannot convert string to list'
return list(seq)
limited_builtins['list'] = limited_list
def limited_tuple(seq):
if type(seq) is type(''):
if isinstance(seq, str):
raise TypeError, 'cannot convert string to tuple'
return tuple(seq)
limited_builtins['tuple'] = limited_tuple
......@@ -14,13 +14,11 @@
Python standard library.
"""
__version__='$Revision: 1.5 $'[11:-2]
__version__='$Revision: 1.6 $'[11:-2]
from compiler import ast, parse, misc, syntax
from compiler import ast, parse, misc, syntax, pycodegen
from compiler.pycodegen import AbstractCompileMode, Expression, \
Interactive, Module
from traceback import format_exception_only
Interactive, Module, ModuleCodeGenerator, FunctionCodeGenerator, findOp
import MutatingWalker
from RestrictionMutator import RestrictionMutator
......@@ -39,8 +37,9 @@ def niceParse(source, filename, mode):
# Some other error occurred.
raise
class RestrictedCompileMode (AbstractCompileMode):
class RestrictedCompileMode(AbstractCompileMode):
"""Abstract base class for hooking up custom CodeGenerator."""
# See concrete subclasses below.
def __init__(self, source, filename):
self.rm = RestrictionMutator()
......@@ -51,59 +50,17 @@ class RestrictedCompileMode (AbstractCompileMode):
def _get_tree(self):
tree = self.parse()
rm = self.rm
MutatingWalker.walk(tree, rm)
if rm.errors:
raise SyntaxError, rm.errors[0]
MutatingWalker.walk(tree, self.rm)
if self.rm.errors:
raise SyntaxError, self.rm.errors[0]
misc.set_filename(self.filename, tree)
syntax.check(tree)
return tree
class RExpression(RestrictedCompileMode, Expression):
mode = "eval"
compile = Expression.compile
class RInteractive(RestrictedCompileMode, Interactive):
mode = "single"
compile = Interactive.compile
class RModule(RestrictedCompileMode, Module):
mode = "exec"
compile = Module.compile
class RFunction(RModule):
"""A restricted Python function built from parts.
"""
def __init__(self, p, body, name, filename, globalize=None):
self.params = p
self.body = body
self.name = name
self.globalize = globalize
RModule.__init__(self, None, filename)
def parse(self):
# Parse the parameters and body, then combine them.
firstline = 'def f(%s): pass' % self.params
tree = niceParse(firstline, '<function parameters>', 'exec')
f = tree.node.nodes[0]
body_code = niceParse(self.body, self.filename, 'exec')
# Stitch the body code into the function.
f.code.nodes = body_code.node.nodes
f.name = self.name
# Look for a docstring.
stmt1 = f.code.nodes[0]
if (isinstance(stmt1, ast.Discard) and
isinstance(stmt1.expr, ast.Const) and
type(stmt1.expr.value) is type('')):
f.doc = stmt1.expr.value
if self.globalize:
f.code.nodes.insert(0, ast.Global(map(str, self.globalize)))
return tree
def compile(self):
tree = self._get_tree()
gen = self.CodeGeneratorClass(tree)
self.code = gen.getCode()
def compileAndTuplize(gen):
......@@ -119,25 +76,26 @@ def compile_restricted_function(p, body, name, filename, globalize=None):
The function can be reconstituted using the 'new' module:
new.function(<code>, <globals>)
The globalize argument, if specified, is a list of variable names to be
treated as globals (code is generated as if each name in the list
appeared in a global statement at the top of the function).
"""
gen = RFunction(p, body, name, filename, globalize=globalize)
gen = RFunction(p, body, name, filename, globalize)
return compileAndTuplize(gen)
def compile_restricted_exec(s, filename='<string>'):
"""Compiles a restricted code suite.
"""
"""Compiles a restricted code suite."""
gen = RModule(s, filename)
return compileAndTuplize(gen)
def compile_restricted_eval(s, filename='<string>'):
"""Compiles a restricted expression.
"""
"""Compiles a restricted expression."""
gen = RExpression(s, filename)
return compileAndTuplize(gen)
def compile_restricted(source, filename, mode):
"""Replacement for the builtin compile() function.
"""
"""Replacement for the builtin compile() function."""
if mode == "single":
gen = RInteractive(source, filename)
elif mode == "exec":
......@@ -149,3 +107,129 @@ def compile_restricted(source, filename, mode):
"'eval' or 'single'")
gen.compile()
return gen.getCode()
class RestrictedCodeGenerator:
"""Mixin for CodeGenerator to replace UNPACK_SEQUENCE bytecodes.
The UNPACK_SEQUENCE opcode is not safe because it extracts
elements from a sequence without using a safe iterator or
making __getitem__ checks.
This code generator replaces use of UNPACK_SEQUENCE with calls to
a function that unpacks the sequence, performes the appropriate
security checks, and returns a simple list.
"""
# Replace the standard code generator for assignments to tuples
# and lists.
def _gen_safe_unpack_sequence(self, num):
# We're at a place where UNPACK_SEQUENCE should be generated, to
# unpack num items. That's a security hole, since it exposes
# individual items from an arbitrary iterable. We don't remove
# the UNPACK_SEQUENCE, but instead insert a call to our _getiter_()
# wrapper first. That applies security checks to each item as
# it's delivered. codegen is (just) a bit messy because the
# iterable is already on the stack, so we have to do a stack swap
# to get things in the right order.
self.emit('LOAD_GLOBAL', '_getiter_')
self.emit('ROT_TWO')
self.emit('CALL_FUNCTION', 1)
self.emit('UNPACK_SEQUENCE', num)
def _visitAssSequence(self, node):
if findOp(node) != 'OP_DELETE':
self._gen_safe_unpack_sequence(len(node.nodes))
for child in node.nodes:
self.visit(child)
visitAssTuple = _visitAssSequence
visitAssList = _visitAssSequence
# Call to generate code for unpacking nested tuple arguments
# in function calls.
def unpackSequence(self, tup):
self._gen_safe_unpack_sequence(len(tup))
for elt in tup:
if isinstance(elt, tuple):
self.unpackSequence(elt)
else:
self._nameOp('STORE', elt)
# A collection of code generators that adds the restricted mixin to
# handle unpacking for all the different compilation modes. They
# are defined here (at the end) so that can refer to RestrictedCodeGenerator.
class RestrictedFunctionCodeGenerator(RestrictedCodeGenerator,
pycodegen.FunctionCodeGenerator):
pass
class RestrictedExpressionCodeGenerator(RestrictedCodeGenerator,
pycodegen.ExpressionCodeGenerator):
pass
class RestrictedInteractiveCodeGenerator(RestrictedCodeGenerator,
pycodegen.InteractiveCodeGenerator):
pass
class RestrictedModuleCodeGenerator(RestrictedCodeGenerator,
pycodegen.ModuleCodeGenerator):
def initClass(self):
ModuleCodeGenerator.initClass(self)
self.__class__.FunctionGen = RestrictedFunctionCodeGenerator
# These subclasses work around the definition of stub compile and mode
# attributes in the common base class AbstractCompileMode. If it
# didn't define new attributes, then the stub code inherited via
# RestrictedCompileMode would override the real definitions in
# Expression.
class RExpression(RestrictedCompileMode, Expression):
mode = "eval"
CodeGeneratorClass = RestrictedExpressionCodeGenerator
class RInteractive(RestrictedCompileMode, Interactive):
mode = "single"
CodeGeneratorClass = RestrictedInteractiveCodeGenerator
class RModule(RestrictedCompileMode, Module):
mode = "exec"
CodeGeneratorClass = RestrictedModuleCodeGenerator
class RFunction(RModule):
"""A restricted Python function built from parts."""
CodeGeneratorClass = RestrictedModuleCodeGenerator
def __init__(self, p, body, name, filename, globals):
self.params = p
self.body = body
self.name = name
self.globals = globals or []
RModule.__init__(self, None, filename)
def parse(self):
# Parse the parameters and body, then combine them.
firstline = 'def f(%s): pass' % self.params
tree = niceParse(firstline, '<function parameters>', 'exec')
f = tree.node.nodes[0]
body_code = niceParse(self.body, self.filename, 'exec')
# Stitch the body code into the function.
f.code.nodes = body_code.node.nodes
f.name = self.name
# Look for a docstring.
stmt1 = f.code.nodes[0]
if (isinstance(stmt1, ast.Discard) and
isinstance(stmt1.expr, ast.Const) and
isinstance(stmt1.expr.value, str)):
f.doc = stmt1.expr.value
# The caller may specify that certain variables are globals
# so that they can be referenced before a local assignment.
# The only known example is the variables context, container,
# script, traverse_subpath in PythonScripts.
if self.globals:
f.code.nodes.insert(0, ast.Global(self.globals))
return tree
......@@ -10,12 +10,14 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
'''
"""Modify AST to include security checks.
RestrictionMutator modifies a tree produced by
compiler.transformer.Transformer, restricting and enhancing the
code in various ways before sending it to pycodegen.
'''
__version__='$Revision: 1.12 $'[11:-2]
$Revision: 1.13 $
"""
from SelectCompiler import ast, parse, OP_ASSIGN, OP_DELETE, OP_APPLY
......@@ -23,7 +25,7 @@ from SelectCompiler import ast, parse, OP_ASSIGN, OP_DELETE, OP_APPLY
# line number attributes. These trees can then be inserted into other
# trees without affecting line numbers shown in tracebacks, etc.
def rmLineno(node):
'''Strip lineno attributes from a code tree'''
"""Strip lineno attributes from a code tree."""
if node.__dict__.has_key('lineno'):
del node.lineno
for child in node.getChildren():
......@@ -31,66 +33,42 @@ def rmLineno(node):
rmLineno(child)
def stmtNode(txt):
'''Make a "clean" statement node'''
"""Make a "clean" statement node."""
node = parse(txt).node.nodes[0]
rmLineno(node)
return node
def exprNode(txt):
'''Make a "clean" expression node'''
return stmtNode(txt).expr
# There should be up to four objects in the global namespace. If a
# wrapper function or print target is needed in a particular module or
# function, it is obtained from one of these objects. There is a
# local and a global binding for each object: the global name has a
# trailing underscore, while the local name does not.
_print_target_name = ast.Name('_print')
_getattr_name = ast.Name('_getattr')
_getattr_name_expr = ast.Name('_getattr_')
_getitem_name = ast.Name('_getitem')
_getitem_name_expr = ast.Name('_getitem_')
_write_guard_name = ast.Name('_write')
# The security checks are performed by a set of six functions that
# must be provided by the restricted environment.
_apply_name = ast.Name("_apply_")
_getattr_name = ast.Name("_getattr_")
_getitem_name = ast.Name("_getitem_")
_getiter_name = ast.Name("_getiter_")
_print_target_name = ast.Name("_print")
_write_name = ast.Name("_write_")
# Constants.
_None_const = ast.Const(None)
_write_const = ast.Const('write')
_write_const = ast.Const("write")
# Example prep code:
#
# global _getattr_
# _getattr = _getattr_
_prep_code = {}
for _n in ('getattr', 'getitem', 'write', 'print'):
_prep_code[_n] = [ast.Global(['_%s_' % _n]),
stmtNode('_%s = _%s_' % (_n, _n))]
# Call the global _print instead of copying it.
_prep_code['print'][1] = stmtNode('_print = _print_()')
_printed_expr = stmtNode("_print()").expr
_print_target_node = stmtNode("_print = _print_()")
_printed_expr = exprNode('_print()')
# Keep track of which restrictions have been applied in a given scope.
class FuncInfo:
_print_used = 0
_printed_used = 0
_getattr_used = 0
_getitem_used = 0
_write_used = 0
_is_suite = 0 # True for modules and functions, false for expressions
print_used = False
printed_used = False
class RestrictionMutator:
def __init__(self):
self.funcinfo = FuncInfo()
self.warnings = []
self.errors = []
self.used_names = {}
self.funcinfo = FuncInfo()
def error(self, node, info):
"""Records a security error discovered during compilation.
"""
"""Records a security error discovered during compilation."""
lineno = getattr(node, 'lineno', None)
if lineno is not None and lineno > 0:
self.errors.append('Line %d: %s' % (lineno, info))
......@@ -112,11 +90,11 @@ class RestrictionMutator:
and perhaps other statements assign names. Special case:
'_' is allowed.
"""
if len(name) > 1 and name[0] == '_':
if name.startswith("_") and name != "_":
# Note: "_" *is* allowed.
self.error(node, '"%s" is an invalid variable name because'
' it starts with "_"' % name)
if name == 'printed':
if name == "printed":
self.error(node, '"printed" is a reserved name.')
def checkAttrName(self, node):
......@@ -127,38 +105,23 @@ class RestrictionMutator:
security policy. Special case: '_' is allowed.
"""
name = node.attrname
if len(name) > 1 and name[0] == '_':
if name.startswith("_") and name != "_":
# Note: "_" *is* allowed.
self.error(node, '"%s" is an invalid attribute name '
'because it starts with "_".' % name)
def prepBody(self, body):
"""Prepends preparation code to a code suite.
"""Insert code for print at the beginning of the code suite."""
For example, if a code suite uses getattr operations,
this places the following code at the beginning of the suite:
global _getattr_
_getattr = _getattr_
Similarly for _getitem_, _print_, and _write_.
"""
info = self.funcinfo
if info._print_used or info._printed_used:
if self.funcinfo.print_used or self.funcinfo.printed_used:
# Add code at top for creating _print_target
body[0:0] = _prep_code['print']
if not info._printed_used:
body.insert(0, _print_target_node)
if not self.funcinfo.printed_used:
self.warnings.append(
"Prints, but never reads 'printed' variable.")
elif not info._print_used:
elif not self.funcinfo.print_used:
self.warnings.append(
"Doesn't print, but reads 'printed' variable.")
if info._getattr_used:
body[0:0] = _prep_code['getattr']
if info._getitem_used:
body[0:0] = _prep_code['getitem']
if info._write_used:
body[0:0] = _prep_code['write']
def visitFunction(self, node, walker):
"""Checks and mutates a function definition.
......@@ -169,12 +132,15 @@ class RestrictionMutator:
"""
self.checkName(node, node.name)
for argname in node.argnames:
self.checkName(node, argname)
if isinstance(argname, str):
self.checkName(node, argname)
else:
for name in argname:
self.checkName(node, name)
walker.visitSequence(node.defaults)
former_funcinfo = self.funcinfo
self.funcinfo = FuncInfo()
self.funcinfo._is_suite = 1
node = walker.defaultVisitNode(node, exclude=('defaults',))
self.prepBody(node.code.nodes)
self.funcinfo = former_funcinfo
......@@ -206,11 +172,10 @@ class RestrictionMutator:
method that changes them.
"""
node = walker.defaultVisitNode(node)
self.funcinfo._print_used = 1
self.funcinfo.print_used = True
if node.dest is None:
node.dest = _print_target_name
else:
self.funcinfo._getattr_used = 1
# Pre-validate access to the "write" attribute.
# "print >> ob, x" becomes
# "print >> (_getattr(ob, 'write') and ob), x"
......@@ -228,18 +193,58 @@ class RestrictionMutator:
"""
if node.name == 'printed':
# Replace name lookup with an expression.
self.funcinfo._printed_used = 1
self.funcinfo.printed_used = True
return _printed_expr
self.checkName(node, node.name)
self.used_names[node.name] = 1
self.used_names[node.name] = True
return node
def visitAssName(self, node, walker):
"""Checks a name assignment using checkName().
def visitCallFunc(self, node, walker):
"""Checks calls with *-args and **-args.
That's a way of spelling apply(), and needs to use our safe
_apply_ instead.
"""
walked = walker.defaultVisitNode(node)
if node.star_args is None and node.dstar_args is None:
# This is not an extended function call
return walked
# Otherwise transform foo(a, b, c, d=e, f=g, *args, **kws) into a call
# of _apply_(foo, a, b, c, d=e, f=g, *args, **kws). The interesting
# thing here is that _apply_() is defined with just *args and **kws,
# so it gets Python to collapse all the myriad ways to call functions
# into one manageable form.
#
# From there, _apply_() digs out the first argument of *args (it's the
# function to call), wraps args and kws in guarded accessors, then
# calls the function, returning the value.
# Transform foo(...) to _apply(foo, ...)
walked.args.insert(0, walked.node)
walked.node = _apply_name
return walked
def visitAssName(self, node, walker):
"""Checks a name assignment using checkName()."""
self.checkName(node, node.name)
return node
def visitFor(self, node, walker):
# convert
# for x in expr:
# to
# for x in _getiter(expr):
#
# Note that visitListCompFor is the same thing. Exactly the same
# transformation is needed to convert
# [... for x in expr ...]
# to
# [... for x in _getiter(expr) ...]
node = walker.defaultVisitNode(node)
node.list = ast.CallFunc(_getiter_name, [node.list])
return node
visitListCompFor = visitFor
def visitGetattr(self, node, walker):
"""Converts attribute access to a function call.
......@@ -250,21 +255,13 @@ class RestrictionMutator:
"""
self.checkAttrName(node)
node = walker.defaultVisitNode(node)
if getattr(node, 'in_aug_assign', 0):
if getattr(node, 'in_aug_assign', False):
# We're in an augmented assignment
# We might support this later...
self.error(node, 'Augmented assignment of '
'attributes is not allowed.')
#expr.append(_write_guard_name)
#self.funcinfo._write_used = 1
self.funcinfo._getattr_used = 1
if self.funcinfo._is_suite:
# Use the local function _getattr().
ga = _getattr_name
else:
# Use the global function _getattr_().
ga = _getattr_name_expr
return ast.CallFunc(ga, [node.expr, ast.Const(node.attrname)])
return ast.CallFunc(_getattr_name,
[node.expr, ast.Const(node.attrname)])
def visitSubscript(self, node, walker):
"""Checks all kinds of subscripts.
......@@ -283,14 +280,11 @@ class RestrictionMutator:
node = walker.defaultVisitNode(node)
if node.flags == OP_APPLY:
# Set 'subs' to the node that represents the subscript or slice.
if getattr(node, 'in_aug_assign', 0):
if getattr(node, 'in_aug_assign', False):
# We're in an augmented assignment
# We might support this later...
self.error(node, 'Augmented assignment of '
'object items and slices is not allowed.')
#expr.append(_write_guard_name)
#self.funcinfo._write_used = 1
self.funcinfo._getitem_used = 1
if hasattr(node, 'subs'):
# Subscript.
subs = node.subs
......@@ -310,15 +304,10 @@ class RestrictionMutator:
if upper is None:
upper = _None_const
subs = ast.Sliceobj([lower, upper])
if self.funcinfo._is_suite:
gi = _getitem_name
else:
gi = _getitem_name_expr
return ast.CallFunc(gi, [node.expr, subs])
return ast.CallFunc(_getitem_name, [node.expr, subs])
elif node.flags in (OP_DELETE, OP_ASSIGN):
# set or remove subscript or slice
node.expr = ast.CallFunc(_write_guard_name, [node.expr])
self.funcinfo._write_used = 1
node.expr = ast.CallFunc(_write_name, [node.expr])
return node
visitSlice = visitSubscript
......@@ -331,8 +320,7 @@ class RestrictionMutator:
"""
self.checkAttrName(node)
node = walker.defaultVisitNode(node)
node.expr = ast.CallFunc(_write_guard_name, [node.expr])
self.funcinfo._write_used = 1
node.expr = ast.CallFunc(_write_name, [node.expr])
return node
def visitExec(self, node, walker):
......@@ -357,7 +345,6 @@ class RestrictionMutator:
Zope doesn't make use of this. The body of Python scripts is
always at function scope.
"""
self.funcinfo._is_suite = 1
node = walker.defaultVisitNode(node)
self.prepBody(node.node.nodes)
return node
......@@ -372,12 +359,11 @@ class RestrictionMutator:
This could be a problem if untrusted code got access to a
mutable database object that supports augmented assignment.
"""
node.node.in_aug_assign = 1
node.node.in_aug_assign = True
return walker.defaultVisitNode(node)
def visitImport(self, node, walker):
"""Checks names imported using checkName().
"""
"""Checks names imported using checkName()."""
for name, asname in node.names:
self.checkName(node, name)
if asname:
......
......@@ -10,12 +10,10 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
'''
Compiler selector.
$Id: SelectCompiler.py,v 1.5 2003/11/06 17:11:49 shane Exp $
'''
"""Compiler selector.
import sys
$Id: SelectCompiler.py,v 1.6 2004/01/15 23:09:09 tseaver Exp $
"""
# Use the compiler from the standard library.
import compiler
......
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Restricted Python transformation examples
This module contains pairs of functions. Each pair has a before and an
after function. The after function shows the source code equivalent
of the before function after it has been modified by the restricted
compiler.
These examples are actually used in the testRestrictions.py
checkBeforeAndAfter() unit tests, which verifies that the restricted compiler
actually produces the same output as would be output by the normal compiler
for the after function.
$Id: before_and_after.py,v 1.2 2004/01/15 23:09:11 tseaver Exp $
"""
# getattr
def simple_getattr_before(x):
return x.y
def simple_getattr_after(x):
return _getattr_(x, 'y')
# set attr
def simple_setattr_before():
x.y = "bar"
def simple_setattr_after():
_write_(x).y = "bar"
# for loop and list comprehensions
def simple_forloop_before(x):
for x in [1, 2, 3]:
pass
def simple_forloop_after(x):
for x in _getiter_([1, 2, 3]):
pass
def nested_forloop_before(x):
for x in [1, 2, 3]:
for y in "abc":
pass
def nested_forloop_after(x):
for x in _getiter_([1, 2, 3]):
for y in _getiter_("abc"):
pass
def simple_list_comprehension_before():
x = [y**2 for y in whatever if y > 3]
def simple_list_comprehension_after():
x = [y**2 for y in _getiter_(whatever) if y > 3]
def nested_list_comprehension_before():
x = [x**2 + y**2 for x in whatever1 if x >= 0
for y in whatever2 if y >= x]
def nested_list_comprehension_after():
x = [x**2 + y**2 for x in _getiter_(whatever1) if x >= 0
for y in _getiter_(whatever2) if y >= x]
# print
def simple_print_before():
print "foo"
def simple_print_after():
_print = _print_()
print >> _print, "foo"
# getitem
def simple_getitem_before():
return x[0]
def simple_getitem_after():
return _getitem_(x, 0)
def simple_get_tuple_key_before():
x = y[1,2]
def simple_get_tuple_key_after():
x = _getitem_(y, (1,2))
# set item
def simple_setitem_before():
x[0] = "bar"
def simple_setitem_after():
_write_(x)[0] = "bar"
# delitem
def simple_delitem_before():
del x[0]
def simple_delitem_after():
del _write_(x)[0]
# a collection of function parallels to many of the above
def function_with_print_before():
def foo():
print "foo"
return printed
def function_with_print_after():
def foo():
_print = _print_()
print >> _print, "foo"
return _print()
def function_with_getattr_before():
def foo():
return x.y
def function_with_getattr_after():
def foo():
return _getattr_(x, 'y')
def function_with_setattr_before():
def foo(x):
x.y = "bar"
def function_with_setattr_after():
def foo(x):
_write_(x).y = "bar"
def function_with_getitem_before():
def foo(x):
return x[0]
def function_with_getitem_after():
def foo(x):
return _getitem_(x, 0)
def function_with_forloop_before():
def foo():
for x in [1, 2, 3]:
pass
def function_with_forloop_after():
def foo():
for x in _getiter_([1, 2, 3]):
pass
# this, and all slices, won't work in these tests because the before code
# parses the slice as a slice object, while the after code can't generate a
# slice object in this way. The after code as written below
# is parsed as a call to the 'slice' name, not as a slice object.
# XXX solutions?
#def simple_slice_before():
# x = y[:4]
#def simple_slice_after():
# _getitem = _getitem_
# x = _getitem(y, slice(None, 4))
# Assignment stmts in Python can be very complicated. The "no_unpack"
# test makes sure we're not doing unnecessary rewriting.
def no_unpack_before():
x = y
x = [y]
x = y,
x = (y, (y, y), [y, (y,)], x, (x, y))
x = y = z = (x, y, z)
no_unpack_after = no_unpack_before # that is, should be untouched
# apply() variations. Native apply() is unsafe because, e.g.,
#
# def f(a, b, c):
# whatever
#
# apply(f, two_element_sequence, dict_with_key_c)
#
# or (different spelling of the same thing)
#
# f(*two_element_sequence, **dict_with_key_c)
#
# makes the elements of two_element_sequence visible to f via its 'a' and
# 'b' arguments, and the dict_with_key_c['c'] value visible via its 'c'
# argument. That is, it's a devious way to extract values without going
# thru security checks.
def star_call_before():
foo(*a)
def star_call_after():
_apply_(foo, *a)
def star_call_2_before():
foo(0, *a)
def star_call_2_after():
_apply_(foo, 0, *a)
def starstar_call_before():
foo(**d)
def starstar_call_after():
_apply_(foo, **d)
def star_and_starstar_call_before():
foo(*a, **d)
def star_and_starstar_call_after():
_apply_(foo, *a, **d)
def positional_and_star_and_starstar_call_before():
foo(b, *a, **d)
def positional_and_star_and_starstar_call_after():
_apply_(foo, b, *a, **d)
def positional_and_defaults_and_star_and_starstar_call_before():
foo(b, x=y, w=z, *a, **d)
def positional_and_defaults_and_star_and_starstar_call_after():
_apply_(foo, b, x=y, w=z, *a, **d)
def lambda_with_getattr_in_defaults_before():
f = lambda x=y.z: x
def lambda_with_getattr_in_defaults_after():
f = lambda x=_getattr_(y, "z"): x
class MyClass:
def set(self, val):
self.state = val
def get(self):
return self.state
x = MyClass()
x.set(12)
x.set(x.get() + 1)
if x.get() != 13:
raise AssertionError, "expected 13, got %d" % x.get()
f = lambda x, y=1: x + y
if f(2) != 3:
raise ValueError
if f(2, 2) != 4:
raise ValueError
from __future__ import nested_scopes
import sys
def print0():
print 'Hello, world!',
......@@ -27,6 +28,18 @@ def printLines():
print
return printed
def try_map():
inc = lambda i: i+1
x = [1, 2, 3]
print map(inc, x),
return printed
def try_apply():
def f(x, y, z):
return x + y + z
print f(*(300, 20), **{'z': 1}),
return printed
def primes():
# Somewhat obfuscated code on purpose
print filter(None,map(lambda y:y*reduce(lambda x,y:x*y!=0,
......@@ -57,6 +70,9 @@ def allowed_simple():
s = 'a'
s = s[:100] + 'b'
s += 'c'
if sys.version_info >= (2, 3):
t = ['l', 'm', 'n', 'o', 'p', 'q']
t[1:5:2] = ['n', 'p']
_ = q
return q['x'] + q['y'] + q['z'] + r[0] + r[1] + r[2] + s
......@@ -154,3 +170,7 @@ def nested_scopes_1():
def f2():
return a
return f1() + f2()
class Classic:
pass
......@@ -50,3 +50,7 @@ def except_using_bad_name():
# The name of choice (say, _write) is now assigned to an exception
# object. Hard to exploit, but conceivable.
pass
def keyword_arg_with_bad_name():
def f(okname=1, __badname=2):
pass
from string import rfind
import sys, os
import os
import re
import sys
import unittest
# Note that nothing should be imported from AccessControl, and in particular
# nothing from ZopeGuards.py. Transformed code may need several wrappers
# in order to run at all, and most of the production wrappers are defined
# in ZopeGuards. But RestrictedPython isn't supposed to depend on
# AccessControl, so we need to define throwaway wrapper implementations
# here instead.
from RestrictedPython import compile_restricted, PrintCollector
from RestrictedPython.Eval import RestrictionCapableEval
from RestrictedPython.tests import restricted_module, security_in_syntax
from types import FunctionType
from RestrictedPython.tests import before_and_after, restricted_module, verify
from RestrictedPython.RCompile import RModule
if __name__=='__main__':
here = os.getcwd()
else:
here = os.path.dirname(__file__)
if not here:
here = os.getcwd()
try:
__file__
except NameError:
__file__ = os.path.abspath(sys.argv[1])
_FILEPATH = os.path.abspath( __file__ )
_HERE = os.path.dirname( _FILEPATH )
def _getindent(line):
"""Returns the indentation level of the given line."""
......@@ -42,9 +50,14 @@ def find_source(fn, func):
f.close()
return fn, msg
def get_source(func):
"""Less silly interface to find_source""" # Sheesh
code = func.func_code
return find_source(code.co_filename, code)[1]
def create_rmodule():
global rmodule
fn = os.path.join(here, 'restricted_module.py')
fn = os.path.join(_HERE, 'restricted_module.py')
f = open(fn, 'r')
source = f.read()
f.close()
......@@ -52,7 +65,8 @@ def create_rmodule():
compile(source, fn, 'exec')
# Now compile it for real
code = compile_restricted(source, fn, 'exec')
rmodule = {'__builtins__':{'__import__':__import__, 'None':None}}
rmodule = {'__builtins__':{'__import__':__import__, 'None':None,
'__name__': 'restricted_module'}}
builtins = getattr(__builtins__, '__dict__', __builtins__)
for name in ('map', 'reduce', 'int', 'pow', 'range', 'filter',
'len', 'chr', 'ord',
......@@ -60,8 +74,6 @@ def create_rmodule():
rmodule[name] = builtins[name]
exec code in rmodule
create_rmodule()
class AccessDenied (Exception): pass
DisallowedObject = []
......@@ -158,20 +170,35 @@ class TestGuard:
_ob = self.__dict__['_ob']
_ob[lo:hi] = value
# A wrapper for _apply_.
apply_wrapper_called = []
def apply_wrapper(func, *args, **kws):
apply_wrapper_called.append('yes')
return func(*args, **kws)
class RestrictionTests(unittest.TestCase):
def execFunc(self, name, *args, **kw):
func = rmodule[name]
verify.verify(func.func_code)
func.func_globals.update({'_getattr_': guarded_getattr,
'_getitem_': guarded_getitem,
'_write_': TestGuard,
'_print_': PrintCollector})
'_print_': PrintCollector,
# I don't want to write something as involved as ZopeGuard's
# SafeIter just for these tests. Using the builtin list() function
# worked OK for everything the tests did at the time this was added,
# but may fail in the future. If Python 2.1 is no longer an
# interesting platform then, using 2.2's builtin iter() here should
# work for everything.
'_getiter_': list,
'_apply_': apply_wrapper,
})
return func(*args, **kw)
def checkPrint(self):
for i in range(2):
res = self.execFunc('print%s' % i)
assert res == 'Hello, world!', res
self.assertEqual(res, 'Hello, world!')
def checkPrintToNone(self):
try:
......@@ -180,23 +207,23 @@ class RestrictionTests(unittest.TestCase):
# Passed. "None" has no "write" attribute.
pass
else:
assert 0, res
self.fail(0, res)
def checkPrintStuff(self):
res = self.execFunc('printStuff')
assert res == 'a b c', res
self.assertEqual(res, 'a b c')
def checkPrintLines(self):
res = self.execFunc('printLines')
assert res == '0 1 2\n3 4 5\n6 7 8\n', res
self.assertEqual(res, '0 1 2\n3 4 5\n6 7 8\n')
def checkPrimes(self):
res = self.execFunc('primes')
assert res == '[2, 3, 5, 7, 11, 13, 17, 19]', res
self.assertEqual(res, '[2, 3, 5, 7, 11, 13, 17, 19]')
def checkAllowedSimple(self):
res = self.execFunc('allowed_simple')
assert res == 'abcabcabc', res
self.assertEqual(res, 'abcabcabc')
def checkAllowedRead(self):
self.execFunc('allowed_read', RestrictedObject())
......@@ -207,6 +234,16 @@ class RestrictionTests(unittest.TestCase):
def checkAllowedArgs(self):
self.execFunc('allowed_default_args', RestrictedObject())
def checkTryMap(self):
res = self.execFunc('try_map')
self.assertEqual(res, "[2, 3, 4]")
def checkApply(self):
del apply_wrapper_called[:]
res = self.execFunc('try_apply')
self.assertEqual(apply_wrapper_called, ["yes"])
self.assertEqual(res, "321")
def checkDenied(self):
for k in rmodule.keys():
if k[:6] == 'denied':
......@@ -216,12 +253,12 @@ class RestrictionTests(unittest.TestCase):
# Passed the test
pass
else:
raise AssertionError, '%s() did not trip security' % k
self.fail('%s() did not trip security' % k)
def checkSyntaxSecurity(self):
# Ensures that each of the functions in security_in_syntax.py
# throws a SyntaxError when using compile_restricted.
fn = os.path.join(here, 'security_in_syntax.py')
fn = os.path.join(_HERE, 'security_in_syntax.py')
f = open(fn, 'r')
source = f.read()
f.close()
......@@ -240,19 +277,19 @@ class RestrictionTests(unittest.TestCase):
# Passed the test.
pass
else:
raise AssertionError, '%s should not have compiled' % k
self.fail('%s should not have compiled' % k)
def checkOrderOfOperations(self):
res = self.execFunc('order_of_operations')
assert (res == 0), res
self.assertEqual(res, 0)
def checkRot13(self):
res = self.execFunc('rot13', 'Zope is k00l')
assert (res == 'Mbcr vf x00y'), res
self.assertEqual(res, 'Mbcr vf x00y')
def checkNestedScopes1(self):
res = self.execFunc('nested_scopes_1')
assert (res == 2), res
self.assertEqual(res, 2)
def checkUnrestrictedEval(self):
expr = RestrictionCapableEval("{'a':[m.pop()]}['a'] + [m[0]]")
......@@ -260,10 +297,10 @@ class RestrictionTests(unittest.TestCase):
expect = v[:]
expect.reverse()
res = expr.eval({'m':v})
assert res == expect, res
self.assertEqual(res, expect)
v = [12, 34]
res = expr(m=v)
assert res == expect
self.assertEqual(res, expect)
def checkStackSize(self):
for k, rfunc in rmodule.items():
......@@ -275,23 +312,145 @@ class RestrictionTests(unittest.TestCase):
'should have been at least %d, but was only %d'
% (k, ss, rss))
def test_suite():
return unittest.makeSuite(RestrictionTests, 'check')
def main():
alltests=test_suite()
runner = unittest.TextTestRunner()
runner.run(alltests)
def checkBeforeAndAfter(self):
from RestrictedPython.RCompile import RModule
from compiler import parse
defre = re.compile(r'def ([_A-Za-z0-9]+)_(after|before)\(')
def debug():
test_suite().debug()
beforel = [name for name in before_and_after.__dict__
if name.endswith("_before")]
def pdebug():
import pdb
pdb.run('debug()')
for name in beforel:
before = getattr(before_and_after, name)
before_src = get_source(before)
before_src = re.sub(defre, r'def \1(', before_src)
rm = RModule(before_src, '')
tree_before = rm._get_tree()
after = getattr(before_and_after, name[:-6]+'after')
after_src = get_source(after)
after_src = re.sub(defre, r'def \1(', after_src)
tree_after = parse(after_src)
self.assertEqual(str(tree_before), str(tree_after))
rm.compile()
verify.verify(rm.getCode())
def _compile_file(self, name):
path = os.path.join(_HERE, name)
f = open(path, "r")
source = f.read()
f.close()
co = compile_restricted(source, path, "exec")
verify.verify(co)
return co
def checkUnpackSequence(self):
co = self._compile_file("unpack.py")
calls = []
def getiter(seq):
calls.append(seq)
return list(seq)
globals = {"_getiter_": getiter}
exec co in globals, {}
# The comparison here depends on the exact code that is
# contained in unpack.py.
# The test doing implicit unpacking in an "except:" clause is
# a pain, because there are two levels of unpacking, and the top
# level is unpacking the specific TypeError instance constructed
# by the test. We have to worm around that one.
ineffable = "a TypeError instance"
expected = [[1, 2],
(1, 2),
"12",
[1],
[1, [2, 3], 4],
[2, 3],
(1, (2, 3), 4),
(2, 3),
[1, 2, 3],
2,
('a', 'b'),
((1, 2), (3, 4)), (1, 2),
((1, 2), (3, 4)), (3, 4),
ineffable, [42, 666],
[[0, 1], [2, 3], [4, 5]], [0, 1], [2, 3], [4, 5],
([[[1, 2]]], [[[3, 4]]]), [[[1, 2]]], [[1, 2]], [1, 2],
[[[3, 4]]], [[3, 4]], [3, 4],
]
i = expected.index(ineffable)
self.assert_(isinstance(calls[i], TypeError))
expected[i] = calls[i]
self.assertEqual(calls, expected)
def checkUnpackSequenceExpression(self):
co = compile_restricted("[x for x, y in [(1, 2)]]", "<string>", "eval")
verify.verify(co)
calls = []
def getiter(s):
calls.append(s)
return list(s)
globals = {"_getiter_": getiter}
exec co in globals, {}
self.assertEqual(calls, [[(1,2)], (1, 2)])
def checkUnpackSequenceSingle(self):
co = compile_restricted("x, y = 1, 2", "<string>", "single")
verify.verify(co)
calls = []
def getiter(s):
calls.append(s)
return list(s)
globals = {"_getiter_": getiter}
exec co in globals, {}
self.assertEqual(calls, [(1, 2)])
def checkClass(self):
getattr_calls = []
setattr_calls = []
def test_getattr(obj, attr):
getattr_calls.append(attr)
return getattr(obj, attr)
def test_setattr(obj):
setattr_calls.append(obj.__class__.__name__)
return obj
co = self._compile_file("class.py")
globals = {"_getattr_": test_getattr,
"_write_": test_setattr,
}
exec co in globals, {}
# Note that the getattr calls don't correspond to the method call
# order, because the x.set method is fetched before its arguments
# are evaluated.
self.assertEqual(getattr_calls,
["set", "set", "get", "state", "get", "state"])
self.assertEqual(setattr_calls, ["MyClass", "MyClass"])
def checkLambda(self):
co = self._compile_file("lambda.py")
exec co in {}, {}
def checkSyntaxError(self):
err = ("def f(x, y):\n"
" if x, y < 2 + 1:\n"
" return x + y\n"
" else:\n"
" return x - y\n")
self.assertRaises(SyntaxError,
compile_restricted, err, "<string>", "exec")
create_rmodule()
def test_suite():
return unittest.makeSuite(RestrictionTests, 'check')
if __name__=='__main__':
if len(sys.argv) > 1:
globals()[sys.argv[1]]()
else:
main()
unittest.main(defaultTest="test_suite")
# A series of short tests for unpacking sequences.
def u1(L):
x, y = L
assert x == 1
assert y == 2
u1([1,2])
u1((1, 2))
def u1a(L):
x, y = L
assert x == '1'
assert y == '2'
u1a("12")
try:
u1([1])
except ValueError:
pass
else:
raise AssertionError, "expected 'unpack list of wrong size'"
def u2(L):
x, (a, b), y = L
assert x == 1
assert a == 2
assert b == 3
assert y == 4
u2([1, [2, 3], 4])
u2((1, (2, 3), 4))
try:
u2([1, 2, 3])
except TypeError:
pass
else:
raise AssertionError, "expected 'iteration over non-sequence'"
def u3((x, y)):
assert x == 'a'
assert y == 'b'
return x, y
u3(('a', 'b'))
def u4(x):
(a, b), c = d, (e, f) = x
assert a == 1 and b == 2 and c == (3, 4)
assert d == (1, 2) and e == 3 and f == 4
u4( ((1, 2), (3, 4)) )
def u5(x):
try:
raise TypeError(x)
# This one is tricky to test, because the first level of unpacking
# has a TypeError instance. That's a headache for the test driver.
except TypeError, [(a, b)]:
assert a == 42
assert b == 666
u5([42, 666])
def u6(x):
expected = 0
for i, j in x:
assert i == expected
expected += 1
assert j == expected
expected += 1
u6([[0, 1], [2, 3], [4, 5]])
def u7(x):
stuff = [i + j for toplevel, in x for i, j in toplevel]
assert stuff == [3, 7]
u7( ([[[1, 2]]], [[[3, 4]]]) )
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Verify simple properties of bytecode.
Some of the transformations performed by the RestrictionMutator are
tricky. This module checks the generated bytecode as a way to verify
the correctness of the transformations. Violations of some
restrictions are obvious from inspection of the bytecode. For
example, the bytecode should never contain a LOAD_ATTR call, because
all attribute access is performed via the _getattr_() checker
function.
"""
import dis
import types
def verify(code):
"""Verify all code objects reachable from code.
In particular, traverse into contained code objects in the
co_consts table.
"""
verifycode(code)
for ob in code.co_consts:
if isinstance(ob, types.CodeType):
verify(ob)
def verifycode(code):
try:
_verifycode(code)
except:
dis.dis(code)
raise
def _verifycode(code):
line = code.co_firstlineno
# keep a window of the last three opcodes, with the most recent first
window = (None, None, None)
for op in disassemble(code):
if op.line is not None:
line = op.line
if op.opname.endswith("LOAD_ATTR"):
# All the user code that generates LOAD_ATTR should be
# rewritten, but the code generated for a list comp
# includes a LOAD_ATTR to extract the append method.
if not (op.arg == "append" and
window[0].opname == "DUP_TOP" and
window[1].opname == "BUILD_LIST"):
raise ValueError("direct attribute access %s: %s, %s:%d"
% (op.opname, op.arg, co.co_filename, line))
if op.opname in ("STORE_ATTR", "DEL_ATTR"):
if not (window[0].opname == "CALL_FUNCTION" and
window[2].opname == "LOAD_GLOBAL" and
window[2].arg == "_write_"):
# check that arg is appropriately wrapped
for i, op in enumerate(window):
print i, op.opname, op.arg
raise ValueError("unguard attribute set/del at %s:%d"
% (code.co_filename, line))
if op.opname.startswith("UNPACK"):
# An UNPACK opcode extracts items from iterables, and that's
# unsafe. The restricted compiler doesn't remove UNPACK opcodes,
# but rather *inserts* a call to _getiter_() before each, and
# that's the pattern we need to see.
if not (window[0].opname == "CALL_FUNCTION" and
window[1].opname == "ROT_TWO" and
window[2].opname == "LOAD_GLOBAL" and
window[2].arg == "_getiter_"):
raise ValueError("unguarded unpack sequence at %s:%d" %
(code.co_filename, line))
# should check CALL_FUNCTION_{VAR,KW,VAR_KW} but that would
# require a potentially unlimited history. need to refactor
# the "window" before I can do that.
if op.opname == "LOAD_SUBSCR":
raise ValueError("unguarded index of sequence at %s:%d" %
(code.co_filename, line))
window = (op,) + window[:2]
class Op(object):
__slots__ = (
"opname", # string, name of the opcode
"argcode", # int, the number of the argument
"arg", # any, the object, name, or value of argcode
"line", # int, line number or None
"target", # boolean, is this op the target of a jump
"pos", # int, offset in the bytecode
)
def __init__(self, opcode, pos):
self.opname = dis.opname[opcode]
self.arg = None
self.line = None
self.target = False
self.pos = pos
def disassemble(co, lasti=-1):
code = co.co_code
labels = dis.findlabels(code)
linestarts = dict(findlinestarts(co))
n = len(code)
i = 0
extended_arg = 0
free = co.co_cellvars + co.co_freevars
while i < n:
op = ord(code[i])
o = Op(op, i)
i += 1
if i in linestarts and i > 0:
o.line = linestarts[i]
if i in labels:
o.target = True
if op > dis.HAVE_ARGUMENT:
arg = ord(code[i]) + ord(code[i+1]) * 256 + extended_arg
extended_arg = 0
i += 2
if op == dis.EXTENDED_ARG:
extended_arg = arg << 16
o.argcode = arg
if op in dis.hasconst:
o.arg = co.co_consts[arg]
elif op in dis.hasname:
o.arg = co.co_names[arg]
elif op in dis.hasjrel:
o.arg = i + arg
elif op in dis.haslocal:
o.arg = co.co_varnames[arg]
elif op in dis.hascompare:
o.arg = dis.cmp_op[arg]
elif op in dis.hasfree:
o.arg = free[arg]
yield o
# findlinestarts is copied from Python 2.4's dis module. The code
# didn't exist in 2.3, but it would be painful to code disassemble()
# without it.
def findlinestarts(code):
"""Find the offsets in a byte code which are start of lines in the source.
Generate pairs (offset, lineno) as described in Python/compile.c.
"""
byte_increments = [ord(c) for c in code.co_lnotab[0::2]]
line_increments = [ord(c) for c in code.co_lnotab[1::2]]
lastlineno = None
lineno = code.co_firstlineno
addr = 0
for byte_incr, line_incr in zip(byte_increments, line_increments):
if byte_incr:
if lineno != lastlineno:
yield (addr, lineno)
lastlineno = lineno
addr += byte_incr
lineno += line_incr
if lineno != lastlineno:
yield (addr, lineno)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment