Commit c1d782ac authored by Guido van Rossum's avatar Guido van Rossum

Move the storage configuration code here, where it belongs.

parent 30d4be5d
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Higher-level support for configuring storages.
Storages are configured a la DBTab.
A storage section has the form
<Storage Name (dependent)>
# For example
type FileStorage
file_name var/Data.fs
read_only 1
</Storage>
where Name and (dependent) are optional. Once you have retrieved the
section object (probably with getSection("Storage", name), the
function creatStorage() in this module will create the storage object
for you.
"""
from StorageTypes import storage_types
def createStorage(section):
"""Create a storage specified by a configuration section."""
klass, args = getStorageInfo(section)
return klass(**args)
def getStorageInfo(section):
"""Extract a storage description from a configuration section.
Return a tuple (klass, args) where klass is the storage class and
args is a dictionary of keyword arguments. To create the storage,
call klass(**args).
Adapted from DatabaseFactory.setStorageParams() in DBTab.py.
"""
type = section.get("type")
if not type:
raise RuntimeError, "A storage type is required"
module = None
pos = type.rfind(".")
if pos >= 0:
# Specified the module
module, type = type[:pos], type[pos+1:]
converter = None
if not module:
# Use a default module and argument converter.
info = storage_types.get(type)
if not info:
raise RuntimeError, "Unknown storage type: %s" % type
module, converter = info
m = __import__(module, {}, {}, [type])
klass = getattr(m, type)
args = {}
for key in section.keys():
if key.lower() != "type":
args[key] = section.get(key)
if converter is not None:
args = converter(**args)
return (klass, args)
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Default storage types.
Adapted from DBTab/StorageTypes.py.
"""
import re
from ZConfig.Common import asBoolean
def convertFileStorageArgs(quota=None, stop=None, **kw):
if kw.has_key('name'):
# FileStorage doesn't accept a 'name' arg
del kw['name']
if quota is not None:
kw['quota'] = long(quota) or None
if stop is not None:
stop = long(stop)
if not stop:
stop = None
else:
from ZODB.utils import p64
stop = p64(stop)
kw['stop'] = stop
# Boolean args
for name in (
'create', 'read_only'
):
if kw.has_key(name):
kw[name] = asBoolean(kw[name])
return kw
# Match URLs of the form 'zeo://zope.example.com:1234'
zeo_url_re = re.compile('zeo:/*(?P<host>[A-Za-z0-9\.-]+):(?P<port>[0-9]+)')
def convertAddresses(s):
# Allow multiple addresses using semicolons as a split character.
res = []
for a in s.split(';'):
a = a.strip()
if a:
mo = zeo_url_re.match(a)
if mo is not None:
# ZEO URL
host, port = mo.groups()
res.append((host, int(port)))
else:
# Socket file
res.append(a)
return res
def convertClientStorageArgs(addr=None, **kw):
if addr is None:
raise RuntimeError, 'An addr parameter is required for ClientStorage.'
kw['addr'] = convertAddresses(addr)
# Integer args
for name in (
'cache_size', 'min_disconnect_poll', 'max_disconnect_poll',
):
if kw.has_key(name):
kw[name] = int(kw[name])
# Boolean args
for name in (
'wait', 'read_only', 'read_only_fallback',
):
if kw.has_key(name):
kw[name] = asBoolean(kw[name])
# The 'client' parameter must be None to be false. Yuck.
if kw.has_key('client') and not kw['client']:
kw['client'] = None
return kw
def convertBDBStorageArgs(**kw):
from bsddb3Storage.BerkeleyBase import BerkeleyConfig
config = BerkeleyConfig()
for name in dir(BerkeleyConfig):
if name.startswith('_'):
continue
val = kw.get(name)
if val is not None:
if name != 'logdir':
val = int(val)
setattr(config, name, val)
del kw[name]
# XXX: Nobody ever passes in env
assert not kw.has_key('env')
kw['config'] = config
return kw
storage_types = {
'FileStorage': ('ZODB.FileStorage', convertFileStorageArgs),
'DemoStorage': ('ZODB.DemoStorage', None),
'MappingStorage': ('ZODB.MappingStorage', None),
'TemporaryStorage': ('Products.TemporaryFolder.TemporaryStorage', None),
'ClientStorage': ('ZEO.ClientStorage', convertClientStorageArgs),
'Full': ('bsddb3Storage.Full', convertBDBStorageArgs),
'Minimal': ('bsddb3Storage.Minimal', convertBDBStorageArgs),
}
import os
import shutil
import tempfile
import unittest
from StringIO import StringIO
import ZConfig
from ZODB import StorageConfig
class StorageTestCase(unittest.TestCase):
def setUp(self):
unittest.TestCase.setUp(self)
self.tmpfn = tempfile.mktemp()
self.storage = None
def tearDown(self):
unittest.TestCase.tearDown(self)
storage = self.storage
self.storage = None
try:
if storage is not None:
storage.close()
except:
pass
try:
# Full storage creates a directory
if os.path.isdir(self.tmpfn):
shutil.rmtree(self.tmpfn)
else:
os.remove(self.tmpfn)
except os.error:
pass
def testFileStorage(self):
from ZODB.FileStorage import FileStorage
sample = """
<Storage>
type FileStorage
file_name %s
create yes
</Storage>
""" % self.tmpfn
io = StringIO(sample)
rootconf = ZConfig.loadfile(io)
storageconf = rootconf.getSection("Storage")
cls, args = StorageConfig.getStorageInfo(storageconf)
self.assertEqual(cls, FileStorage)
self.assertEqual(args, {"file_name": self.tmpfn, "create": 1})
self.storage = StorageConfig.createStorage(storageconf)
self.assert_(isinstance(self.storage, FileStorage))
def testZEOStorage(self):
from ZEO.ClientStorage import ClientStorage
sample = """
<Storage>
type ClientStorage
addr %s
wait no
</Storage>
""" % self.tmpfn
io = StringIO(sample)
rootconf = ZConfig.loadfile(io)
storageconf = rootconf.getSection("Storage")
cls, args = StorageConfig.getStorageInfo(storageconf)
self.assertEqual(cls, ClientStorage)
self.assertEqual(args, {"addr": [self.tmpfn], "wait": 0})
self.storage = StorageConfig.createStorage(storageconf)
self.assert_(isinstance(self.storage, ClientStorage))
def testDemoStorage(self):
from ZODB.DemoStorage import DemoStorage
sample = """
<Storage>
type DemoStorage
</Storage>
"""
io = StringIO(sample)
rootconf = ZConfig.loadfile(io)
storageconf = rootconf.getSection("Storage")
cls, args = StorageConfig.getStorageInfo(storageconf)
self.assertEqual(cls, DemoStorage)
self.assertEqual(args, {})
self.storage = StorageConfig.createStorage(storageconf)
self.assert_(isinstance(self.storage, DemoStorage))
def testModuleStorage(self):
# Test explicit module+class
from ZODB.DemoStorage import DemoStorage
sample = """
<Storage>
type ZODB.DemoStorage.DemoStorage
</Storage>
"""
io = StringIO(sample)
rootconf = ZConfig.loadfile(io)
storageconf = rootconf.getSection("Storage")
cls, args = StorageConfig.getStorageInfo(storageconf)
self.assertEqual(cls, DemoStorage)
self.assertEqual(args, {})
self.storage = StorageConfig.createStorage(storageconf)
self.assert_(isinstance(self.storage, DemoStorage))
def testFullStorage(self):
try:
from bsddb3Storage.Full import Full
except ImportError:
return
sample = """
<Storage>
type Full
name %s
cachesize 1000
</Storage>
""" % self.tmpfn
os.mkdir(self.tmpfn)
io = StringIO(sample)
rootconf = ZConfig.loadfile(io)
storageconf = rootconf.getSection("Storage")
cls, args = StorageConfig.getStorageInfo(storageconf)
self.assertEqual(cls, Full)
# It's too hard to test the config instance equality
args = args.copy()
del args['config']
self.assertEqual(args, {"name": self.tmpfn})
self.storage = StorageConfig.createStorage(storageconf)
self.assert_(isinstance(self.storage, Full))
# XXX _config isn't public
self.assert_(self.storage._config.cachesize, 1000)
def testMinimalStorage(self):
try:
from bsddb3Storage.Minimal import Minimal
except ImportError:
return
sample = """
<Storage>
type Minimal
name %s
cachesize 1000
</Storage>
""" % self.tmpfn
os.mkdir(self.tmpfn)
io = StringIO(sample)
rootconf = ZConfig.loadfile(io)
storageconf = rootconf.getSection("Storage")
cls, args = StorageConfig.getStorageInfo(storageconf)
self.assertEqual(cls, Minimal)
# It's too hard to test the config instance equality
args = args.copy()
del args['config']
self.assertEqual(args, {"name": self.tmpfn})
self.storage = StorageConfig.createStorage(storageconf)
self.assert_(isinstance(self.storage, Minimal))
# XXX _config isn't public
self.assert_(self.storage._config.cachesize, 1000)
def test_suite():
return unittest.makeSuite(StorageTestCase)
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment