Commit 7f8def24 authored by Casey Duncan's avatar Casey Duncan

Refactor unrestrictedTraverse

- Reformat code, add docstrings, unobfuscate local var names
- Eliminate use of hasattr and simplify where possible
- Add comments
- Improve test coverage
parent d62a0d18
...@@ -23,7 +23,7 @@ from urllib import quote ...@@ -23,7 +23,7 @@ from urllib import quote
NotFound = 'NotFound' NotFound = 'NotFound'
_marker=[] _marker = object()
class Traversable: class Traversable:
...@@ -106,100 +106,130 @@ class Traversable: ...@@ -106,100 +106,130 @@ class Traversable:
unrestrictedTraverse__roles__=() # Private unrestrictedTraverse__roles__=() # Private
def unrestrictedTraverse(self, path, default=_marker, restricted=0): def unrestrictedTraverse(self, path, default=_marker, restricted=0):
"""Lookup an object by path,
path -- The path to the object. May be a sequence of strings or a slash
separated string. If the path begins with an empty path element
(i.e., an empty string or a slash) then the lookup is performed
from the application root. Otherwise, the lookup is relative to
self. Two dots (..) as a path element indicates an upward traversal
to the acquisition parent.
default -- If provided, this is the value returned if the path cannot
be traversed for any reason (i.e., no object exists at that path or
the object is inaccessible).
restricted -- If false (default) then no security checking is performed.
If true, then all of the objects along the path are validated with
the security machinery. Usually invoked using restrictedTraverse().
"""
if not path:
return self
if not path: return self _getattr = getattr
_hasattr = hasattr
get=getattr _none = None
has=hasattr marker = _marker
N=None
M=_marker
if isinstance(path, str): path = path.split('/') if isinstance(path, str):
else: path=list(path) # Unicode paths are not allowed
path = path.split('/')
else:
path = list(path)
REQUEST={'TraversalRequestNameStack': path} REQUEST = {'TraversalRequestNameStack': path}
path.reverse() path.reverse()
pop=path.pop path_pop=path.pop
if len(path) > 1 and not path[0]: if len(path) > 1 and not path[0]:
# Remove trailing slash # Remove trailing slash
path.pop(0) path.pop(0)
if restricted: securityManager=getSecurityManager() if restricted:
else: securityManager=N securityManager = getSecurityManager()
else:
securityManager = _none
if not path[-1]: if not path[-1]:
# If the path starts with an empty string, go to the root first. # If the path starts with an empty string, go to the root first.
pop() path_pop()
self=self.getPhysicalRoot() self = self.getPhysicalRoot()
if (restricted and not securityManager.validate( if (restricted
None, None, None, self)): and not securityManager.validate(None, None, None, self)):
raise Unauthorized, name raise Unauthorized, name
try: try:
object = self obj = self
while path: while path:
name=pop() name = path_pop()
__traceback_info__ = path, name __traceback_info__ = path, name
if name[0] == '_': if name[0] == '_':
# Never allowed in a URL. # Never allowed in a URL.
raise NotFound, name raise NotFound, name
if name=='..': if name == '..':
o=getattr(object, 'aq_parent', M) next = aq_parent(obj)
if o is not M: if next is not _none:
if (restricted and not securityManager.validate( if restricted and not securityManager.validate(
object, object,name, o)): obj, obj,name, next):
raise Unauthorized, name raise Unauthorized, name
object=o obj = next
continue continue
t=get(object, '__bobo_traverse__', N) bobo_traverse = _getattr(obj, '__bobo_traverse__', _none)
if t is not N: if bobo_traverse is not _none:
o=t(REQUEST, name) next = bobo_traverse(REQUEST, name)
if restricted: if restricted:
container = N if aq_base(next) is not next:
if aq_base(o) is not o:
# The object is wrapped, so the acquisition # The object is wrapped, so the acquisition
# context determines the container. # context is the container.
container = aq_parent(aq_inner(o)) container = aq_parent(aq_inner(next))
elif has(o, 'im_self'): elif _getattr(next, 'im_self', _none) is not _none:
container = o.im_self # Bound method, the bound instance
elif (has(get(object, 'aq_base', object), name) # is the container
and get(object, name) == o): container = next.im_self
container = object elif _getattr(aq_base(obj), name, marker) == next:
if (not securityManager.validate(object, # Unwrapped direct attribute of the object so
container, name, o)): # object is the container
container = obj
else:
# Can't determine container
container = _none
if not securityManager.validate(
obj, container, name, next):
raise Unauthorized, name raise Unauthorized, name
else: else:
if restricted: if restricted:
o = guarded_getattr(object, name, M) next = guarded_getattr(obj, name, marker)
else: else:
o = get(object, name, M) next = _getattr(obj, name, marker)
if o is M: if next is marker:
try: try:
o=object[name] next=obj[name]
except AttributeError: except AttributeError:
# Raise NotFound for easier debugging # Raise NotFound for easier debugging
# instead of AttributeError: __getitem__
raise NotFound, name raise NotFound, name
if (restricted and not securityManager.validate( if restricted and not securityManager.validate(
object, object, N, o)): obj, obj, _none, next):
raise Unauthorized, name raise Unauthorized, name
object=o obj = next
return object return obj
except ConflictError: raise except ConflictError:
raise
except: except:
if default==_marker: raise if default is not marker:
return default return default
else:
raise
restrictedTraverse__roles__=None # Public restrictedTraverse__roles__=None # Public
def restrictedTraverse(self, path, default=_marker): def restrictedTraverse(self, path, default=_marker):
# Trusted code traversal code, always enforces security
return self.unrestrictedTraverse(path, default, restricted=1) return self.unrestrictedTraverse(path, default, restricted=1)
def path2url(path): def path2url(path):
......
...@@ -84,9 +84,21 @@ class BoboTraversable(SimpleItem): ...@@ -84,9 +84,21 @@ class BoboTraversable(SimpleItem):
def __bobo_traverse__(self, request, name): def __bobo_traverse__(self, request, name):
if name == 'bb_subitem': if name == 'bb_subitem':
return BoboTraversable().__of__(self) return BoboTraversable().__of__(self)
elif name == 'bb_method':
return self.bb_method
elif name == 'bb_status':
return self.bb_status
elif name == 'manufactured':
return 42
else: else:
raise KeyError raise KeyError
def bb_method(self):
"""Test Method"""
pass
bb_status = 'screechy'
def makeConnection(): def makeConnection():
import ZODB import ZODB
...@@ -152,13 +164,17 @@ class TestTraverse( unittest.TestCase ): ...@@ -152,13 +164,17 @@ class TestTraverse( unittest.TestCase ):
def testTraversePath( self ): def testTraversePath( self ):
self.failUnless( 'file' in self.folder1.objectIds() ) self.failUnless( 'file' in self.folder1.objectIds() )
self.failUnless( self.folder1.unrestrictedTraverse( ('', 'folder1', 'file' ) )) self.failUnless(
self.failUnless( self.folder1.unrestrictedTraverse( ('', 'folder1' ) )) self.folder1.unrestrictedTraverse( ('', 'folder1', 'file' ) ))
self.failUnless(
self.folder1.unrestrictedTraverse( ('', 'folder1' ) ))
def testTraverseURLNoSlash( self ): def testTraverseURLNoSlash( self ):
self.failUnless( 'file' in self.folder1.objectIds() ) self.failUnless( 'file' in self.folder1.objectIds() )
self.failUnless( self.folder1.unrestrictedTraverse( '/folder1/file' )) self.failUnless(
self.failUnless( self.folder1.unrestrictedTraverse( '/folder1' )) self.folder1.unrestrictedTraverse( '/folder1/file' ))
self.failUnless(
self.folder1.unrestrictedTraverse( '/folder1' ))
def testTraverseURLSlash( self ): def testTraverseURLSlash( self ):
self.failUnless( 'file' in self.folder1.objectIds() ) self.failUnless( 'file' in self.folder1.objectIds() )
...@@ -166,11 +182,15 @@ class TestTraverse( unittest.TestCase ): ...@@ -166,11 +182,15 @@ class TestTraverse( unittest.TestCase ):
self.failUnless( self.folder1.unrestrictedTraverse( '/folder1/' )) self.failUnless( self.folder1.unrestrictedTraverse( '/folder1/' ))
def testTraverseToNone( self ): def testTraverseToNone( self ):
self.failUnlessRaises( KeyError, self.folder1.unrestrictedTraverse, ('', 'folder1', 'file2' ) ) self.failUnlessRaises(
self.failUnlessRaises( KeyError, self.folder1.unrestrictedTraverse, '/folder1/file2' ) KeyError,
self.failUnlessRaises( KeyError, self.folder1.unrestrictedTraverse, '/folder1/file2/' ) self.folder1.unrestrictedTraverse, ('', 'folder1', 'file2' ) )
self.failUnlessRaises(
def testTraverseThroughBoboTraverse(self): KeyError, self.folder1.unrestrictedTraverse, '/folder1/file2' )
self.failUnlessRaises(
KeyError, self.folder1.unrestrictedTraverse, '/folder1/file2/' )
def testBoboTraverseToWrappedSubObj(self):
# Verify it's possible to use __bobo_traverse__ with the # Verify it's possible to use __bobo_traverse__ with the
# Zope security policy. # Zope security policy.
noSecurityManager() noSecurityManager()
...@@ -179,6 +199,33 @@ class TestTraverse( unittest.TestCase ): ...@@ -179,6 +199,33 @@ class TestTraverse( unittest.TestCase ):
self.failUnlessRaises(KeyError, bb.restrictedTraverse, 'notfound') self.failUnlessRaises(KeyError, bb.restrictedTraverse, 'notfound')
bb.restrictedTraverse('bb_subitem') bb.restrictedTraverse('bb_subitem')
def testBoboTraverseToMethod(self):
# Verify it's possible to use __bobo_traverse__ to a method.
noSecurityManager()
SecurityManager.setSecurityPolicy( self.oldPolicy )
bb = BoboTraversable()
self.failUnless(
bb.restrictedTraverse('bb_method') is not bb.bb_method)
def testBoboTraverseToSimpleAttrValue(self):
# Verify it's possible to use __bobo_traverse__ to a simple
# python value
noSecurityManager()
SecurityManager.setSecurityPolicy( self.oldPolicy )
bb = BoboTraversable()
self.assertEqual(bb.restrictedTraverse('bb_status'), 'screechy')
def testBoboTraverseToNonAttrValue(self):
# Verify it's possible to use __bobo_traverse__ to an
# arbitrary manufactured object
noSecurityManager()
# Default security policy always seems to deny in this case, which
# is fine, but to test the code branch we sub in the forgiving one
SecurityManager.setSecurityPolicy(UnitTestSecurityPolicy())
bb = BoboTraversable()
self.failUnless(
bb.restrictedTraverse('manufactured') is 42)
def testAcquiredAttributeDenial(self): def testAcquiredAttributeDenial(self):
# Verify that restrictedTraverse raises the right kind of exception # Verify that restrictedTraverse raises the right kind of exception
# on denial of access to an acquired attribute. If it raises # on denial of access to an acquired attribute. If it raises
...@@ -191,6 +238,24 @@ class TestTraverse( unittest.TestCase ): ...@@ -191,6 +238,24 @@ class TestTraverse( unittest.TestCase ):
self.failUnlessRaises(Unauthorized, self.failUnlessRaises(Unauthorized,
self.root.folder1.restrictedTraverse, 'stuff') self.root.folder1.restrictedTraverse, 'stuff')
def testDefaultValueWhenUnathorized(self):
# Test that traversing to an unauthorized object returns
# the default when provided
noSecurityManager()
SecurityManager.setSecurityPolicy(CruelSecurityPolicy())
newSecurityManager( None, UnitTestUser().__of__( self.root ) )
self.root.stuff = 'stuff here'
self.assertEqual(
self.root.folder1.restrictedTraverse('stuff', 42), 42)
def testDefaultValueWhenNotFound(self):
# Test that traversing to a non-existent object returns
# the default when provided
noSecurityManager()
SecurityManager.setSecurityPolicy( self.oldPolicy )
self.assertEqual(
self.root.restrictedTraverse('happy/happy', 'joy'), 'joy')
def test_suite(): def test_suite():
suite = unittest.TestSuite() suite = unittest.TestSuite()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment