Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Z
Zope
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
Zope
Commits
6b180795
Commit
6b180795
authored
Nov 08, 2001
by
matt@zope.com
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Added Session test cases
parent
684759a0
Changes
2
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
718 additions
and
0 deletions
+718
-0
lib/python/Products/Sessions/tests/testBrowserIdManager.py
lib/python/Products/Sessions/tests/testBrowserIdManager.py
+356
-0
lib/python/Products/Sessions/tests/testSessionDataManager.py
lib/python/Products/Sessions/tests/testSessionDataManager.py
+362
-0
No files found.
lib/python/Products/Sessions/tests/testBrowserIdManager.py
0 → 100644
View file @
6b180795
##############################################################################
#
# Zope Public License (ZPL) Version 1.0
# -------------------------------------
#
# Copyright (c) Digital Creations. All rights reserved.
#
# This license has been certified as Open Source(tm).
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions in source code must retain the above copyright
# notice, this list of conditions, and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions, and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
#
# 3. Digital Creations requests that attribution be given to Zope
# in any manner possible. Zope includes a "Powered by Zope"
# button that is installed by default. While it is not a license
# violation to remove this button, it is requested that the
# attribution remain. A significant investment has been put
# into Zope, and this effort will continue if the Zope community
# continues to grow. This is one way to assure that growth.
#
# 4. All advertising materials and documentation mentioning
# features derived from or use of this software must display
# the following acknowledgement:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# In the event that the product being advertised includes an
# intact Zope distribution (with copyright and license included)
# then this clause is waived.
#
# 5. Names associated with Zope or Digital Creations must not be used to
# endorse or promote products derived from this software without
# prior written permission from Digital Creations.
#
# 6. Modified redistributions of any form whatsoever must retain
# the following acknowledgment:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# Intact (re-)distributions of any official Zope release do not
# require an external acknowledgement.
#
# 7. Modifications are encouraged but must be packaged separately as
# patches to official Zope releases. Distributions that do not
# clearly separate the patches from the original work must be clearly
# labeled as unofficial distributions. Modifications which do not
# carry the name Zope may be packaged in any form, as long as they
# conform to all of the clauses above.
#
#
# Disclaimer
#
# THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS ``AS IS'' AND ANY
# EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DIGITAL CREATIONS OR ITS
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
#
# This software consists of contributions made by Digital Creations and
# many individuals on behalf of Digital Creations. Specific
# attributions are listed in the accompanying credits file.
#
##############################################################################
"""
Test suite for session id manager.
$Id: testBrowserIdManager.py,v 1.1 2001/11/08 21:22:07 matt Exp $
"""
__version__
=
"$Revision: 1.1 $"
[
11
:
-
2
]
import
sys
if
__name__
==
"__main__"
:
sys
.
path
.
insert
(
0
,
'../../..'
)
sys
.
path
.
insert
(
0
,
'..'
)
import
ZODB
from
Products.Sessions.BrowserIdManager
import
BrowserIdManager
,
BrowserIdManagerErr
from
unittest
import
TestCase
,
TestSuite
,
TextTestRunner
,
makeSuite
from
ZPublisher.HTTPRequest
import
HTTPRequest
from
ZPublisher.HTTPResponse
import
HTTPResponse
from
sys
import
stdin
from
os
import
environ
class
TestBrowserIdManager
(
TestCase
):
def
setUp
(
self
):
self
.
m
=
BrowserIdManager
(
'foo'
)
resp
=
HTTPResponse
()
environ
[
'SERVER_NAME'
]
=
'fred'
environ
[
'SERVER_PORT'
]
=
'80'
req
=
HTTPRequest
(
stdin
,
environ
,
resp
)
self
.
m
.
REQUEST
=
req
def
tearDown
(
self
):
del
self
.
m
def
testSetTokenKey
(
self
):
self
.
m
.
setTokenKey
(
'foo'
)
assert
self
.
m
.
getTokenKey
()
==
'foo'
def
testSetBadKeyString
(
self
):
try
:
self
.
m
.
setTokenKey
(
''
)
except
BrowserIdManagerErr
:
pass
else
:
assert
1
==
2
try
:
self
.
m
.
setTokenKey
(
1
)
except
BrowserIdManagerErr
:
pass
else
:
assert
1
==
2
def
testSetBadNamespaces
(
self
):
d
=
{
1
:
'gummy'
,
2
:
'froopy'
}
try
:
self
.
m
.
setTokenKeyNamespaces
(
d
)
except
BrowserIdManagerErr
:
pass
else
:
assert
1
==
2
def
testSetGoodNamespaces
(
self
):
d
=
{
1
:
'cookies'
,
2
:
'form'
}
self
.
m
.
setTokenKeyNamespaces
(
d
)
assert
self
.
m
.
getTokenKeyNamespaces
()
==
d
def
testSetBadCookiePath
(
self
):
path
=
'/;'
try
:
self
.
m
.
setCookiePath
(
path
)
except
BrowserIdManagerErr
:
pass
else
:
assert
1
==
2
def
testSetGoodCookiePath
(
self
):
self
.
m
.
setCookiePath
(
'/foo'
)
assert
self
.
m
.
getCookiePath
()
==
'/foo'
def
testSetBadCookieLifeDays
(
self
):
life
=
''
try
:
self
.
m
.
setCookieLifeDays
(
''
)
except
BrowserIdManagerErr
:
pass
else
:
assert
1
==
2
def
testSetGoodCookieLifeDays
(
self
):
self
.
m
.
setCookieLifeDays
(
1
)
assert
self
.
m
.
getCookieLifeDays
()
==
1
def
testSetBadCookieDomain
(
self
):
life
=
''
try
:
self
.
m
.
setCookieDomain
(
'gubble'
)
except
BrowserIdManagerErr
:
pass
else
:
assert
1
==
2
def
testSetGoodCookieLifeDays
(
self
):
self
.
m
.
setCookieLifeDays
(
1
)
assert
self
.
m
.
getCookieLifeDays
()
==
1
def
testSetNoCookieDomain
(
self
):
domain
=
''
self
.
m
.
setCookieDomain
(
domain
)
setdomain
=
self
.
m
.
getCookieDomain
()
assert
setdomain
==
domain
,
"%s"
%
setdomain
def
testSetBadCookieDomain
(
self
):
domain
=
'zope.org'
# not enough dots
try
:
self
.
m
.
setCookieDomain
(
domain
)
except
BrowserIdManagerErr
:
pass
else
:
assert
1
==
2
domain
=
{
1
:
1
}
# must be stringtype
try
:
self
.
m
.
setCookieDomain
(
domain
)
except
BrowserIdManagerErr
:
pass
else
:
assert
1
==
2
domain
=
'.zope.org;'
# semicolon follows
try
:
self
.
m
.
setCookieDomain
(
domain
)
except
BrowserIdManagerErr
:
pass
else
:
assert
1
==
2
def
testSetGoodCookieDomain
(
self
):
self
.
m
.
setCookieDomain
(
'.zope.org'
)
setdomain
=
self
.
m
.
getCookieDomain
()
assert
setdomain
==
'.zope.org'
,
"%s"
%
setdomain
def
testSetCookieSecure
(
self
):
self
.
m
.
setCookieSecure
(
1
)
assert
self
.
m
.
getCookieSecure
()
==
1
def
testDelegateToParent
(
self
):
self
.
m
.
turnOff
()
try
:
a
=
self
.
m
.
hasToken
()
except
BrowserIdManagerErr
:
pass
else
:
assert
1
==
2
def
testGetTokenCookie
(
self
):
token
=
self
.
m
.
getToken
()
self
.
m
.
REQUEST
.
browser_token_
=
token
self
.
m
.
REQUEST
.
browser_token_ns_
=
'cookies'
tokenkey
=
self
.
m
.
getTokenKey
()
self
.
m
.
REQUEST
.
cookies
[
tokenkey
]
=
token
a
=
self
.
m
.
getToken
()
assert
a
==
token
,
repr
(
a
)
assert
self
.
m
.
isTokenFromCookie
()
def
testSetSessionTokenDontCreate
(
self
):
a
=
self
.
m
.
getToken
(
0
)
assert
a
==
None
def
testSetSessionTokenCreate
(
self
):
a
=
self
.
m
.
getToken
(
1
)
tokenkey
=
self
.
m
.
getTokenKey
()
b
=
self
.
m
.
REQUEST
.
RESPONSE
.
cookies
[
tokenkey
]
assert
a
==
b
[
'value'
],
(
a
,
b
)
def
testHasToken
(
self
):
assert
not
self
.
m
.
hasToken
()
a
=
self
.
m
.
getToken
()
assert
self
.
m
.
hasToken
()
def
testTokenIsNew
(
self
):
a
=
self
.
m
.
getToken
()
assert
self
.
m
.
isTokenNew
()
def
testIsTokenFromCookieFirst
(
self
):
token
=
self
.
m
.
getToken
()
self
.
m
.
REQUEST
.
browser_token_
=
token
self
.
m
.
REQUEST
.
browser_token_ns_
=
'cookies'
tokenkey
=
self
.
m
.
getTokenKey
()
self
.
m
.
REQUEST
.
cookies
[
tokenkey
]
=
token
self
.
m
.
setTokenKeyNamespaces
({
1
:
'cookies'
,
2
:
'form'
})
a
=
self
.
m
.
getToken
()
assert
self
.
m
.
isTokenFromCookie
()
def
testIsTokenFromFormFirst
(
self
):
token
=
self
.
m
.
getToken
()
self
.
m
.
REQUEST
.
browser_token_
=
token
self
.
m
.
REQUEST
.
browser_token_ns_
=
'form'
tokenkey
=
self
.
m
.
getTokenKey
()
self
.
m
.
REQUEST
.
form
[
tokenkey
]
=
token
self
.
m
.
setTokenKeyNamespaces
({
1
:
'form'
,
2
:
'cookies'
})
a
=
self
.
m
.
getToken
()
assert
self
.
m
.
isTokenFromForm
()
def
testIsTokenFromCookieOnly
(
self
):
token
=
self
.
m
.
getToken
()
self
.
m
.
REQUEST
.
browser_token_
=
token
self
.
m
.
REQUEST
.
browser_token_ns_
=
'cookies'
tokenkey
=
self
.
m
.
getTokenKey
()
self
.
m
.
REQUEST
.
cookies
[
tokenkey
]
=
token
self
.
m
.
setTokenKeyNamespaces
({
1
:
'cookies'
})
a
=
self
.
m
.
getToken
()
assert
self
.
m
.
isTokenFromCookie
()
assert
not
self
.
m
.
isTokenFromForm
()
def
testIsTokenFromFormOnly
(
self
):
token
=
self
.
m
.
getToken
()
self
.
m
.
REQUEST
.
browser_token_
=
token
self
.
m
.
REQUEST
.
browser_token_ns_
=
'form'
tokenkey
=
self
.
m
.
getTokenKey
()
self
.
m
.
REQUEST
.
form
[
tokenkey
]
=
token
self
.
m
.
setTokenKeyNamespaces
({
1
:
'form'
})
a
=
self
.
m
.
getToken
()
assert
self
.
m
.
isTokenFromForm
()
assert
not
self
.
m
.
isTokenFromCookie
()
def
testFlushTokenCookie
(
self
):
token
=
self
.
m
.
getToken
()
self
.
m
.
REQUEST
.
browser_token_
=
token
self
.
m
.
REQUEST
.
browser_token_ns_
=
'cookies'
tokenkey
=
self
.
m
.
getTokenKey
()
self
.
m
.
REQUEST
.
cookies
[
tokenkey
]
=
token
a
=
self
.
m
.
getToken
()
assert
a
==
token
,
repr
(
a
)
assert
self
.
m
.
isTokenFromCookie
()
self
.
m
.
flushTokenCookie
()
c
=
self
.
m
.
REQUEST
.
RESPONSE
.
cookies
[
tokenkey
]
assert
c
[
'value'
]
==
'deleted'
,
c
def
testDelegateToParentFail
(
self
):
self
.
m
.
turnOff
()
try
:
self
.
m
.
getToken
()
except
BrowserIdManagerErr
:
pass
else
:
assert
1
==
2
def
testDelegateToParentSucceed
(
self
):
self
.
m
.
turnOff
()
class
foo
:
pass
class
bar
:
def
getToken
(
unself
,
create
=
1
):
return
'worked'
fooi
=
foo
()
bari
=
bar
()
setattr
(
fooi
,
self
.
m
.
id
,
bari
)
self
.
m
.
aq_parent
=
fooi
assert
self
.
m
.
getToken
()
==
'worked'
def
testEncodeUrl
(
self
):
keystring
=
self
.
m
.
getTokenKey
()
key
=
self
.
m
.
getToken
()
u
=
'/home/chrism/foo'
r
=
self
.
m
.
encodeUrl
(
u
)
assert
r
==
'%s?%s=%s'
%
(
u
,
keystring
,
key
)
u
=
'http://www.zope.org/Members/mcdonc?foo=bar&spam=eggs'
r
=
self
.
m
.
encodeUrl
(
u
)
assert
r
==
'%s&%s=%s'
%
(
u
,
keystring
,
key
)
if
__name__
==
'__main__'
:
testsuite
=
makeSuite
(
TestBrowserIdManager
,
'test'
)
runner
=
TextTestRunner
()
runner
.
run
(
testsuite
)
lib/python/Products/Sessions/tests/testSessionDataManager.py
0 → 100644
View file @
6b180795
##############################################################################
#
# Zope Public License (ZPL) Version 1.0
# -------------------------------------
#
# Copyright (c) Digital Creations. All rights reserved.
#
# This license has been certified as Open Source(tm).
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions in source code must retain the above copyright
# notice, this list of conditions, and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions, and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
#
# 3. Digital Creations requests that attribution be given to Zope
# in any manner possible. Zope includes a "Powered by Zope"
# button that is installed by default. While it is not a license
# violation to remove this button, it is requested that the
# attribution remain. A significant investment has been put
# into Zope, and this effort will continue if the Zope community
# continues to grow. This is one way to assure that growth.
#
# 4. All advertising materials and documentation mentioning
# features derived from or use of this software must display
# the following acknowledgement:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# In the event that the product being advertised includes an
# intact Zope distribution (with copyright and license included)
# then this clause is waived.
#
# 5. Names associated with Zope or Digital Creations must not be used to
# endorse or promote products derived from this software without
# prior written permission from Digital Creations.
#
# 6. Modified redistributions of any form whatsoever must retain
# the following acknowledgment:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# Intact (re-)distributions of any official Zope release do not
# require an external acknowledgement.
#
# 7. Modifications are encouraged but must be packaged separately as
# patches to official Zope releases. Distributions that do not
# clearly separate the patches from the original work must be clearly
# labeled as unofficial distributions. Modifications which do not
# carry the name Zope may be packaged in any form, as long as they
# conform to all of the clauses above.
#
#
# Disclaimer
#
# THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS ``AS IS'' AND ANY
# EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DIGITAL CREATIONS OR ITS
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
#
# This software consists of contributions made by Digital Creations and
# many individuals on behalf of Digital Creations. Specific
# attributions are listed in the accompanying credits file.
#
##############################################################################
import
sys
,
os
,
time
if
__name__
==
"__main__"
:
sys
.
path
.
insert
(
0
,
'../../..'
)
#os.chdir('../../..')
from
Testing
import
makerequest
import
ZODB
# in order to get Persistence.Persistent working
import
Acquisition
from
Acquisition
import
aq_base
from
Products.Sessions.BrowserIdManager
import
BrowserIdManager
from
Products.Sessions.SessionDataManager
import
\
SessionDataManager
,
SessionDataManagerErr
from
Products.Transience.Transience
import
\
TransientObjectContainer
,
TransientObject
from
ZODB.POSException
import
InvalidObjectReference
from
DateTime
import
DateTime
from
unittest
import
TestCase
,
TestSuite
,
TextTestRunner
,
makeSuite
import
time
,
threading
,
whrandom
from
cPickle
import
UnpickleableError
idmgr_name
=
'browser_id_manager'
toc_name
=
'temp_transient_container'
def
f
(
sdo
):
pass
class
Foo
(
Acquisition
.
Implicit
):
pass
class
TestBase
(
TestCase
):
def
setUp
(
self
):
import
Zope
self
.
app
=
makerequest
.
makerequest
(
Zope
.
app
())
del
Zope
timeout
=
self
.
timeout
=
1
#bidmgr = BrowserIdManager(idmgr_name)
#toc = TransientObjectContainer(tocname, title='Temporary '
# 'Transient Object Container', timeout_mins=20)
#session_data_manager=SessionDataManager(id='session_data_manager', path='/'+toc_name, title='SessionThing')
#try: self.app._delObject(idmgr_name)
#except AttributeError: pass
#try: self.app._delObject(toc_name)
#except AttributeError: pass
#try: self.app._delObject('session_data_manager')
#except AttributeError: pass
#self.app._setObject(idmgr_name, bidmgr)
#self.app._setObject(toc_name, toc)
#self.app._setObject('session_data_manager', session_data_manager)
# leans on the fact that these things exist by app init
## admin = self.app.acl_users.getUser('admin')
## if admin is None:
## raise "Need to define an 'admin' user before running these tests"
## admin = admin.__of__(self.app.acl_users)
## self.app.session_data_manager.changeOwnership(admin)
def
tearDown
(
self
):
get_transaction
().
abort
()
self
.
app
.
_p_jar
.
close
()
self
.
app
=
None
del
self
.
app
class
TestSessionManager
(
TestBase
):
def
testHasId
(
self
):
assert
self
.
app
.
session_data_manager
.
id
==
'session_data_manager'
def
testHasTitle
(
self
):
assert
self
.
app
.
session_data_manager
.
title
==
'Session Data Manager'
def
testGetSessionDataNoCreate
(
self
):
sd
=
self
.
app
.
session_data_manager
.
getSessionData
(
0
)
assert
sd
is
None
,
repr
(
sd
)
def
testGetSessionDataCreate
(
self
):
sd
=
self
.
app
.
session_data_manager
.
getSessionData
(
1
)
assert
sd
.
__class__
is
TransientObject
,
repr
(
sd
)
def
testHasSessionData
(
self
):
sd
=
self
.
app
.
session_data_manager
.
getSessionData
()
assert
self
.
app
.
session_data_manager
.
hasSessionData
()
def
testNewSessionDataObjectIsValid
(
self
):
sdType
=
type
(
TransientObject
(
1
))
sd
=
self
.
app
.
session_data_manager
.
getSessionData
()
assert
type
(
getattr
(
sd
,
'aq_base'
,
sd
))
is
sdType
assert
not
hasattr
(
sd
,
'_invalid'
)
def
testInvalidateSessionDataObject
(
self
):
sd
=
self
.
app
.
session_data_manager
.
getSessionData
()
sd
.
invalidate
()
assert
hasattr
(
sd
,
'_invalid'
)
def
testSessionTokenIsSet
(
self
):
sd
=
self
.
app
.
session_data_manager
.
getSessionData
()
mgr
=
getattr
(
self
.
app
,
idmgr_name
)
assert
mgr
.
hasToken
()
def
testGetSessionDataByKey
(
self
):
sd
=
self
.
app
.
session_data_manager
.
getSessionData
()
mgr
=
getattr
(
self
.
app
,
idmgr_name
)
token
=
mgr
.
getToken
()
bykeysd
=
self
.
app
.
session_data_manager
.
getSessionDataByKey
(
token
)
assert
sd
==
bykeysd
,
(
sd
,
bykeysd
,
token
)
def
testBadExternalSDCPath
(
self
):
self
.
app
.
session_data_manager
.
REQUEST
[
'REQUEST_METHOD'
]
=
'GET'
# fake out webdav
self
.
app
.
session_data_manager
.
setContainerPath
(
'/fudgeffoloo'
)
try
:
self
.
app
.
session_data_manager
.
getSessionData
()
except
SessionDataManagerErr
:
pass
else
:
assert
1
==
2
,
self
.
app
.
session_data_manager
.
getSessionDataContainerPath
()
def
testInvalidateSessionDataObject
(
self
):
sd
=
self
.
app
.
session_data_manager
.
getSessionData
()
sd
[
'test'
]
=
'Its alive! Alive!'
sd
.
invalidate
()
assert
not
self
.
app
.
session_data_manager
.
getSessionData
().
has_key
(
'test'
)
def
testGhostUnghostSessionManager
(
self
):
get_transaction
().
commit
()
sd
=
self
.
app
.
session_data_manager
.
getSessionData
()
sd
.
set
(
'foo'
,
'bar'
)
self
.
app
.
session_data_manager
.
_p_changed
=
None
get_transaction
().
commit
()
assert
self
.
app
.
session_data_manager
.
getSessionData
().
get
(
'foo'
)
==
'bar'
def
testSubcommit
(
self
):
sd
=
self
.
app
.
session_data_manager
.
getSessionData
()
sd
.
set
(
'foo'
,
'bar'
)
assert
get_transaction
().
commit
(
1
)
==
None
def
testForeignObject
(
self
):
self
.
assertRaises
(
InvalidObjectReference
,
self
.
_foreignAdd
)
def
_foreignAdd
(
self
):
ob
=
self
.
app
.
session_data_manager
sd
=
self
.
app
.
session_data_manager
.
getSessionData
()
sd
.
set
(
'foo'
,
ob
)
get_transaction
().
commit
()
def
testAqWrappedObjectsFail
(
self
):
a
=
Foo
()
b
=
Foo
()
aq_wrapped
=
a
.
__of__
(
b
)
sd
=
self
.
app
.
session_data_manager
.
getSessionData
()
sd
.
set
(
'foo'
,
aq_wrapped
)
self
.
assertRaises
(
UnpickleableError
,
get_transaction
().
commit
)
class
TestMultiThread
(
TestBase
):
def
testNonOverlappingSids
(
self
):
readers
=
[]
writers
=
[]
readiters
=
20
writeiters
=
5
readout
=
[]
writeout
=
[]
numreaders
=
20
numwriters
=
5
rlock
=
threading
.
Lock
()
session_data_managername
=
'session_data_manager'
for
i
in
range
(
numreaders
):
mgr
=
getattr
(
self
.
app
,
idmgr_name
)
sid
=
mgr
.
_getNewToken
()
app
=
aq_base
(
self
.
app
)
thread
=
ReaderThread
(
sid
,
app
,
readiters
,
session_data_managername
)
readers
.
append
(
thread
)
for
i
in
range
(
numwriters
):
mgr
=
getattr
(
self
.
app
,
idmgr_name
)
sid
=
mgr
.
_getNewToken
()
app
=
aq_base
(
self
.
app
)
thread
=
WriterThread
(
sid
,
app
,
readiters
,
session_data_managername
)
writers
.
append
(
thread
)
for
thread
in
readers
:
thread
.
start
()
time
.
sleep
(
0.1
)
for
thread
in
writers
:
thread
.
start
()
time
.
sleep
(
0.1
)
while
threading
.
activeCount
()
>
1
:
time
.
sleep
(
1
)
for
thread
in
readers
:
assert
thread
.
out
==
[],
thread
.
out
def
testOverlappingSids
(
self
):
readers
=
[]
writers
=
[]
readiters
=
20
writeiters
=
5
readout
=
[]
writeout
=
[]
numreaders
=
20
numwriters
=
5
rlock
=
threading
.
Lock
()
session_data_managername
=
'session_data_manager'
mgr
=
getattr
(
self
.
app
,
idmgr_name
)
sid
=
mgr
.
_getNewToken
()
for
i
in
range
(
numreaders
):
app
=
aq_base
(
self
.
app
)
thread
=
ReaderThread
(
sid
,
app
,
readiters
,
session_data_managername
)
readers
.
append
(
thread
)
for
i
in
range
(
numwriters
):
app
=
aq_base
(
self
.
app
)
thread
=
WriterThread
(
sid
,
app
,
readiters
,
session_data_managername
)
writers
.
append
(
thread
)
for
thread
in
readers
:
thread
.
start
()
time
.
sleep
(
0.1
)
for
thread
in
writers
:
thread
.
start
()
time
.
sleep
(
0.1
)
while
threading
.
activeCount
()
>
1
:
time
.
sleep
(
1
)
for
thread
in
readers
:
assert
thread
.
out
==
[],
thread
.
out
class
ReaderThread
(
threading
.
Thread
):
def
__init__
(
self
,
sid
,
app
,
iters
,
session_data_managername
):
self
.
sid
=
sid
self
.
app
=
app
self
.
iters
=
iters
self
.
session_data_managername
=
session_data_managername
self
.
out
=
[]
threading
.
Thread
.
__init__
(
self
)
def
run
(
self
):
self
.
app
=
makerequest
.
makerequest
(
self
.
app
)
self
.
app
.
REQUEST
.
session_token_
=
self
.
sid
session_data_manager
=
getattr
(
self
.
app
,
self
.
session_data_managername
)
data
=
session_data_manager
.
getSessionData
(
create
=
1
)
t
=
time
.
time
()
data
[
t
]
=
1
get_transaction
().
commit
()
for
i
in
range
(
self
.
iters
):
data
=
session_data_manager
.
getSessionData
()
if
not
data
.
has_key
(
t
):
self
.
out
.
append
(
1
)
time
.
sleep
(
whrandom
.
choice
(
range
(
3
)))
get_transaction
().
commit
()
class
WriterThread
(
threading
.
Thread
):
def
__init__
(
self
,
sid
,
app
,
iters
,
session_data_managername
):
self
.
sid
=
sid
self
.
app
=
app
self
.
iters
=
iters
self
.
session_data_managername
=
session_data_managername
threading
.
Thread
.
__init__
(
self
)
def
run
(
self
):
self
.
app
=
makerequest
.
makerequest
(
self
.
app
)
self
.
app
.
REQUEST
.
session_token_
=
self
.
sid
session_data_manager
=
getattr
(
self
.
app
,
self
.
session_data_managername
)
for
i
in
range
(
self
.
iters
):
data
=
session_data_manager
.
getSessionData
()
data
[
time
.
time
()]
=
1
n
=
whrandom
.
choice
(
range
(
8
))
time
.
sleep
(
n
)
if
n
%
2
==
0
:
get_transaction
().
commit
()
else
:
get_transaction
().
abort
()
def
test_suite
():
test_datamgr
=
makeSuite
(
TestSessionManager
,
'test'
)
test_multithread
=
makeSuite
(
TestMultiThread
,
'test'
)
suite
=
TestSuite
((
test_datamgr
,
test_multithread
))
return
suite
if
__name__
==
'__main__'
:
runner
=
TextTestRunner
(
verbosity
=
9
,
descriptions
=
9
)
runner
.
run
(
test_suite
())
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment