Commit 29fe44e5 authored by Vincent Pelletier's avatar Vincent Pelletier

Add support for a persistent, updated list of master nodes.

parent f3188b90
......@@ -58,7 +58,7 @@ class Application(object):
def __init__(self, config):
# Internal attributes.
self.em = EventManager()
self.nm = NodeManager()
self.nm = NodeManager(config.getDynamicMasterList())
self.name = config.getCluster()
self.server = config.getBind()
......
......@@ -64,7 +64,8 @@ class Storage(BaseStorage.BaseStorage,
)))
def __init__(self, master_nodes, name, read_only=False,
compress=None, logfile=None, verbose=False, _app=None, **kw):
compress=None, logfile=None, verbose=False, _app=None,
dynamic_master_list=None, **kw):
"""
Do not pass those parameters (used internally):
_app
......@@ -76,7 +77,8 @@ class Storage(BaseStorage.BaseStorage,
# Warning: _is_read_only is used in BaseStorage, do not rename it.
self._is_read_only = read_only
if _app is None:
_app = Application(master_nodes, name, compress=compress)
_app = Application(master_nodes, name, compress=compress,
dynamic_master_list=dynamic_master_list)
self.app = _app
# Used to clone self (see new_instance & IMVCCStorage definition).
self._init_args = (master_nodes, name)
......@@ -85,6 +87,7 @@ class Storage(BaseStorage.BaseStorage,
'compress': compress,
'logfile': logfile,
'verbose': verbose,
'dynamic_master_list': dynamic_master_list,
'_app': _app,
}
......
......@@ -71,7 +71,8 @@ CHECKED_SERIAL = object()
class Application(object):
"""The client node application."""
def __init__(self, master_nodes, name, compress=True, **kw):
def __init__(self, master_nodes, name, compress=True,
dynamic_master_list=None, **kw):
# Start polling thread
self.em = EventManager()
self.poll_thread = ThreadedPoll(self.em, name=name)
......@@ -82,7 +83,7 @@ class Application(object):
master_addresses, connector_name = parseMasterList(master_nodes)
self.connector_handler = getConnectorHandler(connector_name)
self.dispatcher = Dispatcher(self.poll_thread)
self.nm = NodeManager()
self.nm = NodeManager(dynamic_master_list)
self.cp = ConnectionPool(self)
self.pt = None
self.master_conn = None
......
......@@ -32,5 +32,12 @@
Log debugging information
</description>
</key>
<key name="dynamic_master_list" datatype="path">
<description>
The file designated by this option contains an updated list of master
nodes which are known to be part of current cluster, so new nodes can
be added/removed without requiring a config change each time.
</description>
</key>
</sectiontype>
</component>
......@@ -59,6 +59,9 @@ class ConfigurationManager(object):
def getDatabase(self):
return self.__get('database')
def getDynamicMasterList(self):
return self.__get('dynamic_master_list', optional=True)
def getAdapter(self):
return self.__get('adapter')
......
......@@ -16,6 +16,8 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from time import time
from os.path import exists, getsize
import json
import neo.lib
from .util import dump
......@@ -251,20 +253,67 @@ NODE_CLASS_MAPPING = {
AdminNode: NodeTypes.ADMIN,
}
class MasterDB(object):
"""
Manages accesses to master's address database.
"""
def __init__(self, path):
self._path = path
try_load = exists(path) and getsize(path)
if try_load:
db = open(path, 'r')
init_set = map(tuple, json.load(db))
else:
db = open(path, 'w+')
init_set = []
self._set = set(init_set)
db.close()
def _save(self):
try:
db = open(self._path, 'w')
except IOError:
neo.lib.logging.warning('failed opening master database at %r '
'for writing, update skipped', self._path)
else:
json.dump(list(self._set), db)
db.close()
def add(self, addr):
self._set.add(addr)
self._save()
def discard(self, addr):
self._set.discard(addr)
self._save()
def __iter__(self):
return iter(self._set)
class NodeManager(object):
"""This class manages node status."""
_master_db = None
# TODO: rework getXXXList() methods, filter first by node type
# - getStorageList(identified=True, connected=True, )
# - getList(...)
def __init__(self):
def __init__(self, master_db=None):
"""
master_db (string)
Path to a file containing master nodes's addresses. Used to automate
master list updates. If not provided, no automation will happen.
"""
self._node_set = set()
self._address_dict = {}
self._uuid_dict = {}
self._type_dict = {}
self._state_dict = {}
self._identified_dict = {}
if master_db is not None:
self._master_db = db = MasterDB(master_db)
for addr in db:
self.createMaster(addr)
close = __init__
......@@ -279,6 +328,8 @@ class NodeManager(object):
self.__updateSet(self._type_dict, None, node.__class__, node)
self.__updateSet(self._state_dict, None, node.getState(), node)
self._updateIdentified(node)
if node.isMaster() and self._master_db is not None:
self._master_db.add(node.getAddress())
def remove(self, node):
if node not in self._node_set:
......@@ -292,6 +343,8 @@ class NodeManager(object):
uuid = node.getUUID()
if uuid in self._identified_dict:
del self._identified_dict[uuid]
if node.isMaster() and self._master_db is not None:
self._master_db.discard(node.getAddress())
def __drop(self, index_dict, key):
try:
......
......@@ -46,7 +46,7 @@ class Application(object):
def __init__(self, config):
# Internal attributes.
self.em = EventManager()
self.nm = NodeManager()
self.nm = NodeManager(config.getDynamicMasterList())
self.tm = TransactionManager(self.onTransactionCommitted)
self.name = config.getCluster()
......
......@@ -34,6 +34,8 @@ parser.add_option('-c', '--cluster', help = 'the cluster name')
parser.add_option('-m', '--masters', help = 'master node list')
parser.add_option('-b', '--bind', help = 'the local address to bind to')
parser.add_option('-n', '--name', help = 'the node name (improve logging)')
parser.add_option('-D', '--dynamic-master-list', help='path of the file '
'containing dynamic master node list')
defaults = dict(
name = 'admin',
......
......@@ -35,6 +35,8 @@ parser.add_option('-m', '--masters', help = 'master node list')
parser.add_option('-r', '--replicas', help = 'replicas number')
parser.add_option('-p', '--partitions', help = 'partitions number')
parser.add_option('-l', '--logfile', help = 'specify a logging file')
parser.add_option('-D', '--dynamic-master-list', help='path of the file '
'containing dynamic master node list')
defaults = dict(
name = 'master',
......
......@@ -40,6 +40,8 @@ parser.add_option('-c', '--cluster', help = 'the cluster name')
parser.add_option('-m', '--masters', help = 'master node list')
parser.add_option('-a', '--adapter', help = 'database adapter to use')
parser.add_option('-d', '--database', help = 'database connections string')
parser.add_option('-D', '--dynamic-master-list', help='path of the file '
'containing dynamic master node list')
defaults = dict(
name = 'storage',
......
......@@ -46,7 +46,7 @@ class Application(object):
# Internal attributes.
self.em = EventManager()
self.nm = NodeManager()
self.nm = NodeManager(config.getDynamicMasterList())
self.tm = TransactionManager(self)
self.dm = buildDatabaseManager(config.getAdapter(), config.getDatabase())
......
......@@ -20,9 +20,11 @@ from mock import Mock
from neo.lib import protocol
from neo.lib.protocol import NodeTypes, NodeStates
from neo.lib.node import Node, MasterNode, StorageNode, \
ClientNode, AdminNode, NodeManager
from . import NeoUnitTestBase
ClientNode, AdminNode, NodeManager, MasterDB
from . import NeoUnitTestBase, getTempDirectory
from time import time
from os import chmod, mkdir, rmdir, unlink
from os.path import join, exists
class NodesTests(NeoUnitTestBase):
......@@ -313,6 +315,85 @@ class NodeManagerTests(NeoUnitTestBase):
self.checkIdentified([self.master, self.storage], pool_set=[
self.master.getUUID(), self.storage.getUUID()])
class MasterDBTests(NeoUnitTestBase):
def _checkMasterDB(self, path, expected_master_list):
db = list(MasterDB(path))
db_set = set(db)
# Generic sanity check
self.assertEqual(len(db), len(db_set))
self.assertEqual(db_set, set(expected_master_list))
def testInitialAccessRights(self):
"""
Verify MasterDB raises immediately on instanciation if it cannot
create a non-existing database. This does not guarantee any later
open will succeed, but makes the simple error case obvious.
"""
temp_dir = getTempDirectory()
directory = join(temp_dir, 'read_only')
assert not exists(directory), db_file
db_file = join(directory, 'not_created')
mkdir(directory)
try:
chmod(directory, 0400)
self.assertRaises(IOError, MasterDB, db_file)
finally:
rmdir(directory)
def testLaterAccessRights(self):
"""
Verify MasterDB does not raise when modifying database.
"""
temp_dir = getTempDirectory()
directory = join(temp_dir, 'read_write')
assert not exists(directory), db_file
db_file = join(directory, 'db')
mkdir(directory)
try:
db = MasterDB(db_file)
self.assertTrue(exists(db_file), db_file)
chmod(db_file, 0400)
address = ('example.com', 1024)
# Must not raise
db.add(address)
# Value is stored
self.assertTrue(address in db, [x for x in db])
# But not visible to a new db instance (write access restored so
# it can be created)
chmod(db_file, 0600)
db2 = MasterDB(db_file)
self.assertFalse(address in db2, [x for x in db2])
finally:
if exists(db_file):
unlink(db_file)
rmdir(directory)
def testPersistence(self):
temp_dir = getTempDirectory()
directory = join(temp_dir, 'read_write')
assert not exists(directory), db_file
db_file = join(directory, 'db')
mkdir(directory)
try:
db = MasterDB(db_file)
self.assertTrue(exists(db_file), db_file)
address = ('example.com', 1024)
db.add(address)
address2 = ('example.org', 1024)
db.add(address2)
# Values are visible to a new db instance
db2 = MasterDB(db_file)
self.assertTrue(address in db2, [x for x in db2])
self.assertTrue(address2 in db2, [x for x in db2])
db.discard(address)
# Create yet another instance (file is not supposed to be shared)
db3 = MasterDB(db_file)
self.assertFalse(address in db3, [x for x in db3])
self.assertTrue(address2 in db3, [x for x in db3])
finally:
if exists(db_file):
unlink(db_file)
rmdir(directory)
if __name__ == '__main__':
unittest.main()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment