Commit 085b1577 authored by Chris Rossi's avatar Chris Rossi

Cargo cult resolvers from pyramid_zodbconn.

parent 67c1ace3
......@@ -2,3 +2,4 @@ distribute*.tar.gz
venv
*.pyc
*.egg-info
.coverage
TRUETYPES = ('1', 'on', 'true', 't', 'yes')
FALSETYPES = ('', '0', 'off', 'false', 'f', 'no')
class SuffixMultiplier:
# d is a dictionary of suffixes to integer multipliers. If no suffixes
# match, default is the multiplier. Matches are case insensitive. Return
# values are in the fundamental unit.
def __init__(self, d, default=1):
self._d = d
self._default = default
# all keys must be the same size
self._keysz = None
for k in d.keys():
if self._keysz is None:
self._keysz = len(k)
else:
assert self._keysz == len(k)
def __call__(self, v):
v = v.lower()
for s, m in self._d.items():
if v[-self._keysz:] == s:
return int(v[:-self._keysz]) * m
return int(v) * self._default
byte_size = SuffixMultiplier({'kb': 1024,
'mb': 1024*1024,
'gb': 1024*1024*1024L,})
import os
import cgi
from cStringIO import StringIO
import urlparse
from zodburi.datatypes import byte_size
from zodburi.datatypes import FALSETYPES
from zodburi.datatypes import TRUETYPES
from ZODB.FileStorage.FileStorage import FileStorage
from ZODB.DemoStorage import DemoStorage
from ZODB.MappingStorage import MappingStorage
from ZODB.blob import BlobStorage
from ZODB.DB import DB
import ZConfig
def interpret_int_args(argnames, kw):
newkw = {}
# boolean values are also treated as integers
for name in argnames:
value = kw.get(name)
if value is not None:
value = value.lower()
if value in FALSETYPES:
value = 0
if value in TRUETYPES:
value = 1
value = int(value)
newkw[name] = value
return newkw
def interpret_string_args(argnames, kw):
newkw = {}
# strings
for name in argnames:
value = kw.get(name)
if value is not None:
newkw[name] = value
return newkw
def interpret_bytesize_args(argnames, kw):
newkw = {}
# suffix multiplied
for name in argnames:
value = kw.get(name)
if value is not None:
newkw[name] = byte_size(value)
return newkw
class Resolver(object):
def interpret_kwargs(self, kw):
new = {}
newkw = interpret_int_args(self._int_args, kw)
new.update(newkw)
newkw = interpret_string_args(self._string_args, kw)
new.update(newkw)
newkw = interpret_bytesize_args(self._bytesize_args, kw)
new.update(newkw)
return new
class MappingStorageURIResolver(Resolver):
_int_args = ('connection_cache_size', 'connection_pool_size')
_string_args = ('database_name',)
_bytesize_args = ()
def __call__(self, uri):
prefix, rest = uri.split('memory://', 1)
result = rest.split('?', 1)
if len(result) == 1:
name = result[0]
query = ''
else:
name, query = result
kw = dict(cgi.parse_qsl(query))
kw = self.interpret_kwargs(kw)
dbkw = get_dbkw(kw)
args = (name,)
dbitems = dbkw.items()
dbitems.sort()
key = (args, tuple(dbitems))
def factory():
storage = MappingStorage(*args)
return DB(storage, **dbkw)
return key, args, kw, factory
class FileStorageURIResolver(Resolver):
# XXX missing: blob_dir, packer, pack_keep_old, pack_gc, stop
_int_args = ('create', 'read_only', 'demostorage', 'connection_cache_size',
'connection_pool_size')
_string_args = ('blobstorage_dir', 'blobstorage_layout', 'database_name')
_bytesize_args = ('quota',)
def __call__(self, uri):
# we can't use urlparse.urlsplit here due to Windows filenames
prefix, rest = uri.split('file://', 1)
result = rest.split('?', 1)
if len(result) == 1:
path = result[0]
query = ''
else:
path, query = result
path = os.path.normpath(path)
kw = dict(cgi.parse_qsl(query))
kw = self.interpret_kwargs(kw)
dbkw = get_dbkw(kw)
items = kw.items()
items.sort()
args = (path,)
dbitems = dbkw.items()
dbitems.sort()
key = (args, tuple(items), tuple(dbitems))
demostorage = False
if 'demostorage'in kw:
kw.pop('demostorage')
demostorage = True
blobstorage_dir = None
blobstorage_layout = 'automatic'
if 'blobstorage_dir' in kw:
blobstorage_dir = kw.pop('blobstorage_dir')
if 'blobstorage_layout' in kw:
blobstorage_layout = kw.pop('blobstorage_layout')
if demostorage and blobstorage_dir:
def factory():
filestorage = FileStorage(*args, **kw)
blobstorage = BlobStorage(blobstorage_dir, filestorage,
layout=blobstorage_layout)
demostorage = DemoStorage(base=blobstorage)
return DB(demostorage, **dbkw)
elif blobstorage_dir:
def factory():
filestorage = FileStorage(*args, **kw)
blobstorage = BlobStorage(blobstorage_dir, filestorage,
layout=blobstorage_layout)
return DB(blobstorage, **dbkw)
elif demostorage:
def factory():
filestorage = FileStorage(*args, **kw)
demostorage = DemoStorage(base=filestorage)
return DB(demostorage, **dbkw)
else:
def factory():
filestorage = FileStorage(*args, **kw)
return DB(filestorage, **dbkw)
return key, args, kw, factory
class ClientStorageURIResolver(Resolver):
_int_args = ('debug', 'min_disconnect_poll', 'max_disconnect_poll',
'wait_for_server_on_startup', 'wait', 'wait_timeout',
'read_only', 'read_only_fallback', 'shared_blob_dir',
'demostorage', 'connection_cache_size',
'connection_pool_size')
_string_args = ('storage', 'name', 'client', 'var', 'username',
'password', 'realm', 'blob_dir', 'database_name')
_bytesize_args = ('cache_size', )
def __call__(self, uri):
# urlparse doesnt understand zeo URLs so force to something that doesn't break
uri = uri.replace('zeo://', 'http://', 1)
(scheme, netloc, path, query, frag) = urlparse.urlsplit(uri)
if netloc:
# TCP URL
if ':' in netloc:
host, port = netloc.split(':')
port = int(port)
else:
host = netloc
port = 9991
args = ((host, port),)
else:
# Unix domain socket URL
path = os.path.normpath(path)
args = (path,)
kw = dict(cgi.parse_qsl(query))
kw = self.interpret_kwargs(kw)
dbkw = get_dbkw(kw)
items = kw.items()
items.sort()
dbitems = dbkw.items()
dbitems.sort()
key = (args, tuple(items), tuple(dbitems))
if 'demostorage' in kw:
kw.pop('demostorage')
def factory():
from ZEO.ClientStorage import ClientStorage
from ZODB.DB import DB
from ZODB.DemoStorage import DemoStorage
demostorage = DemoStorage(base=ClientStorage(*args, **kw))
return DB(demostorage, **dbkw) #pragma NO COVERAGE
else:
def factory():
from ZEO.ClientStorage import ClientStorage
from ZODB.DB import DB
clientstorage = ClientStorage(*args, **kw)
return DB(clientstorage, **dbkw) #pragma NO COVERAGE
return key, args, kw, factory
def get_dbkw(kw):
dbkw = {}
dbkw['cache_size'] = 10000
dbkw['pool_size'] = 7
dbkw['database_name'] = 'unnamed'
if 'connection_cache_size' in kw:
dbkw['cache_size'] = int(kw.pop('connection_cache_size'))
if 'connection_pool_size' in kw:
dbkw['pool_size'] = int(kw.pop('connection_pool_size'))
if 'database_name' in kw:
dbkw['database_name'] = kw.pop('database_name')
return dbkw
class ZConfigURIResolver(object):
schema_xml_template = """
<schema>
<import package="ZODB"/>
<multisection type="ZODB.database" attribute="databases" />
</schema>
"""
def __call__(self, uri):
(scheme, netloc, path, query, frag) = urlparse.urlsplit(uri)
# urlparse doesnt understand file URLs and stuffs everything into path
(scheme, netloc, path, query, frag) = urlparse.urlsplit('http:' + path)
path = os.path.normpath(path)
schema_xml = self.schema_xml_template
schema = ZConfig.loadSchemaFile(StringIO(schema_xml))
config, handler = ZConfig.loadConfig(schema, path)
for database in config.databases:
if not frag:
# use the first defined in the file
break
elif frag == database.name:
# match found
break
else:
raise KeyError("No database named %s found" % frag)
return (path, frag), (), {}, database.open
RESOLVERS = {
'zeo':ClientStorageURIResolver(),
'file':FileStorageURIResolver(),
'zconfig':ZConfigURIResolver(),
'memory':MappingStorageURIResolver(),
}
\ No newline at end of file
import unittest
class Base:
def test_interpret_kwargs_noargs(self):
resolver = self._makeOne()
kwargs = resolver.interpret_kwargs({})
self.assertEqual(kwargs, {})
def test_bytesize_args(self):
resolver = self._makeOne()
names = list(resolver._bytesize_args)
kwargs = {}
for name in names:
kwargs[name] = '10MB'
args = resolver.interpret_kwargs(kwargs)
keys = args.keys()
keys.sort()
self.assertEqual(keys, names)
for name, value in args.items():
self.assertEqual(value, 10*1024*1024)
def test_int_args(self):
resolver = self._makeOne()
names = list(resolver._int_args)
kwargs = {}
for name in names:
kwargs[name] = '10'
args = resolver.interpret_kwargs(kwargs)
keys = args.keys()
keys.sort()
self.assertEqual(sorted(keys), sorted(names))
for name, value in args.items():
self.assertEqual(value, 10)
def test_string_args(self):
resolver = self._makeOne()
names = list(resolver._string_args)
kwargs = {}
for name in names:
kwargs[name] = 'string'
args = resolver.interpret_kwargs(kwargs)
keys = args.keys()
keys.sort()
self.assertEqual(keys, names)
for name, value in args.items():
self.assertEqual(value, 'string')
class TestFileStorageURIResolver(Base, unittest.TestCase):
def _getTargetClass(self):
from zodburi.resolvers import FileStorageURIResolver
return FileStorageURIResolver
def _makeOne(self):
klass = self._getTargetClass()
return klass()
def setUp(self):
import tempfile
self.tmpdir = tempfile.mkdtemp()
def tearDown(self):
import shutil
shutil.rmtree(self.tmpdir)
def test_bool_args(self):
resolver = self._makeOne()
f = resolver.interpret_kwargs
kwargs = f({'read_only':'1'})
self.assertEqual(kwargs, {'read_only':1})
kwargs = f({'read_only':'true'})
self.assertEqual(kwargs, {'read_only':1})
kwargs = f({'read_only':'on'})
self.assertEqual(kwargs, {'read_only':1})
kwargs = f({'read_only':'off'})
self.assertEqual(kwargs, {'read_only':0})
kwargs = f({'read_only':'no'})
self.assertEqual(kwargs, {'read_only':0})
kwargs = f({'read_only':'false'})
self.assertEqual(kwargs, {'read_only':0})
def test_call_no_qs(self):
resolver = self._makeOne()
k, args, kw, factory = resolver('file:///tmp/foo/bar')
self.assertEqual(args, ('/tmp/foo/bar',))
self.assertEqual(kw, {})
def test_call_abspath(self):
resolver = self._makeOne()
k, args, kw, factory = resolver('file:///tmp/foo/bar?read_only=true')
self.assertEqual(args, ('/tmp/foo/bar',))
self.assertEqual(kw, {'read_only':1})
def test_call_abspath_windows(self):
resolver = self._makeOne()
k, args, kw, factory = resolver(
'file://C:\\foo\\bar?read_only=true')
self.assertEqual(args, ('C:\\foo\\bar',))
self.assertEqual(kw, {'read_only':1})
def test_call_normpath(self):
resolver = self._makeOne()
k, args, kw, factory = resolver('file:///tmp/../foo/bar?read_only=true')
self.assertEqual(args, ('/foo/bar',))
self.assertEqual(kw, {'read_only':1})
def test_invoke_factory_filestorage(self):
import os
self.assertFalse(os.path.exists(os.path.join(self.tmpdir, 'db.db')))
resolver = self._makeOne()
k, args, kw, factory = resolver(
'file://%s/db.db?quota=200' % self.tmpdir)
self.assertEqual(k,
(('%s/db.db' % self.tmpdir,), (('quota', 200),),
(('cache_size', 10000), ('database_name', 'unnamed'),
('pool_size', 7)))
)
factory()
self.assertTrue(os.path.exists(os.path.join(self.tmpdir, 'db.db')))
def test_demostorage(self):
resolver = self._makeOne()
k, args, kw, factory = resolver(
'file:///tmp/../foo/bar?demostorage=true')
self.assertEqual(args, ('/foo/bar',))
self.assertEqual(kw, {})
def test_invoke_factory_demostorage(self):
import os
from ZODB.DemoStorage import DemoStorage
from ZODB.FileStorage import FileStorage
DB_FILE = os.path.join(self.tmpdir, 'db.db')
self.assertFalse(os.path.exists(DB_FILE))
resolver = self._makeOne()
k, args, kw, factory = resolver(
'file://%s/db.db?quota=200&demostorage=true' % self.tmpdir)
self.assertEqual(k,
(('%s/db.db' % self.tmpdir,),
(('demostorage', 1),
('quota', 200),
),
(('cache_size', 10000),
('database_name', 'unnamed'),
('pool_size', 7)
),
)
)
db = factory()
self.assertTrue(isinstance(db._storage, DemoStorage))
self.assertTrue(isinstance(db._storage.base, FileStorage))
self.assertTrue(os.path.exists(DB_FILE))
def test_blobstorage(self):
resolver = self._makeOne()
k, args, kw, factory = resolver(
('file:///tmp/../foo/bar'
'?blobstorage_dir=/foo/bar&blobstorage_layout=bushy'))
self.assertEqual(args, ('/foo/bar',))
self.assertEqual(kw, {})
def test_invoke_factory_blobstorage(self):
import os
from urllib import quote as q
from ZODB.blob import BlobStorage
DB_FILE = os.path.join(self.tmpdir, 'db.db')
BLOB_DIR = os.path.join(self.tmpdir, 'blob')
self.assertFalse(os.path.exists(DB_FILE))
resolver = self._makeOne()
k, args, kw, factory = resolver(
'file://%s/db.db?quota=200'
'&blobstorage_dir=%s/blob'
'&blobstorage_layout=bushy' % (self.tmpdir, q(self.tmpdir)))
self.assertEqual(k,
(('%s/db.db' % self.tmpdir,),
(('blobstorage_dir', '%s/blob' % self.tmpdir),
('blobstorage_layout', 'bushy'),
('quota', 200),
),
(('cache_size', 10000),
('database_name', 'unnamed'),
('pool_size', 7)
),
)
)
db = factory()
self.assertTrue(isinstance(db._storage, BlobStorage))
self.assertTrue(os.path.exists(DB_FILE))
self.assertTrue(os.path.exists(BLOB_DIR))
def test_blobstorage_and_demostorage(self):
resolver = self._makeOne()
k, args, kw, factory = resolver(
('file:///tmp/../foo/bar?demostorage=true'
'&blobstorage_dir=/foo/bar&blobstorage_layout=bushy'))
self.assertEqual(args, ('/foo/bar',))
self.assertEqual(kw, {})
def test_invoke_factory_blobstorage_and_demostorage(self):
import os
from urllib import quote as q
from ZODB.DemoStorage import DemoStorage
DB_FILE = os.path.join(self.tmpdir, 'db.db')
BLOB_DIR = os.path.join(self.tmpdir, 'blob')
self.assertFalse(os.path.exists(DB_FILE))
resolver = self._makeOne()
k, args, kw, factory = resolver(
'file://%s/db.db?quota=200&demostorage=true'
'&blobstorage_dir=%s/blob'
'&blobstorage_layout=bushy' % (self.tmpdir, q(self.tmpdir)))
self.assertEqual(k,
(('%s/db.db' % self.tmpdir,),
(('blobstorage_dir', '%s/blob' % self.tmpdir),
('blobstorage_layout', 'bushy'),
('demostorage', 1),
('quota', 200),
),
(('cache_size', 10000),
('database_name', 'unnamed'),
('pool_size', 7)
),
)
)
db = factory()
self.assertTrue(isinstance(db._storage, DemoStorage))
self.assertTrue(os.path.exists(DB_FILE))
self.assertTrue(os.path.exists(BLOB_DIR))
def test_dbargs(self):
resolver = self._makeOne()
k, args, kw, factory = resolver(
('file:///tmp/../foo/bar?connection_pool_size=1'
'&connection_cache_size=1&database_name=dbname'))
self.assertEqual(k[2],
(('cache_size', 1), ('database_name', 'dbname'),
('pool_size', 1)))
class TestClientStorageURIResolver(unittest.TestCase):
def _getTargetClass(self):
from zodburi.resolvers import ClientStorageURIResolver
return ClientStorageURIResolver
def _makeOne(self):
klass = self._getTargetClass()
return klass()
def test_bool_args(self):
resolver = self._makeOne()
f = resolver.interpret_kwargs
kwargs = f({'read_only':'1'})
self.assertEqual(kwargs, {'read_only':1})
kwargs = f({'read_only':'true'})
self.assertEqual(kwargs, {'read_only':1})
kwargs = f({'read_only':'on'})
self.assertEqual(kwargs, {'read_only':1})
kwargs = f({'read_only':'off'})
self.assertEqual(kwargs, {'read_only':0})
kwargs = f({'read_only':'no'})
self.assertEqual(kwargs, {'read_only':0})
kwargs = f({'read_only':'false'})
self.assertEqual(kwargs, {'read_only':0})
def test_call_tcp_no_port(self):
resolver = self._makeOne()
k, args, kw, factory = resolver('zeo://localhost?debug=true')
self.assertEqual(args, (('localhost', 9991),))
self.assertEqual(kw, {'debug':1})
self.assertEqual(k,
((('localhost', 9991),), (('debug', 1),),
(('cache_size', 10000), ('database_name','unnamed'),
('pool_size', 7))))
def test_call_tcp(self):
resolver = self._makeOne()
k, args, kw, factory = resolver('zeo://localhost:8080?debug=true')
self.assertEqual(args, (('localhost', 8080),))
self.assertEqual(kw, {'debug':1})
self.assertEqual(k,
((('localhost', 8080),), (('debug', 1),),
(('cache_size', 10000), ('database_name','unnamed'),
('pool_size', 7))))
def test_call_unix(self):
resolver = self._makeOne()
k, args, kw, factory = resolver('zeo:///var/sock?debug=true')
self.assertEqual(args, ('/var/sock',))
self.assertEqual(kw, {'debug':1})
self.assertEqual(k,
(('/var/sock',),
(('debug', 1),),
(('cache_size', 10000),
('database_name', 'unnamed'),
('pool_size', 7))))
def test_invoke_factory(self):
resolver = self._makeOne()
k, args, kw, factory = resolver('zeo:///var/nosuchfile?wait=false')
self.assertEqual(k, (('/var/nosuchfile',),
(('wait', 0),),
(('cache_size', 10000),
('database_name', 'unnamed'), ('pool_size', 7))))
from ZEO.ClientStorage import ClientDisconnected
self.assertRaises(ClientDisconnected, factory)
def test_demostorage(self):
resolver = self._makeOne()
k, args, kw, factory = resolver('zeo:///var/sock?demostorage=true')
self.assertEqual(args, ('/var/sock',))
self.assertEqual(kw, {})
def test_invoke_factory_demostorage(self):
resolver = self._makeOne()
k, args, kw, factory = resolver('zeo:///var/nosuchfile?wait=false'
'&demostorage=true')
self.assertEqual(k, (('/var/nosuchfile',),
(('demostorage', 1),
('wait', 0),),
(('cache_size', 10000),
('database_name', 'unnamed'),
('pool_size', 7),
),
))
from ZEO.ClientStorage import ClientDisconnected
self.assertRaises(ClientDisconnected, factory)
def test_dbargs(self):
resolver = self._makeOne()
k, args, kw, factory = resolver('zeo://localhost:8080?debug=true&'
'connection_pool_size=1&'
'connection_cache_size=1&'
'database_name=dbname')
self.assertEqual(k[2],
(('cache_size', 1), ('database_name', 'dbname'),
('pool_size', 1)))
class TestZConfigURIResolver(unittest.TestCase):
def _getTargetClass(self):
from zodburi.resolvers import ZConfigURIResolver
return ZConfigURIResolver
def _makeOne(self):
klass = self._getTargetClass()
return klass()
def setUp(self):
import tempfile
self.tmp = tempfile.NamedTemporaryFile()
def tearDown(self):
self.tmp.close()
def test_named_database(self):
self.tmp.write("""
<zodb otherdb>
<demostorage>
</demostorage>
</zodb>
<zodb demodb>
<mappingstorage>
</mappingstorage>
</zodb>
""")
self.tmp.flush()
resolver = self._makeOne()
k, args, kw, factory = resolver('zconfig://%s#demodb' % self.tmp.name)
db = factory()
from ZODB.MappingStorage import MappingStorage
self.assertTrue(isinstance(db._storage, MappingStorage))
def test_anonymous_database(self):
self.tmp.write("""
<zodb>
<mappingstorage>
</mappingstorage>
</zodb>
<zodb demodb>
<mappingstorage>
</mappingstorage>
</zodb>
""")
self.tmp.flush()
resolver = self._makeOne()
k, args, kw, factory = resolver('zconfig://%s' % self.tmp.name)
db = factory()
from ZODB.MappingStorage import MappingStorage
self.assertTrue(isinstance(db._storage, MappingStorage))
def test_database_not_found(self):
self.tmp.write("""
<zodb x>
<mappingstorage>
</mappingstorage>
</zodb>
""")
self.tmp.flush()
resolver = self._makeOne()
self.assertRaises(KeyError, resolver, 'zconfig://%s#y' % self.tmp.name)
class TestMappingStorageURIResolver(Base, unittest.TestCase):
def _getTargetClass(self):
from zodburi.resolvers import MappingStorageURIResolver
return MappingStorageURIResolver
def _makeOne(self):
klass = self._getTargetClass()
return klass()
def test_call_no_qs(self):
resolver = self._makeOne()
k, args, kw, factory = resolver('memory://')
self.assertEqual(args, ('',))
self.assertEqual(kw, {})
db = factory()
from ZODB.MappingStorage import MappingStorage
self.assertTrue(isinstance(db._storage, MappingStorage))
def test_call_with_qs(self):
uri='memory://storagename?connection_cache_size=100&database_name=fleeb'
resolver = self._makeOne()
k, args, kw, factory = resolver(uri)
self.assertEqual(args, ('storagename',))
self.assertEqual(kw, {})
self.assertEqual(k, (('storagename',),
(('cache_size', 100), ('database_name', 'fleeb'),
('pool_size', 7))))
db = factory()
from ZODB.MappingStorage import MappingStorage
self.assertTrue(isinstance(db._storage, MappingStorage))
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment